Create a QuartcMultiplexer which separates streams and datagrams into channels.

QuartcMultiplexer effectively reorganizes the calls and callbacks for
QuartcSession into three categories:
 - Per-channel send events
 - Per-channel receive events
 - Session-wide events that are not multiplexed

QuartcMultiplexer works at a low level to hide streams and messages from other
senders and receivers.  It consists of a core multiplexer object which interacts
with the QuartcEndpoint/Session, a channel sender which handles outgoing data,
and a channel receiver which handles incoming data.

The sender has a specific channel id, specified on creation.  A channel writes
this channel id in a varint form at the start of each new stream or datagram it
sends.

The multiplexer intercepts all the callbacks for incoming streams and datagrams.
It reads a varint from the start of each stream or datagram to identify the
channel id.  It then looks up a receiver for that channel and delegates the
stream or datagram to that receiver.

A default receiver may be registered to handle all streams or datagrams not
assigned to a specific receiver.  This allows endpoints to dispatch unhandled
data to a catch-all, or to await incoming data before registering a specific
receiver.  The latter may be useful to in conjunction with some forms of
negotiation; eg. when certain ranges of channel ids are allocated to different
protocols, but it is unknown which protocol will be used at startup.

gfe-relnote: n/a (quartc only)
PiperOrigin-RevId: 260588723
Change-Id: I4a3b815b48c4f825c47bc60b0b3fd76d4e3614a5
diff --git a/quic/quartc/quartc_multiplexer.cc b/quic/quartc/quartc_multiplexer.cc
new file mode 100644
index 0000000..001e219
--- /dev/null
+++ b/quic/quartc/quartc_multiplexer.cc
@@ -0,0 +1,269 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h"
+
+#include <cstdint>
+
+#include "net/third_party/quiche/src/quic/core/quic_data_writer.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
+
+namespace quic {
+
+QuartcSendChannel::QuartcSendChannel(QuartcMultiplexer* multiplexer,
+                                     uint64_t id,
+                                     QuicBufferAllocator* allocator,
+                                     Delegate* delegate)
+    : multiplexer_(multiplexer),
+      id_(id),
+      encoded_length_(QuicDataWriter::GetVarInt62Len(id_)),
+      allocator_(allocator),
+      delegate_(delegate) {}
+
+QuartcStream* QuartcSendChannel::CreateOutgoingBidirectionalStream() {
+  if (!session_) {
+    QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_;
+    return nullptr;
+  }
+  QuicMemSlice id_slice = EncodeChannelId();
+
+  QuartcStream* stream = session_->CreateOutgoingBidirectionalStream();
+  QuicConsumedData consumed =
+      stream->WriteMemSlices(QuicMemSliceSpan(&id_slice), /*fin=*/false);
+  DCHECK_EQ(consumed.bytes_consumed, encoded_length_);
+  return stream;
+}
+
+bool QuartcSendChannel::SendOrQueueMessage(QuicMemSliceSpan message,
+                                           int64_t datagram_id) {
+  if (!session_) {
+    QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_
+                     << "datagram size=" << message.total_length();
+    return false;
+  }
+  QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);  // Empty storage.
+  storage.Append(EncodeChannelId());
+
+  message.ConsumeAll(
+      [&storage](QuicMemSlice slice) { storage.Append(std::move(slice)); });
+
+  // Allocate a unique datagram id so that notifications can be routed back to
+  // the right send channel.
+  int64_t unique_datagram_id = multiplexer_->AllocateDatagramId(this);
+  multiplexer_to_user_datagram_ids_[unique_datagram_id] = datagram_id;
+
+  return session_->SendOrQueueMessage(storage.ToSpan(), unique_datagram_id);
+}
+
+void QuartcSendChannel::OnMessageSent(int64_t datagram_id) {
+  // Map back to the caller-chosen |datagram_id|.
+  datagram_id = multiplexer_to_user_datagram_ids_[datagram_id];
+  delegate_->OnMessageSent(datagram_id);
+}
+
+void QuartcSendChannel::OnMessageAcked(int64_t datagram_id,
+                                       QuicTime receive_timestamp) {
+  // Map back to the caller-chosen |datagram_id|.
+  auto it = multiplexer_to_user_datagram_ids_.find(datagram_id);
+  if (it == multiplexer_to_user_datagram_ids_.end()) {
+    QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id="
+                     << datagram_id;
+    return;
+  }
+  delegate_->OnMessageAcked(it->second, receive_timestamp);
+  multiplexer_to_user_datagram_ids_.erase(it);
+}
+
+void QuartcSendChannel::OnMessageLost(int64_t datagram_id) {
+  // Map back to the caller-chosen |datagram_id|.
+  auto it = multiplexer_to_user_datagram_ids_.find(datagram_id);
+  if (it == multiplexer_to_user_datagram_ids_.end()) {
+    QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id="
+                     << datagram_id;
+    return;
+  }
+  delegate_->OnMessageLost(it->second);
+  multiplexer_to_user_datagram_ids_.erase(it);
+}
+
+void QuartcSendChannel::OnSessionCreated(QuartcSession* session) {
+  session_ = session;
+}
+
+QuicMemSlice QuartcSendChannel::EncodeChannelId() {
+  QuicMemSlice id_slice(allocator_, encoded_length_);
+  QuicDataWriter writer(encoded_length_, const_cast<char*>(id_slice.data()));
+  writer.WriteVarInt62(id_);
+  return id_slice;
+}
+
+QuartcMultiplexer::QuartcMultiplexer(
+    QuicBufferAllocator* allocator,
+    QuartcSessionEventDelegate* session_delegate,
+    QuartcReceiveChannel* default_receive_channel)
+    : allocator_(allocator),
+      session_delegate_(session_delegate),
+      default_receive_channel_(default_receive_channel) {
+  CHECK_NE(session_delegate_, nullptr);
+  CHECK_NE(default_receive_channel_, nullptr);
+}
+
+QuartcSendChannel* QuartcMultiplexer::CreateSendChannel(
+    uint64_t channel_id,
+    QuartcSendChannel::Delegate* delegate) {
+  send_channels_.push_back(QuicMakeUnique<QuartcSendChannel>(
+      this, channel_id, allocator_, delegate));
+  if (session_) {
+    send_channels_.back()->OnSessionCreated(session_);
+  }
+  return send_channels_.back().get();
+}
+
+void QuartcMultiplexer::RegisterReceiveChannel(uint64_t channel_id,
+                                               QuartcReceiveChannel* channel) {
+  if (channel == nullptr) {
+    receive_channels_.erase(channel_id);
+    return;
+  }
+  auto& registered_channel = receive_channels_[channel_id];
+  if (registered_channel) {
+    QUIC_LOG(DFATAL) << "Attempted to overwrite existing channel_id="
+                     << channel_id;
+    return;
+  }
+  registered_channel = channel;
+}
+
+int64_t QuartcMultiplexer::AllocateDatagramId(QuartcSendChannel* channel) {
+  send_channels_by_datagram_id_[next_datagram_id_] = channel;
+  return next_datagram_id_++;
+}
+
+void QuartcMultiplexer::OnSessionCreated(QuartcSession* session) {
+  for (auto& channel : send_channels_) {
+    channel->OnSessionCreated(session);
+  }
+  session_ = session;
+  session_delegate_->OnSessionCreated(session);
+}
+
+void QuartcMultiplexer::OnCryptoHandshakeComplete() {
+  session_delegate_->OnCryptoHandshakeComplete();
+}
+
+void QuartcMultiplexer::OnConnectionWritable() {
+  session_delegate_->OnConnectionWritable();
+}
+
+void QuartcMultiplexer::OnIncomingStream(QuartcStream* stream) {
+  stream->SetDelegate(this);
+}
+
+void QuartcMultiplexer::OnCongestionControlChange(
+    QuicBandwidth bandwidth_estimate,
+    QuicBandwidth pacing_rate,
+    QuicTime::Delta latest_rtt) {
+  session_delegate_->OnCongestionControlChange(bandwidth_estimate, pacing_rate,
+                                               latest_rtt);
+}
+
+void QuartcMultiplexer::OnConnectionClosed(
+    const QuicConnectionCloseFrame& frame,
+    ConnectionCloseSource source) {
+  session_delegate_->OnConnectionClosed(frame, source);
+}
+
+void QuartcMultiplexer::OnMessageReceived(QuicStringPiece message) {
+  QuicDataReader reader(message);
+  QuicVariableLengthIntegerLength channel_id_length =
+      reader.PeekVarInt62Length();
+
+  uint64_t channel_id;
+  if (!reader.ReadVarInt62(&channel_id)) {
+    QUIC_LOG(DFATAL) << "Received message without properly encoded channel id";
+    return;
+  }
+
+  QuartcReceiveChannel* channel = default_receive_channel_;
+  auto it = receive_channels_.find(channel_id);
+  if (it != receive_channels_.end()) {
+    channel = it->second;
+  }
+
+  channel->OnMessageReceived(channel_id, message.substr(channel_id_length));
+}
+
+void QuartcMultiplexer::OnMessageSent(int64_t datagram_id) {
+  auto it = send_channels_by_datagram_id_.find(datagram_id);
+  if (it == send_channels_by_datagram_id_.end()) {
+    return;
+  }
+  it->second->OnMessageSent(datagram_id);
+}
+
+void QuartcMultiplexer::OnMessageAcked(int64_t datagram_id,
+                                       QuicTime receive_timestamp) {
+  auto it = send_channels_by_datagram_id_.find(datagram_id);
+  if (it == send_channels_by_datagram_id_.end()) {
+    return;
+  }
+  it->second->OnMessageAcked(datagram_id, receive_timestamp);
+  send_channels_by_datagram_id_.erase(it);
+}
+
+void QuartcMultiplexer::OnMessageLost(int64_t datagram_id) {
+  auto it = send_channels_by_datagram_id_.find(datagram_id);
+  if (it == send_channels_by_datagram_id_.end()) {
+    return;
+  }
+  it->second->OnMessageLost(datagram_id);
+  send_channels_by_datagram_id_.erase(it);
+}
+
+size_t QuartcMultiplexer::OnReceived(QuartcStream* stream,
+                                     iovec* iov,
+                                     size_t iov_length,
+                                     bool /*fin*/) {
+  if (iov == nullptr || iov_length <= 0) {
+    return 0;
+  }
+
+  QuicDataReader reader(static_cast<char*>(iov[0].iov_base), iov[0].iov_len);
+  QuicVariableLengthIntegerLength channel_id_length =
+      reader.PeekVarInt62Length();
+
+  uint64_t channel_id;
+  if (reader.BytesRemaining() >= channel_id_length) {
+    // Fast path, have enough data to read immediately.
+    if (!reader.ReadVarInt62(&channel_id)) {
+      return 0;
+    }
+  } else {
+    // Slow path, need to coalesce multiple iovecs.
+    std::string data;
+    for (size_t i = 0; i < iov_length; ++i) {
+      data += std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len);
+    }
+    QuicDataReader combined_reader(data);
+    if (!combined_reader.ReadVarInt62(&channel_id)) {
+      return 0;
+    }
+  }
+
+  QuartcReceiveChannel* channel = default_receive_channel_;
+  auto it = receive_channels_.find(channel_id);
+  if (it != receive_channels_.end()) {
+    channel = it->second;
+  }
+  channel->OnIncomingStream(channel_id, stream);
+  return channel_id_length;
+}
+
+void QuartcMultiplexer::OnClose(QuartcStream* /*stream*/) {}
+
+void QuartcMultiplexer::OnBufferChanged(QuartcStream* /*stream*/) {}
+
+}  // namespace quic
diff --git a/quic/quartc/quartc_multiplexer.h b/quic/quartc/quartc_multiplexer.h
new file mode 100644
index 0000000..aa19e70
--- /dev/null
+++ b/quic/quartc/quartc_multiplexer.h
@@ -0,0 +1,190 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_QUARTC_QUARTC_MULTIPLEXER_H_
+#define QUICHE_QUIC_QUARTC_QUARTC_MULTIPLEXER_H_
+
+#include <cstdint>
+
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_endpoint.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_session.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_stream.h"
+
+namespace quic {
+
+class QuartcMultiplexer;
+
+// A single, multiplexed send channel within a Quartc session.  A send channel
+// wraps send-side operations with an outgoing multiplex id.
+class QuartcSendChannel {
+ public:
+  class Delegate {
+   public:
+    virtual ~Delegate() = default;
+
+    // Called when a message with |datagram_id| is sent by this channel.
+    virtual void OnMessageSent(int64_t datagram_id) = 0;
+
+    // Called when a message sent on this channel with |datagram_id| is acked.
+    // |receive_timestamp| indicates when the peer received this message,
+    // according to the peer's clock.
+    virtual void OnMessageAcked(int64_t datagram_id,
+                                QuicTime receive_timestamp) = 0;
+
+    // Called when a message sent on this channel with |datagram_id| is lost.
+    virtual void OnMessageLost(int64_t datagram_id) = 0;
+  };
+
+  QuartcSendChannel(QuartcMultiplexer* multiplexer,
+                    uint64_t id,
+                    QuicBufferAllocator* allocator,
+                    Delegate* delegate);
+  virtual ~QuartcSendChannel() = default;
+
+  // Creates a new, outgoing stream on this channel.
+  //
+  // Automatically writes the channel id to the start of the stream.  The caller
+  // SHOULD create a |ScopedPacketFlusher| before calling this function to
+  // prevent the channel id from being sent by itself.
+  QuartcStream* CreateOutgoingBidirectionalStream();
+
+  // Writes |message| to the session.  Prepends the channel's send id before any
+  // following message data.
+  bool SendOrQueueMessage(QuicMemSliceSpan message, int64_t datagram_id);
+
+  // Gets the current largest message payload for this channel.  Returns the
+  // largest payload size supported by the session minus overhead required to
+  // encode this channel's send id.
+  QuicPacketLength GetCurrentLargestMessagePayload() const;
+
+  // The following are called by the multiplexer to deliver message
+  // notifications.  The |datagram_id| passed to these is unique per-message,
+  // and must be translated back to the sender's chosen datagram_id.
+  void OnMessageSent(int64_t datagram_id);
+  void OnMessageAcked(int64_t datagram_id, QuicTime receive_timestamp);
+  void OnMessageLost(int64_t datagram_id);
+  void OnSessionCreated(QuartcSession* session);
+
+ private:
+  // Creates a mem slice containing a varint-62 encoded channel id.
+  QuicMemSlice EncodeChannelId();
+
+  QuartcMultiplexer* const multiplexer_;
+  const uint64_t id_;
+  const QuicVariableLengthIntegerLength encoded_length_;
+  QuicBufferAllocator* const allocator_;
+  Delegate* const delegate_;
+
+  QuartcSession* session_;
+
+  // Map of multiplexer-chosen to user/caller-specified datagram ids.  The user
+  // may specify any number as a datagram's id.  This number does not have to be
+  // unique across channels (nor even within a single channel).  In order
+  // to demux sent, acked, and lost messages, the multiplexer assigns a globally
+  // unique id to each message.  This map is used to restore the original caller
+  // datagram id before issuing callbacks.
+  QuicUnorderedMap<int64_t, int64_t> multiplexer_to_user_datagram_ids_;
+};
+
+// A single, multiplexed receive channel within a Quartc session.  A receive
+// channel is a delegate which accepts incoming streams and datagrams on one (or
+// more) channel ids.
+class QuartcReceiveChannel {
+ public:
+  virtual ~QuartcReceiveChannel() = default;
+
+  // Called when a new incoming stream arrives on this channel.
+  virtual void OnIncomingStream(uint64_t channel_id, QuartcStream* stream) = 0;
+
+  // Called when a message is recieved by this channel.
+  virtual void OnMessageReceived(uint64_t channel_id,
+                                 QuicStringPiece message) = 0;
+};
+
+// Delegate for session-wide events.
+class QuartcSessionEventDelegate {
+ public:
+  virtual ~QuartcSessionEventDelegate() = default;
+
+  virtual void OnSessionCreated(QuartcSession* session) = 0;
+  virtual void OnCryptoHandshakeComplete() = 0;
+  virtual void OnConnectionWritable() = 0;
+  virtual void OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
+                                         QuicBandwidth pacing_rate,
+                                         QuicTime::Delta latest_rtt) = 0;
+  virtual void OnConnectionClosed(const QuicConnectionCloseFrame& frame,
+                                  ConnectionCloseSource source) = 0;
+};
+
+// A multiplexer capable of sending and receiving data on multiple channels.
+class QuartcMultiplexer : public QuartcEndpoint::Delegate,
+                          public QuartcStream::Delegate {
+ public:
+  // Creates a new multiplexer.  |session_delegate| handles all session-wide
+  // events, while |default_receive_channel| handles incoming data on unknown
+  // or unregistered channel ids.  Neither |session_delegate| nor
+  // |default_receive_channel| may be nullptr, and both must outlive the
+  // multiplexer.
+  QuartcMultiplexer(QuicBufferAllocator* allocator,
+                    QuartcSessionEventDelegate* session_delegate,
+                    QuartcReceiveChannel* default_receive_channel);
+
+  // Creates a new send channel.  The channel is owned by the multiplexer, and
+  // references to it must not outlive the multiplexer.
+  QuartcSendChannel* CreateSendChannel(uint64_t channel_id,
+                                       QuartcSendChannel::Delegate* delegate);
+
+  // Registers a receiver for incoming data on |channel_id|.
+  void RegisterReceiveChannel(uint64_t channel_id,
+                              QuartcReceiveChannel* channel);
+
+  // Allocates a datagram id to |channel|.
+  int64_t AllocateDatagramId(QuartcSendChannel* channel);
+
+  // QuartcEndpoint::Delegate overrides.
+  void OnSessionCreated(QuartcSession* session) override;
+
+  // QuartcSession::Delegate overrides.
+  void OnCryptoHandshakeComplete() override;
+  void OnConnectionWritable() override;
+  void OnIncomingStream(QuartcStream* stream) override;
+  void OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
+                                 QuicBandwidth pacing_rate,
+                                 QuicTime::Delta latest_rtt) override;
+  void OnConnectionClosed(const QuicConnectionCloseFrame& frame,
+                          ConnectionCloseSource source) override;
+  void OnMessageReceived(QuicStringPiece message) override;
+  void OnMessageSent(int64_t datagram_id) override;
+  void OnMessageAcked(int64_t datagram_id, QuicTime receive_timestamp) override;
+  void OnMessageLost(int64_t datagram_id) override;
+
+  // QuartcStream::Delegate overrides.
+  size_t OnReceived(QuartcStream* stream,
+                    iovec* iov,
+                    size_t iov_length,
+                    bool fin) override;
+  void OnClose(QuartcStream* stream) override;
+  void OnBufferChanged(QuartcStream* stream) override;
+
+ private:
+  QuicBufferAllocator* const allocator_;
+  QuartcSessionEventDelegate* const session_delegate_;
+
+  QuartcSession* session_;
+  std::vector<std::unique_ptr<QuartcSendChannel>> send_channels_;
+  QuicUnorderedMap<uint64_t, QuartcReceiveChannel*> receive_channels_;
+  QuartcReceiveChannel* default_receive_channel_ = nullptr;
+
+  int64_t next_datagram_id_ = 1;
+  QuicUnorderedMap<int64_t, QuartcSendChannel*> send_channels_by_datagram_id_;
+};
+
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_QUARTC_QUARTC_MULTIPLEXER_H_
diff --git a/quic/quartc/quartc_multiplexer_test.cc b/quic/quartc/quartc_multiplexer_test.cc
new file mode 100644
index 0000000..64609cb
--- /dev/null
+++ b/quic/quartc/quartc_multiplexer_test.cc
@@ -0,0 +1,490 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h"
+
+#include <memory>
+
+#include "net/third_party/quiche/src/quic/core/frames/quic_connection_close_frame.h"
+#include "net/third_party/quiche/src/quic/core/quic_bandwidth.h"
+#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test_mem_slice_vector.h"
+#include "net/third_party/quiche/src/quic/quartc/counting_packet_filter.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_endpoint.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_fakes.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_session.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_stream.h"
+#include "net/third_party/quiche/src/quic/quartc/simulated_packet_transport.h"
+#include "net/third_party/quiche/src/quic/test_tools/simulator/link.h"
+#include "net/third_party/quiche/src/quic/test_tools/simulator/simulator.h"
+
+namespace quic {
+namespace {
+
+using ::testing::ElementsAreArray;
+using ::testing::Gt;
+using ::testing::IsEmpty;
+using ::testing::Pair;
+
+constexpr QuicTime::Delta kPropagationDelay =
+    QuicTime::Delta::FromMilliseconds(10);
+
+class FakeSessionEventDelegate : public QuartcSessionEventDelegate {
+ public:
+  void OnSessionCreated(QuartcSession* session) override {
+    session->StartCryptoHandshake();
+    session_ = session;
+  }
+
+  void OnConnectionWritable() override { ++writable_count_; }
+
+  void OnCryptoHandshakeComplete() override { ++handshake_count_; }
+
+  void OnConnectionClosed(const QuicConnectionCloseFrame& frame,
+                          ConnectionCloseSource source) override {
+    error_ = frame.quic_error_code;
+    close_source_ = source;
+  }
+
+  void OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
+                                 QuicBandwidth pacing_rate,
+                                 QuicTime::Delta latest_rtt) override {
+    latest_bandwidth_estimate_ = bandwidth_estimate;
+    latest_pacing_rate_ = pacing_rate;
+    latest_rtt_ = latest_rtt;
+  }
+
+  QuartcSession* session() { return session_; }
+  int writable_count() const { return writable_count_; }
+  int handshake_count() const { return handshake_count_; }
+  QuicErrorCode error() const { return error_; }
+  ConnectionCloseSource close_source() const { return close_source_; }
+  QuicBandwidth latest_bandwidth_estimate() const {
+    return latest_bandwidth_estimate_;
+  }
+  QuicBandwidth latest_pacing_rate() const { return latest_pacing_rate_; }
+  QuicTime::Delta latest_rtt() const { return latest_rtt_; }
+
+ private:
+  QuartcSession* session_ = nullptr;
+  int writable_count_ = 0;
+  int handshake_count_ = 0;
+  QuicErrorCode error_ = QUIC_NO_ERROR;
+  ConnectionCloseSource close_source_;
+  QuicBandwidth latest_bandwidth_estimate_ = QuicBandwidth::Zero();
+  QuicBandwidth latest_pacing_rate_ = QuicBandwidth::Zero();
+  QuicTime::Delta latest_rtt_ = QuicTime::Delta::Zero();
+};
+
+class FakeSendDelegate : public QuartcSendChannel::Delegate {
+ public:
+  void OnMessageSent(int64_t datagram_id) override {
+    datagrams_sent_.push_back(datagram_id);
+  }
+
+  void OnMessageAcked(int64_t datagram_id,
+                      QuicTime receive_timestamp) override {
+    datagrams_acked_.push_back({datagram_id, receive_timestamp});
+  }
+
+  void OnMessageLost(int64_t datagram_id) override {
+    datagrams_lost_.push_back(datagram_id);
+  }
+
+  const std::vector<int64_t>& datagrams_sent() const { return datagrams_sent_; }
+  const std::vector<std::pair<int64_t, QuicTime>>& datagrams_acked() const {
+    return datagrams_acked_;
+  }
+  const std::vector<int64_t>& datagrams_lost() const { return datagrams_lost_; }
+
+ private:
+  std::vector<int64_t> datagrams_sent_;
+  std::vector<std::pair<int64_t, QuicTime>> datagrams_acked_;
+  std::vector<int64_t> datagrams_lost_;
+};
+
+class FakeReceiveDelegate : public QuartcReceiveChannel,
+                            public QuartcStream::Delegate {
+ public:
+  const std::vector<std::pair<uint64_t, std::string>> messages_received()
+      const {
+    return messages_received_;
+  }
+
+  void OnIncomingStream(uint64_t channel_id, QuartcStream* stream) override {
+    stream->SetDelegate(this);
+    stream_to_channel_id_[stream] = channel_id;
+  }
+
+  void OnMessageReceived(uint64_t channel_id,
+                         QuicStringPiece message) override {
+    messages_received_.emplace_back(channel_id, message);
+  }
+
+  // Stream delegate overrides.
+  size_t OnReceived(QuartcStream* stream,
+                    iovec* iov,
+                    size_t iov_length,
+                    bool fin) override {
+    if (!fin) {
+      return 0;
+    }
+
+    size_t bytes = 0;
+    std::string message;
+    for (size_t i = 0; i < iov_length; ++i) {
+      message +=
+          std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len);
+      bytes += iov[i].iov_len;
+    }
+    QUIC_LOG(INFO) << "Received " << bytes << " byte message on channel "
+                   << stream_to_channel_id_[stream];
+    messages_received_.emplace_back(stream_to_channel_id_[stream], message);
+    return bytes;
+  }
+
+  void OnClose(QuartcStream* stream) override {
+    stream_to_channel_id_.erase(stream);
+  }
+
+  void OnBufferChanged(QuartcStream* /*stream*/) override {}
+
+ private:
+  std::vector<std::pair<uint64_t, std::string>> messages_received_;
+  QuicUnorderedMap<QuartcStream*, uint64_t> stream_to_channel_id_;
+};
+
+class QuartcMultiplexerTest : public QuicTest {
+ public:
+  QuartcMultiplexerTest()
+      : simulator_(),
+        client_transport_(&simulator_,
+                          "client_transport",
+                          "server_transport",
+                          10 * kDefaultMaxPacketSize),
+        server_transport_(&simulator_,
+                          "server_transport",
+                          "client_transport",
+                          10 * kDefaultMaxPacketSize),
+        client_filter_(&simulator_, "client_filter", &client_transport_),
+        client_server_link_(&client_filter_,
+                            &server_transport_,
+                            QuicBandwidth::FromKBitsPerSecond(10 * 1000),
+                            kPropagationDelay),
+        client_multiplexer_(simulator_.GetStreamSendBufferAllocator(),
+                            &client_session_delegate_,
+                            &client_default_receiver_),
+        server_multiplexer_(simulator_.GetStreamSendBufferAllocator(),
+                            &server_session_delegate_,
+                            &server_default_receiver_),
+        client_endpoint_(QuicMakeUnique<QuartcClientEndpoint>(
+            simulator_.GetAlarmFactory(),
+            simulator_.GetClock(),
+            simulator_.GetRandomGenerator(),
+            &client_multiplexer_,
+            quic::QuartcSessionConfig(),
+            /*serialized_server_config=*/"")),
+        server_endpoint_(QuicMakeUnique<QuartcServerEndpoint>(
+            simulator_.GetAlarmFactory(),
+            simulator_.GetClock(),
+            simulator_.GetRandomGenerator(),
+            &server_multiplexer_,
+            quic::QuartcSessionConfig())) {
+    // TODO(b/134175506): Remove when IETF QUIC supports receive timestamps.
+    SetQuicReloadableFlag(quic_enable_version_99, false);
+  }
+
+  void Connect() {
+    client_endpoint_->Connect(&client_transport_);
+    server_endpoint_->Connect(&server_transport_);
+    ASSERT_TRUE(simulator_.RunUntil([this]() {
+      return client_session_delegate_.writable_count() > 0 &&
+             server_session_delegate_.writable_count() > 0;
+    }));
+  }
+
+  void Disconnect() {
+    client_session_delegate_.session()->CloseConnection("test");
+    server_session_delegate_.session()->CloseConnection("test");
+  }
+
+ protected:
+  QuartcMultiplexer* client_multiplexer() { return &client_multiplexer_; }
+
+  QuartcMultiplexer* server_multiplexer() { return &server_multiplexer_; }
+
+  simulator::Simulator simulator_;
+
+  simulator::SimulatedQuartcPacketTransport client_transport_;
+  simulator::SimulatedQuartcPacketTransport server_transport_;
+  simulator::CountingPacketFilter client_filter_;
+  simulator::SymmetricLink client_server_link_;
+
+  FakeSessionEventDelegate client_session_delegate_;
+  FakeSessionEventDelegate server_session_delegate_;
+
+  FakeReceiveDelegate client_default_receiver_;
+  FakeReceiveDelegate server_default_receiver_;
+
+  QuartcMultiplexer client_multiplexer_;
+  QuartcMultiplexer server_multiplexer_;
+
+  std::unique_ptr<QuartcClientEndpoint> client_endpoint_;
+  std::unique_ptr<QuartcServerEndpoint> server_endpoint_;
+};
+
+TEST_F(QuartcMultiplexerTest, MultiplexMessages) {
+  Connect();
+
+  FakeSendDelegate send_delegate_1;
+  QuartcSendChannel* send_channel_1 =
+      client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
+  FakeSendDelegate send_delegate_2;
+  QuartcSendChannel* send_channel_2 =
+      client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
+
+  FakeReceiveDelegate receive_delegate_1;
+  server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
+
+  int num_messages = 10;
+  std::vector<std::pair<uint64_t, std::string>> messages_1;
+  messages_1.reserve(num_messages);
+  std::vector<std::pair<uint64_t, std::string>> messages_2;
+  messages_2.reserve(num_messages);
+  std::vector<int64_t> messages_sent_1;
+  std::vector<int64_t> messages_sent_2;
+  std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_1;
+  std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_2;
+  for (int i = 0; i < num_messages; ++i) {
+    messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
+    test::QuicTestMemSliceVector slice_1(
+        {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
+                        messages_1.back().second.size())});
+    send_channel_1->SendOrQueueMessage(slice_1.span(), i);
+    messages_sent_1.push_back(i);
+    ack_matchers_1.push_back(Pair(i, Gt(QuicTime::Zero())));
+
+    messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
+    test::QuicTestMemSliceVector slice_2(
+        {std::make_pair(const_cast<char*>(messages_2.back().second.data()),
+                        messages_2.back().second.size())});
+    // Use i + 5 as the datagram id for channel 2, so that some of the ids
+    // overlap and some are disjoint.
+    send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5);
+    messages_sent_2.push_back(i + 5);
+    ack_matchers_2.push_back(Pair(i + 5, Gt(QuicTime::Zero())));
+  }
+
+  EXPECT_TRUE(simulator_.RunUntil([&send_delegate_1, &send_delegate_2]() {
+    return send_delegate_1.datagrams_acked().size() == 10 &&
+           send_delegate_2.datagrams_acked().size() == 10;
+  }));
+
+  EXPECT_EQ(send_delegate_1.datagrams_sent(), messages_sent_1);
+  EXPECT_EQ(send_delegate_2.datagrams_sent(), messages_sent_2);
+
+  EXPECT_EQ(receive_delegate_1.messages_received(), messages_1);
+  EXPECT_EQ(server_default_receiver_.messages_received(), messages_2);
+
+  EXPECT_THAT(send_delegate_1.datagrams_acked(),
+              ElementsAreArray(ack_matchers_1));
+  EXPECT_THAT(send_delegate_2.datagrams_acked(),
+              ElementsAreArray(ack_matchers_2));
+}
+
+TEST_F(QuartcMultiplexerTest, MultiplexStreams) {
+  FakeSendDelegate send_delegate_1;
+  QuartcSendChannel* send_channel_1 =
+      client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
+  FakeSendDelegate send_delegate_2;
+  QuartcSendChannel* send_channel_2 =
+      client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
+
+  FakeQuartcStreamDelegate fake_send_stream_delegate;
+
+  FakeReceiveDelegate receive_delegate_1;
+  server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
+
+  Connect();
+
+  int num_messages = 10;
+  std::vector<std::pair<uint64_t, std::string>> messages_1;
+  messages_1.reserve(num_messages);
+  std::vector<std::pair<uint64_t, std::string>> messages_2;
+  messages_2.reserve(num_messages);
+  for (int i = 0; i < num_messages; ++i) {
+    messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
+    test::QuicTestMemSliceVector slice_1(
+        {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
+                        messages_1.back().second.size())});
+    QuartcStream* stream_1 =
+        send_channel_1->CreateOutgoingBidirectionalStream();
+    stream_1->SetDelegate(&fake_send_stream_delegate);
+    stream_1->WriteMemSlices(slice_1.span(), /*fin=*/true);
+
+    messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
+    test::QuicTestMemSliceVector slice_2(
+        {std::make_pair(const_cast<char*>(messages_2.back().second.data()),
+                        messages_2.back().second.size())});
+    QuartcStream* stream_2 =
+        send_channel_2->CreateOutgoingBidirectionalStream();
+    stream_2->SetDelegate(&fake_send_stream_delegate);
+    stream_2->WriteMemSlices(slice_2.span(), /*fin=*/true);
+  }
+
+  EXPECT_TRUE(simulator_.RunUntilOrTimeout(
+      [this, &receive_delegate_1]() {
+        return receive_delegate_1.messages_received().size() == 10 &&
+               server_default_receiver_.messages_received().size() == 10;
+      },
+      QuicTime::Delta::FromSeconds(5)));
+
+  EXPECT_EQ(receive_delegate_1.messages_received(), messages_1);
+  EXPECT_EQ(server_default_receiver_.messages_received(), messages_2);
+}
+
+// Tests that datagram-lost callbacks are invoked on the right send channel
+// delegate, and that they work with overlapping datagram ids.
+TEST_F(QuartcMultiplexerTest, MultiplexLostDatagrams) {
+  Connect();
+  ASSERT_TRUE(simulator_.RunUntil([this]() {
+    return client_session_delegate_.handshake_count() > 0 &&
+           server_session_delegate_.handshake_count() > 0;
+  }));
+
+  // Just drop everything we try to send.
+  client_filter_.set_packets_to_drop(30);
+
+  FakeSendDelegate send_delegate_1;
+  QuartcSendChannel* send_channel_1 =
+      client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
+  FakeSendDelegate send_delegate_2;
+  QuartcSendChannel* send_channel_2 =
+      client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
+
+  FakeQuartcStreamDelegate fake_send_stream_delegate;
+
+  FakeReceiveDelegate receive_delegate_1;
+  server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
+
+  int num_messages = 10;
+  std::vector<std::pair<uint64_t, std::string>> messages_1;
+  messages_1.reserve(num_messages);
+  std::vector<std::pair<uint64_t, std::string>> messages_2;
+  messages_2.reserve(num_messages);
+  std::vector<int64_t> messages_sent_1;
+  std::vector<int64_t> messages_sent_2;
+  for (int i = 0; i < num_messages; ++i) {
+    messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
+    test::QuicTestMemSliceVector slice_1(
+        {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
+                        messages_1.back().second.size())});
+    send_channel_1->SendOrQueueMessage(slice_1.span(), i);
+    messages_sent_1.push_back(i);
+
+    messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
+    test::QuicTestMemSliceVector slice_2(
+        {std::make_pair(const_cast<char*>(messages_2.back().second.data()),
+                        messages_2.back().second.size())});
+    // Use i + 5 as the datagram id for channel 2, so that some of the ids
+    // overlap and some are disjoint.
+    send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5);
+    messages_sent_2.push_back(i + 5);
+  }
+
+  // Now send something retransmittable to prompt loss detection.
+  // If we never send anything retransmittable, we will never get acks, and
+  // never detect losses.
+  messages_1.emplace_back(1, QuicStrCat("message for 1: ", num_messages));
+  test::QuicTestMemSliceVector slice(
+      {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
+                      messages_1.back().second.size())});
+  QuartcStream* stream_1 = send_channel_1->CreateOutgoingBidirectionalStream();
+  stream_1->SetDelegate(&fake_send_stream_delegate);
+  stream_1->WriteMemSlices(slice.span(), /*fin=*/true);
+
+  EXPECT_TRUE(simulator_.RunUntilOrTimeout(
+      [&send_delegate_1, &send_delegate_2]() {
+        return send_delegate_1.datagrams_lost().size() == 10 &&
+               send_delegate_2.datagrams_lost().size() == 10;
+      },
+      QuicTime::Delta::FromSeconds(60)));
+
+  EXPECT_EQ(send_delegate_1.datagrams_lost(), messages_sent_1);
+  EXPECT_EQ(send_delegate_2.datagrams_lost(), messages_sent_2);
+
+  EXPECT_THAT(send_delegate_1.datagrams_acked(), IsEmpty());
+  EXPECT_THAT(send_delegate_2.datagrams_acked(), IsEmpty());
+
+  EXPECT_THAT(receive_delegate_1.messages_received(), IsEmpty());
+  EXPECT_THAT(server_default_receiver_.messages_received(), IsEmpty());
+}
+
+TEST_F(QuartcMultiplexerTest, UnregisterReceiveChannel) {
+  Connect();
+
+  FakeSendDelegate send_delegate;
+  QuartcSendChannel* send_channel =
+      client_multiplexer()->CreateSendChannel(1, &send_delegate);
+  FakeQuartcStreamDelegate fake_send_stream_delegate;
+
+  FakeReceiveDelegate receive_delegate;
+  server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate);
+  server_multiplexer()->RegisterReceiveChannel(1, nullptr);
+
+  int num_messages = 10;
+  std::vector<std::pair<uint64_t, std::string>> messages;
+  messages.reserve(num_messages);
+  std::vector<int64_t> messages_sent;
+  std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers;
+  for (int i = 0; i < num_messages; ++i) {
+    messages.emplace_back(1, QuicStrCat("message for 1: ", i));
+    test::QuicTestMemSliceVector slice(
+        {std::make_pair(const_cast<char*>(messages.back().second.data()),
+                        messages.back().second.size())});
+    send_channel->SendOrQueueMessage(slice.span(), i);
+    messages_sent.push_back(i);
+    ack_matchers.push_back(Pair(i, Gt(QuicTime::Zero())));
+  }
+
+  EXPECT_TRUE(simulator_.RunUntil([&send_delegate]() {
+    return send_delegate.datagrams_acked().size() == 10;
+  }));
+
+  EXPECT_EQ(send_delegate.datagrams_sent(), messages_sent);
+  EXPECT_EQ(server_default_receiver_.messages_received(), messages);
+  EXPECT_THAT(send_delegate.datagrams_acked(), ElementsAreArray(ack_matchers));
+}
+
+TEST_F(QuartcMultiplexerTest, CloseEvent) {
+  Connect();
+  Disconnect();
+
+  EXPECT_EQ(client_session_delegate_.error(), QUIC_CONNECTION_CANCELLED);
+  EXPECT_EQ(server_session_delegate_.error(), QUIC_CONNECTION_CANCELLED);
+}
+
+TEST_F(QuartcMultiplexerTest, CongestionEvent) {
+  Connect();
+  ASSERT_TRUE(simulator_.RunUntil([this]() {
+    return client_session_delegate_.handshake_count() > 0 &&
+           server_session_delegate_.handshake_count() > 0;
+  }));
+
+  EXPECT_GT(client_session_delegate_.latest_bandwidth_estimate(),
+            QuicBandwidth::Zero());
+  EXPECT_GT(client_session_delegate_.latest_pacing_rate(),
+            QuicBandwidth::Zero());
+  EXPECT_GT(client_session_delegate_.latest_rtt(), QuicTime::Delta::Zero());
+}
+
+}  // namespace
+}  // namespace quic
diff --git a/quic/quartc/quartc_stream.cc b/quic/quartc/quartc_stream.cc
index 607fa43..f18d42d 100644
--- a/quic/quartc/quartc_stream.cc
+++ b/quic/quartc/quartc_stream.cc
@@ -27,24 +27,28 @@
 QuartcStream::~QuartcStream() {}
 
 void QuartcStream::OnDataAvailable() {
-  bool fin = sequencer()->ReadableBytes() + sequencer()->NumBytesConsumed() ==
-             sequencer()->close_offset();
+  size_t bytes_consumed = 0;
+  do {
+    bool fin = sequencer()->ReadableBytes() + sequencer()->NumBytesConsumed() ==
+               sequencer()->close_offset();
 
-  // Upper bound on number of readable regions.  Each complete block's worth of
-  // data crosses at most one region boundary.  The remainder may cross one more
-  // boundary.  Number of regions is one more than the number of region
-  // boundaries crossed.
-  size_t iov_length = sequencer()->ReadableBytes() /
-                          QuicStreamSequencerBuffer::kBlockSizeBytes +
-                      2;
-  std::unique_ptr<iovec[]> iovecs = QuicMakeUnique<iovec[]>(iov_length);
-  iov_length = sequencer()->GetReadableRegions(iovecs.get(), iov_length);
+    // Upper bound on number of readable regions.  Each complete block's worth
+    // of data crosses at most one region boundary.  The remainder may cross one
+    // more boundary.  Number of regions is one more than the number of region
+    // boundaries crossed.
+    size_t iov_length = sequencer()->ReadableBytes() /
+                            QuicStreamSequencerBuffer::kBlockSizeBytes +
+                        2;
+    std::unique_ptr<iovec[]> iovecs = QuicMakeUnique<iovec[]>(iov_length);
+    iov_length = sequencer()->GetReadableRegions(iovecs.get(), iov_length);
 
-  sequencer()->MarkConsumed(
-      delegate_->OnReceived(this, iovecs.get(), iov_length, fin));
-  if (sequencer()->IsClosed()) {
-    OnFinRead();
-  }
+    bytes_consumed = delegate_->OnReceived(this, iovecs.get(), iov_length, fin);
+    sequencer()->MarkConsumed(bytes_consumed);
+    if (sequencer()->IsClosed()) {
+      OnFinRead();
+      break;
+    }
+  } while (bytes_consumed > 0);
 }
 
 void QuartcStream::OnClose() {
@@ -56,8 +60,9 @@
 void QuartcStream::OnStreamDataConsumed(size_t bytes_consumed) {
   QuicStream::OnStreamDataConsumed(bytes_consumed);
 
-  DCHECK(delegate_);
-  delegate_->OnBufferChanged(this);
+  if (delegate_) {
+    delegate_->OnBufferChanged(this);
+  }
 }
 
 void QuartcStream::OnDataBuffered(
@@ -65,8 +70,9 @@
     QuicByteCount /*data_length*/,
     const QuicReferenceCountedPointer<
         QuicAckListenerInterface>& /*ack_listener*/) {
-  DCHECK(delegate_);
-  delegate_->OnBufferChanged(this);
+  if (delegate_) {
+    delegate_->OnBufferChanged(this);
+  }
 }
 
 bool QuartcStream::OnStreamFrameAcked(QuicStreamOffset offset,
@@ -156,10 +162,6 @@
 }
 
 void QuartcStream::SetDelegate(Delegate* delegate) {
-  if (delegate_) {
-    QUIC_LOG(WARNING) << "The delegate for Stream " << id()
-                      << " has already been set.";
-  }
   delegate_ = delegate;
   DCHECK(delegate_);
 }