Create a QuartcMultiplexer which separates streams and datagrams into channels. QuartcMultiplexer effectively reorganizes the calls and callbacks for QuartcSession into three categories: - Per-channel send events - Per-channel receive events - Session-wide events that are not multiplexed QuartcMultiplexer works at a low level to hide streams and messages from other senders and receivers. It consists of a core multiplexer object which interacts with the QuartcEndpoint/Session, a channel sender which handles outgoing data, and a channel receiver which handles incoming data. The sender has a specific channel id, specified on creation. A channel writes this channel id in a varint form at the start of each new stream or datagram it sends. The multiplexer intercepts all the callbacks for incoming streams and datagrams. It reads a varint from the start of each stream or datagram to identify the channel id. It then looks up a receiver for that channel and delegates the stream or datagram to that receiver. A default receiver may be registered to handle all streams or datagrams not assigned to a specific receiver. This allows endpoints to dispatch unhandled data to a catch-all, or to await incoming data before registering a specific receiver. The latter may be useful to in conjunction with some forms of negotiation; eg. when certain ranges of channel ids are allocated to different protocols, but it is unknown which protocol will be used at startup. gfe-relnote: n/a (quartc only) PiperOrigin-RevId: 260588723 Change-Id: I4a3b815b48c4f825c47bc60b0b3fd76d4e3614a5
diff --git a/quic/quartc/quartc_multiplexer.cc b/quic/quartc/quartc_multiplexer.cc new file mode 100644 index 0000000..001e219 --- /dev/null +++ b/quic/quartc/quartc_multiplexer.cc
@@ -0,0 +1,269 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h" + +#include <cstdint> + +#include "net/third_party/quiche/src/quic/core/quic_data_writer.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h" + +namespace quic { + +QuartcSendChannel::QuartcSendChannel(QuartcMultiplexer* multiplexer, + uint64_t id, + QuicBufferAllocator* allocator, + Delegate* delegate) + : multiplexer_(multiplexer), + id_(id), + encoded_length_(QuicDataWriter::GetVarInt62Len(id_)), + allocator_(allocator), + delegate_(delegate) {} + +QuartcStream* QuartcSendChannel::CreateOutgoingBidirectionalStream() { + if (!session_) { + QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_; + return nullptr; + } + QuicMemSlice id_slice = EncodeChannelId(); + + QuartcStream* stream = session_->CreateOutgoingBidirectionalStream(); + QuicConsumedData consumed = + stream->WriteMemSlices(QuicMemSliceSpan(&id_slice), /*fin=*/false); + DCHECK_EQ(consumed.bytes_consumed, encoded_length_); + return stream; +} + +bool QuartcSendChannel::SendOrQueueMessage(QuicMemSliceSpan message, + int64_t datagram_id) { + if (!session_) { + QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_ + << "datagram size=" << message.total_length(); + return false; + } + QuicMemSliceStorage storage(nullptr, 0, nullptr, 0); // Empty storage. + storage.Append(EncodeChannelId()); + + message.ConsumeAll( + [&storage](QuicMemSlice slice) { storage.Append(std::move(slice)); }); + + // Allocate a unique datagram id so that notifications can be routed back to + // the right send channel. + int64_t unique_datagram_id = multiplexer_->AllocateDatagramId(this); + multiplexer_to_user_datagram_ids_[unique_datagram_id] = datagram_id; + + return session_->SendOrQueueMessage(storage.ToSpan(), unique_datagram_id); +} + +void QuartcSendChannel::OnMessageSent(int64_t datagram_id) { + // Map back to the caller-chosen |datagram_id|. + datagram_id = multiplexer_to_user_datagram_ids_[datagram_id]; + delegate_->OnMessageSent(datagram_id); +} + +void QuartcSendChannel::OnMessageAcked(int64_t datagram_id, + QuicTime receive_timestamp) { + // Map back to the caller-chosen |datagram_id|. + auto it = multiplexer_to_user_datagram_ids_.find(datagram_id); + if (it == multiplexer_to_user_datagram_ids_.end()) { + QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id=" + << datagram_id; + return; + } + delegate_->OnMessageAcked(it->second, receive_timestamp); + multiplexer_to_user_datagram_ids_.erase(it); +} + +void QuartcSendChannel::OnMessageLost(int64_t datagram_id) { + // Map back to the caller-chosen |datagram_id|. + auto it = multiplexer_to_user_datagram_ids_.find(datagram_id); + if (it == multiplexer_to_user_datagram_ids_.end()) { + QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id=" + << datagram_id; + return; + } + delegate_->OnMessageLost(it->second); + multiplexer_to_user_datagram_ids_.erase(it); +} + +void QuartcSendChannel::OnSessionCreated(QuartcSession* session) { + session_ = session; +} + +QuicMemSlice QuartcSendChannel::EncodeChannelId() { + QuicMemSlice id_slice(allocator_, encoded_length_); + QuicDataWriter writer(encoded_length_, const_cast<char*>(id_slice.data())); + writer.WriteVarInt62(id_); + return id_slice; +} + +QuartcMultiplexer::QuartcMultiplexer( + QuicBufferAllocator* allocator, + QuartcSessionEventDelegate* session_delegate, + QuartcReceiveChannel* default_receive_channel) + : allocator_(allocator), + session_delegate_(session_delegate), + default_receive_channel_(default_receive_channel) { + CHECK_NE(session_delegate_, nullptr); + CHECK_NE(default_receive_channel_, nullptr); +} + +QuartcSendChannel* QuartcMultiplexer::CreateSendChannel( + uint64_t channel_id, + QuartcSendChannel::Delegate* delegate) { + send_channels_.push_back(QuicMakeUnique<QuartcSendChannel>( + this, channel_id, allocator_, delegate)); + if (session_) { + send_channels_.back()->OnSessionCreated(session_); + } + return send_channels_.back().get(); +} + +void QuartcMultiplexer::RegisterReceiveChannel(uint64_t channel_id, + QuartcReceiveChannel* channel) { + if (channel == nullptr) { + receive_channels_.erase(channel_id); + return; + } + auto& registered_channel = receive_channels_[channel_id]; + if (registered_channel) { + QUIC_LOG(DFATAL) << "Attempted to overwrite existing channel_id=" + << channel_id; + return; + } + registered_channel = channel; +} + +int64_t QuartcMultiplexer::AllocateDatagramId(QuartcSendChannel* channel) { + send_channels_by_datagram_id_[next_datagram_id_] = channel; + return next_datagram_id_++; +} + +void QuartcMultiplexer::OnSessionCreated(QuartcSession* session) { + for (auto& channel : send_channels_) { + channel->OnSessionCreated(session); + } + session_ = session; + session_delegate_->OnSessionCreated(session); +} + +void QuartcMultiplexer::OnCryptoHandshakeComplete() { + session_delegate_->OnCryptoHandshakeComplete(); +} + +void QuartcMultiplexer::OnConnectionWritable() { + session_delegate_->OnConnectionWritable(); +} + +void QuartcMultiplexer::OnIncomingStream(QuartcStream* stream) { + stream->SetDelegate(this); +} + +void QuartcMultiplexer::OnCongestionControlChange( + QuicBandwidth bandwidth_estimate, + QuicBandwidth pacing_rate, + QuicTime::Delta latest_rtt) { + session_delegate_->OnCongestionControlChange(bandwidth_estimate, pacing_rate, + latest_rtt); +} + +void QuartcMultiplexer::OnConnectionClosed( + const QuicConnectionCloseFrame& frame, + ConnectionCloseSource source) { + session_delegate_->OnConnectionClosed(frame, source); +} + +void QuartcMultiplexer::OnMessageReceived(QuicStringPiece message) { + QuicDataReader reader(message); + QuicVariableLengthIntegerLength channel_id_length = + reader.PeekVarInt62Length(); + + uint64_t channel_id; + if (!reader.ReadVarInt62(&channel_id)) { + QUIC_LOG(DFATAL) << "Received message without properly encoded channel id"; + return; + } + + QuartcReceiveChannel* channel = default_receive_channel_; + auto it = receive_channels_.find(channel_id); + if (it != receive_channels_.end()) { + channel = it->second; + } + + channel->OnMessageReceived(channel_id, message.substr(channel_id_length)); +} + +void QuartcMultiplexer::OnMessageSent(int64_t datagram_id) { + auto it = send_channels_by_datagram_id_.find(datagram_id); + if (it == send_channels_by_datagram_id_.end()) { + return; + } + it->second->OnMessageSent(datagram_id); +} + +void QuartcMultiplexer::OnMessageAcked(int64_t datagram_id, + QuicTime receive_timestamp) { + auto it = send_channels_by_datagram_id_.find(datagram_id); + if (it == send_channels_by_datagram_id_.end()) { + return; + } + it->second->OnMessageAcked(datagram_id, receive_timestamp); + send_channels_by_datagram_id_.erase(it); +} + +void QuartcMultiplexer::OnMessageLost(int64_t datagram_id) { + auto it = send_channels_by_datagram_id_.find(datagram_id); + if (it == send_channels_by_datagram_id_.end()) { + return; + } + it->second->OnMessageLost(datagram_id); + send_channels_by_datagram_id_.erase(it); +} + +size_t QuartcMultiplexer::OnReceived(QuartcStream* stream, + iovec* iov, + size_t iov_length, + bool /*fin*/) { + if (iov == nullptr || iov_length <= 0) { + return 0; + } + + QuicDataReader reader(static_cast<char*>(iov[0].iov_base), iov[0].iov_len); + QuicVariableLengthIntegerLength channel_id_length = + reader.PeekVarInt62Length(); + + uint64_t channel_id; + if (reader.BytesRemaining() >= channel_id_length) { + // Fast path, have enough data to read immediately. + if (!reader.ReadVarInt62(&channel_id)) { + return 0; + } + } else { + // Slow path, need to coalesce multiple iovecs. + std::string data; + for (size_t i = 0; i < iov_length; ++i) { + data += std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len); + } + QuicDataReader combined_reader(data); + if (!combined_reader.ReadVarInt62(&channel_id)) { + return 0; + } + } + + QuartcReceiveChannel* channel = default_receive_channel_; + auto it = receive_channels_.find(channel_id); + if (it != receive_channels_.end()) { + channel = it->second; + } + channel->OnIncomingStream(channel_id, stream); + return channel_id_length; +} + +void QuartcMultiplexer::OnClose(QuartcStream* /*stream*/) {} + +void QuartcMultiplexer::OnBufferChanged(QuartcStream* /*stream*/) {} + +} // namespace quic
diff --git a/quic/quartc/quartc_multiplexer.h b/quic/quartc/quartc_multiplexer.h new file mode 100644 index 0000000..aa19e70 --- /dev/null +++ b/quic/quartc/quartc_multiplexer.h
@@ -0,0 +1,190 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_QUARTC_QUARTC_MULTIPLEXER_H_ +#define QUICHE_QUIC_QUARTC_QUARTC_MULTIPLEXER_H_ + +#include <cstdint> + +#include "net/third_party/quiche/src/quic/core/quic_time.h" +#include "net/third_party/quiche/src/quic/core/quic_types.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h" +#include "net/third_party/quiche/src/quic/quartc/quartc_endpoint.h" +#include "net/third_party/quiche/src/quic/quartc/quartc_session.h" +#include "net/third_party/quiche/src/quic/quartc/quartc_stream.h" + +namespace quic { + +class QuartcMultiplexer; + +// A single, multiplexed send channel within a Quartc session. A send channel +// wraps send-side operations with an outgoing multiplex id. +class QuartcSendChannel { + public: + class Delegate { + public: + virtual ~Delegate() = default; + + // Called when a message with |datagram_id| is sent by this channel. + virtual void OnMessageSent(int64_t datagram_id) = 0; + + // Called when a message sent on this channel with |datagram_id| is acked. + // |receive_timestamp| indicates when the peer received this message, + // according to the peer's clock. + virtual void OnMessageAcked(int64_t datagram_id, + QuicTime receive_timestamp) = 0; + + // Called when a message sent on this channel with |datagram_id| is lost. + virtual void OnMessageLost(int64_t datagram_id) = 0; + }; + + QuartcSendChannel(QuartcMultiplexer* multiplexer, + uint64_t id, + QuicBufferAllocator* allocator, + Delegate* delegate); + virtual ~QuartcSendChannel() = default; + + // Creates a new, outgoing stream on this channel. + // + // Automatically writes the channel id to the start of the stream. The caller + // SHOULD create a |ScopedPacketFlusher| before calling this function to + // prevent the channel id from being sent by itself. + QuartcStream* CreateOutgoingBidirectionalStream(); + + // Writes |message| to the session. Prepends the channel's send id before any + // following message data. + bool SendOrQueueMessage(QuicMemSliceSpan message, int64_t datagram_id); + + // Gets the current largest message payload for this channel. Returns the + // largest payload size supported by the session minus overhead required to + // encode this channel's send id. + QuicPacketLength GetCurrentLargestMessagePayload() const; + + // The following are called by the multiplexer to deliver message + // notifications. The |datagram_id| passed to these is unique per-message, + // and must be translated back to the sender's chosen datagram_id. + void OnMessageSent(int64_t datagram_id); + void OnMessageAcked(int64_t datagram_id, QuicTime receive_timestamp); + void OnMessageLost(int64_t datagram_id); + void OnSessionCreated(QuartcSession* session); + + private: + // Creates a mem slice containing a varint-62 encoded channel id. + QuicMemSlice EncodeChannelId(); + + QuartcMultiplexer* const multiplexer_; + const uint64_t id_; + const QuicVariableLengthIntegerLength encoded_length_; + QuicBufferAllocator* const allocator_; + Delegate* const delegate_; + + QuartcSession* session_; + + // Map of multiplexer-chosen to user/caller-specified datagram ids. The user + // may specify any number as a datagram's id. This number does not have to be + // unique across channels (nor even within a single channel). In order + // to demux sent, acked, and lost messages, the multiplexer assigns a globally + // unique id to each message. This map is used to restore the original caller + // datagram id before issuing callbacks. + QuicUnorderedMap<int64_t, int64_t> multiplexer_to_user_datagram_ids_; +}; + +// A single, multiplexed receive channel within a Quartc session. A receive +// channel is a delegate which accepts incoming streams and datagrams on one (or +// more) channel ids. +class QuartcReceiveChannel { + public: + virtual ~QuartcReceiveChannel() = default; + + // Called when a new incoming stream arrives on this channel. + virtual void OnIncomingStream(uint64_t channel_id, QuartcStream* stream) = 0; + + // Called when a message is recieved by this channel. + virtual void OnMessageReceived(uint64_t channel_id, + QuicStringPiece message) = 0; +}; + +// Delegate for session-wide events. +class QuartcSessionEventDelegate { + public: + virtual ~QuartcSessionEventDelegate() = default; + + virtual void OnSessionCreated(QuartcSession* session) = 0; + virtual void OnCryptoHandshakeComplete() = 0; + virtual void OnConnectionWritable() = 0; + virtual void OnCongestionControlChange(QuicBandwidth bandwidth_estimate, + QuicBandwidth pacing_rate, + QuicTime::Delta latest_rtt) = 0; + virtual void OnConnectionClosed(const QuicConnectionCloseFrame& frame, + ConnectionCloseSource source) = 0; +}; + +// A multiplexer capable of sending and receiving data on multiple channels. +class QuartcMultiplexer : public QuartcEndpoint::Delegate, + public QuartcStream::Delegate { + public: + // Creates a new multiplexer. |session_delegate| handles all session-wide + // events, while |default_receive_channel| handles incoming data on unknown + // or unregistered channel ids. Neither |session_delegate| nor + // |default_receive_channel| may be nullptr, and both must outlive the + // multiplexer. + QuartcMultiplexer(QuicBufferAllocator* allocator, + QuartcSessionEventDelegate* session_delegate, + QuartcReceiveChannel* default_receive_channel); + + // Creates a new send channel. The channel is owned by the multiplexer, and + // references to it must not outlive the multiplexer. + QuartcSendChannel* CreateSendChannel(uint64_t channel_id, + QuartcSendChannel::Delegate* delegate); + + // Registers a receiver for incoming data on |channel_id|. + void RegisterReceiveChannel(uint64_t channel_id, + QuartcReceiveChannel* channel); + + // Allocates a datagram id to |channel|. + int64_t AllocateDatagramId(QuartcSendChannel* channel); + + // QuartcEndpoint::Delegate overrides. + void OnSessionCreated(QuartcSession* session) override; + + // QuartcSession::Delegate overrides. + void OnCryptoHandshakeComplete() override; + void OnConnectionWritable() override; + void OnIncomingStream(QuartcStream* stream) override; + void OnCongestionControlChange(QuicBandwidth bandwidth_estimate, + QuicBandwidth pacing_rate, + QuicTime::Delta latest_rtt) override; + void OnConnectionClosed(const QuicConnectionCloseFrame& frame, + ConnectionCloseSource source) override; + void OnMessageReceived(QuicStringPiece message) override; + void OnMessageSent(int64_t datagram_id) override; + void OnMessageAcked(int64_t datagram_id, QuicTime receive_timestamp) override; + void OnMessageLost(int64_t datagram_id) override; + + // QuartcStream::Delegate overrides. + size_t OnReceived(QuartcStream* stream, + iovec* iov, + size_t iov_length, + bool fin) override; + void OnClose(QuartcStream* stream) override; + void OnBufferChanged(QuartcStream* stream) override; + + private: + QuicBufferAllocator* const allocator_; + QuartcSessionEventDelegate* const session_delegate_; + + QuartcSession* session_; + std::vector<std::unique_ptr<QuartcSendChannel>> send_channels_; + QuicUnorderedMap<uint64_t, QuartcReceiveChannel*> receive_channels_; + QuartcReceiveChannel* default_receive_channel_ = nullptr; + + int64_t next_datagram_id_ = 1; + QuicUnorderedMap<int64_t, QuartcSendChannel*> send_channels_by_datagram_id_; +}; + +} // namespace quic + +#endif // QUICHE_QUIC_QUARTC_QUARTC_MULTIPLEXER_H_
diff --git a/quic/quartc/quartc_multiplexer_test.cc b/quic/quartc/quartc_multiplexer_test.cc new file mode 100644 index 0000000..64609cb --- /dev/null +++ b/quic/quartc/quartc_multiplexer_test.cc
@@ -0,0 +1,490 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h" + +#include <memory> + +#include "net/third_party/quiche/src/quic/core/frames/quic_connection_close_frame.h" +#include "net/third_party/quiche/src/quic/core/quic_bandwidth.h" +#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h" +#include "net/third_party/quiche/src/quic/core/quic_time.h" +#include "net/third_party/quiche/src/quic/core/quic_types.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_test.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_test_mem_slice_vector.h" +#include "net/third_party/quiche/src/quic/quartc/counting_packet_filter.h" +#include "net/third_party/quiche/src/quic/quartc/quartc_endpoint.h" +#include "net/third_party/quiche/src/quic/quartc/quartc_fakes.h" +#include "net/third_party/quiche/src/quic/quartc/quartc_session.h" +#include "net/third_party/quiche/src/quic/quartc/quartc_stream.h" +#include "net/third_party/quiche/src/quic/quartc/simulated_packet_transport.h" +#include "net/third_party/quiche/src/quic/test_tools/simulator/link.h" +#include "net/third_party/quiche/src/quic/test_tools/simulator/simulator.h" + +namespace quic { +namespace { + +using ::testing::ElementsAreArray; +using ::testing::Gt; +using ::testing::IsEmpty; +using ::testing::Pair; + +constexpr QuicTime::Delta kPropagationDelay = + QuicTime::Delta::FromMilliseconds(10); + +class FakeSessionEventDelegate : public QuartcSessionEventDelegate { + public: + void OnSessionCreated(QuartcSession* session) override { + session->StartCryptoHandshake(); + session_ = session; + } + + void OnConnectionWritable() override { ++writable_count_; } + + void OnCryptoHandshakeComplete() override { ++handshake_count_; } + + void OnConnectionClosed(const QuicConnectionCloseFrame& frame, + ConnectionCloseSource source) override { + error_ = frame.quic_error_code; + close_source_ = source; + } + + void OnCongestionControlChange(QuicBandwidth bandwidth_estimate, + QuicBandwidth pacing_rate, + QuicTime::Delta latest_rtt) override { + latest_bandwidth_estimate_ = bandwidth_estimate; + latest_pacing_rate_ = pacing_rate; + latest_rtt_ = latest_rtt; + } + + QuartcSession* session() { return session_; } + int writable_count() const { return writable_count_; } + int handshake_count() const { return handshake_count_; } + QuicErrorCode error() const { return error_; } + ConnectionCloseSource close_source() const { return close_source_; } + QuicBandwidth latest_bandwidth_estimate() const { + return latest_bandwidth_estimate_; + } + QuicBandwidth latest_pacing_rate() const { return latest_pacing_rate_; } + QuicTime::Delta latest_rtt() const { return latest_rtt_; } + + private: + QuartcSession* session_ = nullptr; + int writable_count_ = 0; + int handshake_count_ = 0; + QuicErrorCode error_ = QUIC_NO_ERROR; + ConnectionCloseSource close_source_; + QuicBandwidth latest_bandwidth_estimate_ = QuicBandwidth::Zero(); + QuicBandwidth latest_pacing_rate_ = QuicBandwidth::Zero(); + QuicTime::Delta latest_rtt_ = QuicTime::Delta::Zero(); +}; + +class FakeSendDelegate : public QuartcSendChannel::Delegate { + public: + void OnMessageSent(int64_t datagram_id) override { + datagrams_sent_.push_back(datagram_id); + } + + void OnMessageAcked(int64_t datagram_id, + QuicTime receive_timestamp) override { + datagrams_acked_.push_back({datagram_id, receive_timestamp}); + } + + void OnMessageLost(int64_t datagram_id) override { + datagrams_lost_.push_back(datagram_id); + } + + const std::vector<int64_t>& datagrams_sent() const { return datagrams_sent_; } + const std::vector<std::pair<int64_t, QuicTime>>& datagrams_acked() const { + return datagrams_acked_; + } + const std::vector<int64_t>& datagrams_lost() const { return datagrams_lost_; } + + private: + std::vector<int64_t> datagrams_sent_; + std::vector<std::pair<int64_t, QuicTime>> datagrams_acked_; + std::vector<int64_t> datagrams_lost_; +}; + +class FakeReceiveDelegate : public QuartcReceiveChannel, + public QuartcStream::Delegate { + public: + const std::vector<std::pair<uint64_t, std::string>> messages_received() + const { + return messages_received_; + } + + void OnIncomingStream(uint64_t channel_id, QuartcStream* stream) override { + stream->SetDelegate(this); + stream_to_channel_id_[stream] = channel_id; + } + + void OnMessageReceived(uint64_t channel_id, + QuicStringPiece message) override { + messages_received_.emplace_back(channel_id, message); + } + + // Stream delegate overrides. + size_t OnReceived(QuartcStream* stream, + iovec* iov, + size_t iov_length, + bool fin) override { + if (!fin) { + return 0; + } + + size_t bytes = 0; + std::string message; + for (size_t i = 0; i < iov_length; ++i) { + message += + std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len); + bytes += iov[i].iov_len; + } + QUIC_LOG(INFO) << "Received " << bytes << " byte message on channel " + << stream_to_channel_id_[stream]; + messages_received_.emplace_back(stream_to_channel_id_[stream], message); + return bytes; + } + + void OnClose(QuartcStream* stream) override { + stream_to_channel_id_.erase(stream); + } + + void OnBufferChanged(QuartcStream* /*stream*/) override {} + + private: + std::vector<std::pair<uint64_t, std::string>> messages_received_; + QuicUnorderedMap<QuartcStream*, uint64_t> stream_to_channel_id_; +}; + +class QuartcMultiplexerTest : public QuicTest { + public: + QuartcMultiplexerTest() + : simulator_(), + client_transport_(&simulator_, + "client_transport", + "server_transport", + 10 * kDefaultMaxPacketSize), + server_transport_(&simulator_, + "server_transport", + "client_transport", + 10 * kDefaultMaxPacketSize), + client_filter_(&simulator_, "client_filter", &client_transport_), + client_server_link_(&client_filter_, + &server_transport_, + QuicBandwidth::FromKBitsPerSecond(10 * 1000), + kPropagationDelay), + client_multiplexer_(simulator_.GetStreamSendBufferAllocator(), + &client_session_delegate_, + &client_default_receiver_), + server_multiplexer_(simulator_.GetStreamSendBufferAllocator(), + &server_session_delegate_, + &server_default_receiver_), + client_endpoint_(QuicMakeUnique<QuartcClientEndpoint>( + simulator_.GetAlarmFactory(), + simulator_.GetClock(), + simulator_.GetRandomGenerator(), + &client_multiplexer_, + quic::QuartcSessionConfig(), + /*serialized_server_config=*/"")), + server_endpoint_(QuicMakeUnique<QuartcServerEndpoint>( + simulator_.GetAlarmFactory(), + simulator_.GetClock(), + simulator_.GetRandomGenerator(), + &server_multiplexer_, + quic::QuartcSessionConfig())) { + // TODO(b/134175506): Remove when IETF QUIC supports receive timestamps. + SetQuicReloadableFlag(quic_enable_version_99, false); + } + + void Connect() { + client_endpoint_->Connect(&client_transport_); + server_endpoint_->Connect(&server_transport_); + ASSERT_TRUE(simulator_.RunUntil([this]() { + return client_session_delegate_.writable_count() > 0 && + server_session_delegate_.writable_count() > 0; + })); + } + + void Disconnect() { + client_session_delegate_.session()->CloseConnection("test"); + server_session_delegate_.session()->CloseConnection("test"); + } + + protected: + QuartcMultiplexer* client_multiplexer() { return &client_multiplexer_; } + + QuartcMultiplexer* server_multiplexer() { return &server_multiplexer_; } + + simulator::Simulator simulator_; + + simulator::SimulatedQuartcPacketTransport client_transport_; + simulator::SimulatedQuartcPacketTransport server_transport_; + simulator::CountingPacketFilter client_filter_; + simulator::SymmetricLink client_server_link_; + + FakeSessionEventDelegate client_session_delegate_; + FakeSessionEventDelegate server_session_delegate_; + + FakeReceiveDelegate client_default_receiver_; + FakeReceiveDelegate server_default_receiver_; + + QuartcMultiplexer client_multiplexer_; + QuartcMultiplexer server_multiplexer_; + + std::unique_ptr<QuartcClientEndpoint> client_endpoint_; + std::unique_ptr<QuartcServerEndpoint> server_endpoint_; +}; + +TEST_F(QuartcMultiplexerTest, MultiplexMessages) { + Connect(); + + FakeSendDelegate send_delegate_1; + QuartcSendChannel* send_channel_1 = + client_multiplexer()->CreateSendChannel(1, &send_delegate_1); + FakeSendDelegate send_delegate_2; + QuartcSendChannel* send_channel_2 = + client_multiplexer()->CreateSendChannel(2, &send_delegate_2); + + FakeReceiveDelegate receive_delegate_1; + server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1); + + int num_messages = 10; + std::vector<std::pair<uint64_t, std::string>> messages_1; + messages_1.reserve(num_messages); + std::vector<std::pair<uint64_t, std::string>> messages_2; + messages_2.reserve(num_messages); + std::vector<int64_t> messages_sent_1; + std::vector<int64_t> messages_sent_2; + std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_1; + std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_2; + for (int i = 0; i < num_messages; ++i) { + messages_1.emplace_back(1, QuicStrCat("message for 1: ", i)); + test::QuicTestMemSliceVector slice_1( + {std::make_pair(const_cast<char*>(messages_1.back().second.data()), + messages_1.back().second.size())}); + send_channel_1->SendOrQueueMessage(slice_1.span(), i); + messages_sent_1.push_back(i); + ack_matchers_1.push_back(Pair(i, Gt(QuicTime::Zero()))); + + messages_2.emplace_back(2, QuicStrCat("message for 2: ", i)); + test::QuicTestMemSliceVector slice_2( + {std::make_pair(const_cast<char*>(messages_2.back().second.data()), + messages_2.back().second.size())}); + // Use i + 5 as the datagram id for channel 2, so that some of the ids + // overlap and some are disjoint. + send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5); + messages_sent_2.push_back(i + 5); + ack_matchers_2.push_back(Pair(i + 5, Gt(QuicTime::Zero()))); + } + + EXPECT_TRUE(simulator_.RunUntil([&send_delegate_1, &send_delegate_2]() { + return send_delegate_1.datagrams_acked().size() == 10 && + send_delegate_2.datagrams_acked().size() == 10; + })); + + EXPECT_EQ(send_delegate_1.datagrams_sent(), messages_sent_1); + EXPECT_EQ(send_delegate_2.datagrams_sent(), messages_sent_2); + + EXPECT_EQ(receive_delegate_1.messages_received(), messages_1); + EXPECT_EQ(server_default_receiver_.messages_received(), messages_2); + + EXPECT_THAT(send_delegate_1.datagrams_acked(), + ElementsAreArray(ack_matchers_1)); + EXPECT_THAT(send_delegate_2.datagrams_acked(), + ElementsAreArray(ack_matchers_2)); +} + +TEST_F(QuartcMultiplexerTest, MultiplexStreams) { + FakeSendDelegate send_delegate_1; + QuartcSendChannel* send_channel_1 = + client_multiplexer()->CreateSendChannel(1, &send_delegate_1); + FakeSendDelegate send_delegate_2; + QuartcSendChannel* send_channel_2 = + client_multiplexer()->CreateSendChannel(2, &send_delegate_2); + + FakeQuartcStreamDelegate fake_send_stream_delegate; + + FakeReceiveDelegate receive_delegate_1; + server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1); + + Connect(); + + int num_messages = 10; + std::vector<std::pair<uint64_t, std::string>> messages_1; + messages_1.reserve(num_messages); + std::vector<std::pair<uint64_t, std::string>> messages_2; + messages_2.reserve(num_messages); + for (int i = 0; i < num_messages; ++i) { + messages_1.emplace_back(1, QuicStrCat("message for 1: ", i)); + test::QuicTestMemSliceVector slice_1( + {std::make_pair(const_cast<char*>(messages_1.back().second.data()), + messages_1.back().second.size())}); + QuartcStream* stream_1 = + send_channel_1->CreateOutgoingBidirectionalStream(); + stream_1->SetDelegate(&fake_send_stream_delegate); + stream_1->WriteMemSlices(slice_1.span(), /*fin=*/true); + + messages_2.emplace_back(2, QuicStrCat("message for 2: ", i)); + test::QuicTestMemSliceVector slice_2( + {std::make_pair(const_cast<char*>(messages_2.back().second.data()), + messages_2.back().second.size())}); + QuartcStream* stream_2 = + send_channel_2->CreateOutgoingBidirectionalStream(); + stream_2->SetDelegate(&fake_send_stream_delegate); + stream_2->WriteMemSlices(slice_2.span(), /*fin=*/true); + } + + EXPECT_TRUE(simulator_.RunUntilOrTimeout( + [this, &receive_delegate_1]() { + return receive_delegate_1.messages_received().size() == 10 && + server_default_receiver_.messages_received().size() == 10; + }, + QuicTime::Delta::FromSeconds(5))); + + EXPECT_EQ(receive_delegate_1.messages_received(), messages_1); + EXPECT_EQ(server_default_receiver_.messages_received(), messages_2); +} + +// Tests that datagram-lost callbacks are invoked on the right send channel +// delegate, and that they work with overlapping datagram ids. +TEST_F(QuartcMultiplexerTest, MultiplexLostDatagrams) { + Connect(); + ASSERT_TRUE(simulator_.RunUntil([this]() { + return client_session_delegate_.handshake_count() > 0 && + server_session_delegate_.handshake_count() > 0; + })); + + // Just drop everything we try to send. + client_filter_.set_packets_to_drop(30); + + FakeSendDelegate send_delegate_1; + QuartcSendChannel* send_channel_1 = + client_multiplexer()->CreateSendChannel(1, &send_delegate_1); + FakeSendDelegate send_delegate_2; + QuartcSendChannel* send_channel_2 = + client_multiplexer()->CreateSendChannel(2, &send_delegate_2); + + FakeQuartcStreamDelegate fake_send_stream_delegate; + + FakeReceiveDelegate receive_delegate_1; + server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1); + + int num_messages = 10; + std::vector<std::pair<uint64_t, std::string>> messages_1; + messages_1.reserve(num_messages); + std::vector<std::pair<uint64_t, std::string>> messages_2; + messages_2.reserve(num_messages); + std::vector<int64_t> messages_sent_1; + std::vector<int64_t> messages_sent_2; + for (int i = 0; i < num_messages; ++i) { + messages_1.emplace_back(1, QuicStrCat("message for 1: ", i)); + test::QuicTestMemSliceVector slice_1( + {std::make_pair(const_cast<char*>(messages_1.back().second.data()), + messages_1.back().second.size())}); + send_channel_1->SendOrQueueMessage(slice_1.span(), i); + messages_sent_1.push_back(i); + + messages_2.emplace_back(2, QuicStrCat("message for 2: ", i)); + test::QuicTestMemSliceVector slice_2( + {std::make_pair(const_cast<char*>(messages_2.back().second.data()), + messages_2.back().second.size())}); + // Use i + 5 as the datagram id for channel 2, so that some of the ids + // overlap and some are disjoint. + send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5); + messages_sent_2.push_back(i + 5); + } + + // Now send something retransmittable to prompt loss detection. + // If we never send anything retransmittable, we will never get acks, and + // never detect losses. + messages_1.emplace_back(1, QuicStrCat("message for 1: ", num_messages)); + test::QuicTestMemSliceVector slice( + {std::make_pair(const_cast<char*>(messages_1.back().second.data()), + messages_1.back().second.size())}); + QuartcStream* stream_1 = send_channel_1->CreateOutgoingBidirectionalStream(); + stream_1->SetDelegate(&fake_send_stream_delegate); + stream_1->WriteMemSlices(slice.span(), /*fin=*/true); + + EXPECT_TRUE(simulator_.RunUntilOrTimeout( + [&send_delegate_1, &send_delegate_2]() { + return send_delegate_1.datagrams_lost().size() == 10 && + send_delegate_2.datagrams_lost().size() == 10; + }, + QuicTime::Delta::FromSeconds(60))); + + EXPECT_EQ(send_delegate_1.datagrams_lost(), messages_sent_1); + EXPECT_EQ(send_delegate_2.datagrams_lost(), messages_sent_2); + + EXPECT_THAT(send_delegate_1.datagrams_acked(), IsEmpty()); + EXPECT_THAT(send_delegate_2.datagrams_acked(), IsEmpty()); + + EXPECT_THAT(receive_delegate_1.messages_received(), IsEmpty()); + EXPECT_THAT(server_default_receiver_.messages_received(), IsEmpty()); +} + +TEST_F(QuartcMultiplexerTest, UnregisterReceiveChannel) { + Connect(); + + FakeSendDelegate send_delegate; + QuartcSendChannel* send_channel = + client_multiplexer()->CreateSendChannel(1, &send_delegate); + FakeQuartcStreamDelegate fake_send_stream_delegate; + + FakeReceiveDelegate receive_delegate; + server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate); + server_multiplexer()->RegisterReceiveChannel(1, nullptr); + + int num_messages = 10; + std::vector<std::pair<uint64_t, std::string>> messages; + messages.reserve(num_messages); + std::vector<int64_t> messages_sent; + std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers; + for (int i = 0; i < num_messages; ++i) { + messages.emplace_back(1, QuicStrCat("message for 1: ", i)); + test::QuicTestMemSliceVector slice( + {std::make_pair(const_cast<char*>(messages.back().second.data()), + messages.back().second.size())}); + send_channel->SendOrQueueMessage(slice.span(), i); + messages_sent.push_back(i); + ack_matchers.push_back(Pair(i, Gt(QuicTime::Zero()))); + } + + EXPECT_TRUE(simulator_.RunUntil([&send_delegate]() { + return send_delegate.datagrams_acked().size() == 10; + })); + + EXPECT_EQ(send_delegate.datagrams_sent(), messages_sent); + EXPECT_EQ(server_default_receiver_.messages_received(), messages); + EXPECT_THAT(send_delegate.datagrams_acked(), ElementsAreArray(ack_matchers)); +} + +TEST_F(QuartcMultiplexerTest, CloseEvent) { + Connect(); + Disconnect(); + + EXPECT_EQ(client_session_delegate_.error(), QUIC_CONNECTION_CANCELLED); + EXPECT_EQ(server_session_delegate_.error(), QUIC_CONNECTION_CANCELLED); +} + +TEST_F(QuartcMultiplexerTest, CongestionEvent) { + Connect(); + ASSERT_TRUE(simulator_.RunUntil([this]() { + return client_session_delegate_.handshake_count() > 0 && + server_session_delegate_.handshake_count() > 0; + })); + + EXPECT_GT(client_session_delegate_.latest_bandwidth_estimate(), + QuicBandwidth::Zero()); + EXPECT_GT(client_session_delegate_.latest_pacing_rate(), + QuicBandwidth::Zero()); + EXPECT_GT(client_session_delegate_.latest_rtt(), QuicTime::Delta::Zero()); +} + +} // namespace +} // namespace quic
diff --git a/quic/quartc/quartc_stream.cc b/quic/quartc/quartc_stream.cc index 607fa43..f18d42d 100644 --- a/quic/quartc/quartc_stream.cc +++ b/quic/quartc/quartc_stream.cc
@@ -27,24 +27,28 @@ QuartcStream::~QuartcStream() {} void QuartcStream::OnDataAvailable() { - bool fin = sequencer()->ReadableBytes() + sequencer()->NumBytesConsumed() == - sequencer()->close_offset(); + size_t bytes_consumed = 0; + do { + bool fin = sequencer()->ReadableBytes() + sequencer()->NumBytesConsumed() == + sequencer()->close_offset(); - // Upper bound on number of readable regions. Each complete block's worth of - // data crosses at most one region boundary. The remainder may cross one more - // boundary. Number of regions is one more than the number of region - // boundaries crossed. - size_t iov_length = sequencer()->ReadableBytes() / - QuicStreamSequencerBuffer::kBlockSizeBytes + - 2; - std::unique_ptr<iovec[]> iovecs = QuicMakeUnique<iovec[]>(iov_length); - iov_length = sequencer()->GetReadableRegions(iovecs.get(), iov_length); + // Upper bound on number of readable regions. Each complete block's worth + // of data crosses at most one region boundary. The remainder may cross one + // more boundary. Number of regions is one more than the number of region + // boundaries crossed. + size_t iov_length = sequencer()->ReadableBytes() / + QuicStreamSequencerBuffer::kBlockSizeBytes + + 2; + std::unique_ptr<iovec[]> iovecs = QuicMakeUnique<iovec[]>(iov_length); + iov_length = sequencer()->GetReadableRegions(iovecs.get(), iov_length); - sequencer()->MarkConsumed( - delegate_->OnReceived(this, iovecs.get(), iov_length, fin)); - if (sequencer()->IsClosed()) { - OnFinRead(); - } + bytes_consumed = delegate_->OnReceived(this, iovecs.get(), iov_length, fin); + sequencer()->MarkConsumed(bytes_consumed); + if (sequencer()->IsClosed()) { + OnFinRead(); + break; + } + } while (bytes_consumed > 0); } void QuartcStream::OnClose() { @@ -56,8 +60,9 @@ void QuartcStream::OnStreamDataConsumed(size_t bytes_consumed) { QuicStream::OnStreamDataConsumed(bytes_consumed); - DCHECK(delegate_); - delegate_->OnBufferChanged(this); + if (delegate_) { + delegate_->OnBufferChanged(this); + } } void QuartcStream::OnDataBuffered( @@ -65,8 +70,9 @@ QuicByteCount /*data_length*/, const QuicReferenceCountedPointer< QuicAckListenerInterface>& /*ack_listener*/) { - DCHECK(delegate_); - delegate_->OnBufferChanged(this); + if (delegate_) { + delegate_->OnBufferChanged(this); + } } bool QuartcStream::OnStreamFrameAcked(QuicStreamOffset offset, @@ -156,10 +162,6 @@ } void QuartcStream::SetDelegate(Delegate* delegate) { - if (delegate_) { - QUIC_LOG(WARNING) << "The delegate for Stream " << id() - << " has already been set."; - } delegate_ = delegate; DCHECK(delegate_); }