Create a QuartcMultiplexer which separates streams and datagrams into channels.

QuartcMultiplexer effectively reorganizes the calls and callbacks for
QuartcSession into three categories:
 - Per-channel send events
 - Per-channel receive events
 - Session-wide events that are not multiplexed

QuartcMultiplexer works at a low level to hide streams and messages from other
senders and receivers.  It consists of a core multiplexer object which interacts
with the QuartcEndpoint/Session, a channel sender which handles outgoing data,
and a channel receiver which handles incoming data.

The sender has a specific channel id, specified on creation.  A channel writes
this channel id in a varint form at the start of each new stream or datagram it
sends.

The multiplexer intercepts all the callbacks for incoming streams and datagrams.
It reads a varint from the start of each stream or datagram to identify the
channel id.  It then looks up a receiver for that channel and delegates the
stream or datagram to that receiver.

A default receiver may be registered to handle all streams or datagrams not
assigned to a specific receiver.  This allows endpoints to dispatch unhandled
data to a catch-all, or to await incoming data before registering a specific
receiver.  The latter may be useful to in conjunction with some forms of
negotiation; eg. when certain ranges of channel ids are allocated to different
protocols, but it is unknown which protocol will be used at startup.

gfe-relnote: n/a (quartc only)
PiperOrigin-RevId: 260588723
Change-Id: I4a3b815b48c4f825c47bc60b0b3fd76d4e3614a5
diff --git a/quic/quartc/quartc_stream.cc b/quic/quartc/quartc_stream.cc
index 607fa43..f18d42d 100644
--- a/quic/quartc/quartc_stream.cc
+++ b/quic/quartc/quartc_stream.cc
@@ -27,24 +27,28 @@
 QuartcStream::~QuartcStream() {}
 
 void QuartcStream::OnDataAvailable() {
-  bool fin = sequencer()->ReadableBytes() + sequencer()->NumBytesConsumed() ==
-             sequencer()->close_offset();
+  size_t bytes_consumed = 0;
+  do {
+    bool fin = sequencer()->ReadableBytes() + sequencer()->NumBytesConsumed() ==
+               sequencer()->close_offset();
 
-  // Upper bound on number of readable regions.  Each complete block's worth of
-  // data crosses at most one region boundary.  The remainder may cross one more
-  // boundary.  Number of regions is one more than the number of region
-  // boundaries crossed.
-  size_t iov_length = sequencer()->ReadableBytes() /
-                          QuicStreamSequencerBuffer::kBlockSizeBytes +
-                      2;
-  std::unique_ptr<iovec[]> iovecs = QuicMakeUnique<iovec[]>(iov_length);
-  iov_length = sequencer()->GetReadableRegions(iovecs.get(), iov_length);
+    // Upper bound on number of readable regions.  Each complete block's worth
+    // of data crosses at most one region boundary.  The remainder may cross one
+    // more boundary.  Number of regions is one more than the number of region
+    // boundaries crossed.
+    size_t iov_length = sequencer()->ReadableBytes() /
+                            QuicStreamSequencerBuffer::kBlockSizeBytes +
+                        2;
+    std::unique_ptr<iovec[]> iovecs = QuicMakeUnique<iovec[]>(iov_length);
+    iov_length = sequencer()->GetReadableRegions(iovecs.get(), iov_length);
 
-  sequencer()->MarkConsumed(
-      delegate_->OnReceived(this, iovecs.get(), iov_length, fin));
-  if (sequencer()->IsClosed()) {
-    OnFinRead();
-  }
+    bytes_consumed = delegate_->OnReceived(this, iovecs.get(), iov_length, fin);
+    sequencer()->MarkConsumed(bytes_consumed);
+    if (sequencer()->IsClosed()) {
+      OnFinRead();
+      break;
+    }
+  } while (bytes_consumed > 0);
 }
 
 void QuartcStream::OnClose() {
@@ -56,8 +60,9 @@
 void QuartcStream::OnStreamDataConsumed(size_t bytes_consumed) {
   QuicStream::OnStreamDataConsumed(bytes_consumed);
 
-  DCHECK(delegate_);
-  delegate_->OnBufferChanged(this);
+  if (delegate_) {
+    delegate_->OnBufferChanged(this);
+  }
 }
 
 void QuartcStream::OnDataBuffered(
@@ -65,8 +70,9 @@
     QuicByteCount /*data_length*/,
     const QuicReferenceCountedPointer<
         QuicAckListenerInterface>& /*ack_listener*/) {
-  DCHECK(delegate_);
-  delegate_->OnBufferChanged(this);
+  if (delegate_) {
+    delegate_->OnBufferChanged(this);
+  }
 }
 
 bool QuartcStream::OnStreamFrameAcked(QuicStreamOffset offset,
@@ -156,10 +162,6 @@
 }
 
 void QuartcStream::SetDelegate(Delegate* delegate) {
-  if (delegate_) {
-    QUIC_LOG(WARNING) << "The delegate for Stream " << id()
-                      << " has already been set.";
-  }
   delegate_ = delegate;
   DCHECK(delegate_);
 }