Create a QuartcMultiplexer which separates streams and datagrams into channels.
QuartcMultiplexer effectively reorganizes the calls and callbacks for
QuartcSession into three categories:
- Per-channel send events
- Per-channel receive events
- Session-wide events that are not multiplexed
QuartcMultiplexer works at a low level to hide streams and messages from other
senders and receivers. It consists of a core multiplexer object which interacts
with the QuartcEndpoint/Session, a channel sender which handles outgoing data,
and a channel receiver which handles incoming data.
The sender has a specific channel id, specified on creation. A channel writes
this channel id in a varint form at the start of each new stream or datagram it
sends.
The multiplexer intercepts all the callbacks for incoming streams and datagrams.
It reads a varint from the start of each stream or datagram to identify the
channel id. It then looks up a receiver for that channel and delegates the
stream or datagram to that receiver.
A default receiver may be registered to handle all streams or datagrams not
assigned to a specific receiver. This allows endpoints to dispatch unhandled
data to a catch-all, or to await incoming data before registering a specific
receiver. The latter may be useful to in conjunction with some forms of
negotiation; eg. when certain ranges of channel ids are allocated to different
protocols, but it is unknown which protocol will be used at startup.
gfe-relnote: n/a (quartc only)
PiperOrigin-RevId: 260588723
Change-Id: I4a3b815b48c4f825c47bc60b0b3fd76d4e3614a5
diff --git a/quic/quartc/quartc_stream.cc b/quic/quartc/quartc_stream.cc
index 607fa43..f18d42d 100644
--- a/quic/quartc/quartc_stream.cc
+++ b/quic/quartc/quartc_stream.cc
@@ -27,24 +27,28 @@
QuartcStream::~QuartcStream() {}
void QuartcStream::OnDataAvailable() {
- bool fin = sequencer()->ReadableBytes() + sequencer()->NumBytesConsumed() ==
- sequencer()->close_offset();
+ size_t bytes_consumed = 0;
+ do {
+ bool fin = sequencer()->ReadableBytes() + sequencer()->NumBytesConsumed() ==
+ sequencer()->close_offset();
- // Upper bound on number of readable regions. Each complete block's worth of
- // data crosses at most one region boundary. The remainder may cross one more
- // boundary. Number of regions is one more than the number of region
- // boundaries crossed.
- size_t iov_length = sequencer()->ReadableBytes() /
- QuicStreamSequencerBuffer::kBlockSizeBytes +
- 2;
- std::unique_ptr<iovec[]> iovecs = QuicMakeUnique<iovec[]>(iov_length);
- iov_length = sequencer()->GetReadableRegions(iovecs.get(), iov_length);
+ // Upper bound on number of readable regions. Each complete block's worth
+ // of data crosses at most one region boundary. The remainder may cross one
+ // more boundary. Number of regions is one more than the number of region
+ // boundaries crossed.
+ size_t iov_length = sequencer()->ReadableBytes() /
+ QuicStreamSequencerBuffer::kBlockSizeBytes +
+ 2;
+ std::unique_ptr<iovec[]> iovecs = QuicMakeUnique<iovec[]>(iov_length);
+ iov_length = sequencer()->GetReadableRegions(iovecs.get(), iov_length);
- sequencer()->MarkConsumed(
- delegate_->OnReceived(this, iovecs.get(), iov_length, fin));
- if (sequencer()->IsClosed()) {
- OnFinRead();
- }
+ bytes_consumed = delegate_->OnReceived(this, iovecs.get(), iov_length, fin);
+ sequencer()->MarkConsumed(bytes_consumed);
+ if (sequencer()->IsClosed()) {
+ OnFinRead();
+ break;
+ }
+ } while (bytes_consumed > 0);
}
void QuartcStream::OnClose() {
@@ -56,8 +60,9 @@
void QuartcStream::OnStreamDataConsumed(size_t bytes_consumed) {
QuicStream::OnStreamDataConsumed(bytes_consumed);
- DCHECK(delegate_);
- delegate_->OnBufferChanged(this);
+ if (delegate_) {
+ delegate_->OnBufferChanged(this);
+ }
}
void QuartcStream::OnDataBuffered(
@@ -65,8 +70,9 @@
QuicByteCount /*data_length*/,
const QuicReferenceCountedPointer<
QuicAckListenerInterface>& /*ack_listener*/) {
- DCHECK(delegate_);
- delegate_->OnBufferChanged(this);
+ if (delegate_) {
+ delegate_->OnBufferChanged(this);
+ }
}
bool QuartcStream::OnStreamFrameAcked(QuicStreamOffset offset,
@@ -156,10 +162,6 @@
}
void QuartcStream::SetDelegate(Delegate* delegate) {
- if (delegate_) {
- QUIC_LOG(WARNING) << "The delegate for Stream " << id()
- << " has already been set.";
- }
delegate_ = delegate;
DCHECK(delegate_);
}