Refactor QuicSession so that subclasses can customize how pending streams are handled.
gfe-relnote: code refactor and v99 only. Not used in production.
PiperOrigin-RevId: 248015584
Change-Id: Ie0ac855070b304bd90a8e68392cff3cf4c5ac52a
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 8bdf3b5..0545fa6 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -149,6 +149,24 @@
}
}
+void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {  // Handles a STREAM frame via a PendingStream instead of a QuicStream.
+ DCHECK(VersionHasControlStreams(connection()->transport_version()));  // Callers must only use this on versions with control streams.
+ QuicStreamId stream_id = frame.stream_id;
+ // Look up the PendingStream for this id, creating it if necessary.
+ PendingStream* pending = GetOrCreatePendingStream(stream_id);
+ // nullptr: the stream is closed or MaybeIncreaseLargestPeerStreamId() rejected the id.
+ if (!pending) {
+ if (frame.fin) {  // A FIN still carries the stream's final byte offset, so report it.
+ QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
+ OnFinalByteOffsetReceived(stream_id, final_byte_offset);
+ }
+ return;
+ }
+ // Deliver the frame to the pending stream, then let ProcessPendingStream() act on it.
+ pending->OnStreamFrame(frame);
+ ProcessPendingStream(pending);
+}
+
void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
// TODO(rch) deal with the error case of stream id 0.
QuicStreamId stream_id = frame.stream_id;
@@ -167,12 +185,18 @@
return;
}
- StreamHandler handler = GetOrCreateStreamImpl(stream_id, frame.offset != 0);
- if (handler.is_pending) {
- handler.pending->OnStreamFrame(frame);
+ if (VersionHasControlStreams(connection()->transport_version()) &&
+ UsesPendingStreams() &&
+ QuicUtils::GetStreamType(stream_id, perspective(),
+ IsIncomingStream(stream_id)) ==
+ READ_UNIDIRECTIONAL &&
+ dynamic_stream_map_.find(stream_id) == dynamic_stream_map_.end()) {
+ PendingStreamOnStreamFrame(frame);
return;
}
+ StreamHandler handler = GetOrCreateStreamImpl(stream_id);
+
if (!handler.stream) {
// The stream no longer exists, but we may still be interested in the
// final stream byte offset sent by the peer. A frame with a FIN can give
@@ -296,6 +320,21 @@
return true;
}
+void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {  // Handles a RST_STREAM frame via a PendingStream instead of a QuicStream.
+ DCHECK(VersionHasControlStreams(connection()->transport_version()));  // Callers must only use this on versions with control streams.
+ QuicStreamId stream_id = frame.stream_id;
+ // Look up the PendingStream for this id, creating it if necessary.
+ PendingStream* pending = GetOrCreatePendingStream(stream_id);
+ // nullptr: the stream is closed or MaybeIncreaseLargestPeerStreamId() rejected the id.
+ if (!pending) {
+ HandleRstOnValidNonexistentStream(frame);
+ return;
+ }
+ // Pass the reset to the pending stream first, then close that pending stream.
+ pending->OnRstStreamFrame(frame);
+ ClosePendingStream(stream_id);
+}
+
void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
QuicStreamId stream_id = frame.stream_id;
if (stream_id ==
@@ -317,15 +356,18 @@
visitor_->OnRstStreamReceived(frame);
}
- // may_buffer is true here to allow subclasses to buffer streams until the
- // first byte of payload arrives which would allow sessions to delay
- // creation of the stream until the type is known.
- StreamHandler handler = GetOrCreateStreamImpl(stream_id, /*may_buffer=*/true);
- if (handler.is_pending) {
- handler.pending->OnRstStreamFrame(frame);
- ClosePendingStream(stream_id);
+ if (VersionHasControlStreams(connection()->transport_version()) &&
+ UsesPendingStreams() &&
+ QuicUtils::GetStreamType(stream_id, perspective(),
+ IsIncomingStream(stream_id)) ==
+ READ_UNIDIRECTIONAL &&
+ dynamic_stream_map_.find(stream_id) == dynamic_stream_map_.end()) {
+ PendingStreamOnRstStream(frame);
return;
}
+
+ StreamHandler handler = GetOrCreateStreamImpl(stream_id);
+
if (!handler.stream) {
HandleRstOnValidNonexistentStream(frame);
return; // Errors are handled by GetOrCreateStream.
@@ -1200,15 +1242,13 @@
}
QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
- StreamHandler handler =
- GetOrCreateStreamImpl(stream_id, /*may_buffer=*/false);
+ StreamHandler handler = GetOrCreateStreamImpl(stream_id);
DCHECK(!handler.is_pending);
return handler.stream;
}
QuicSession::StreamHandler QuicSession::GetOrCreateStreamImpl(
- QuicStreamId stream_id,
- bool may_buffer) {
+ QuicStreamId stream_id) {
if (eliminate_static_stream_map_ &&
stream_id ==
QuicUtils::GetCryptoStreamId(connection_->transport_version())) {
@@ -1219,7 +1259,7 @@
if (it != static_stream_map_.end()) {
return StreamHandler(it->second);
}
- return GetOrCreateDynamicStreamImpl(stream_id, may_buffer);
+ return GetOrCreateDynamicStreamImpl(stream_id);
}
void QuicSession::StreamDraining(QuicStreamId stream_id) {
@@ -1254,17 +1294,32 @@
return write_blocked_streams()->ShouldYield(stream_id);
}
+PendingStream* QuicSession::GetOrCreatePendingStream(QuicStreamId stream_id) {  // Returns the PendingStream for |stream_id|, creating it if absent; nullptr if it cannot be created.
+ auto it = pending_stream_map_.find(stream_id);
+ if (it != pending_stream_map_.end()) {
+ return it->second.get();  // Already pending: return the existing entry.
+ }
+ // No entry yet: refuse closed streams and ids that MaybeIncreaseLargestPeerStreamId() rejects.
+ if (IsClosedStream(stream_id) ||
+ !MaybeIncreaseLargestPeerStreamId(stream_id)) {
+ return nullptr;
+ }
+ // Create the PendingStream; pending_stream_map_ takes ownership and the caller gets a raw pointer.
+ auto pending = QuicMakeUnique<PendingStream>(stream_id, this);
+ PendingStream* unowned_pending = pending.get();  // Capture the raw pointer before |pending| is moved from.
+ pending_stream_map_[stream_id] = std::move(pending);
+ return unowned_pending;
+}
+
QuicStream* QuicSession::GetOrCreateDynamicStream(
const QuicStreamId stream_id) {
- StreamHandler handler =
- GetOrCreateDynamicStreamImpl(stream_id, /*may_buffer=*/false);
+ StreamHandler handler = GetOrCreateDynamicStreamImpl(stream_id);
DCHECK(!handler.is_pending);
return handler.stream;
}
QuicSession::StreamHandler QuicSession::GetOrCreateDynamicStreamImpl(
- QuicStreamId stream_id,
- bool may_buffer) {
+ QuicStreamId stream_id) {
DCHECK(!QuicContainsKey(static_stream_map_, stream_id))
<< "Attempt to call GetOrCreateDynamicStream for a static stream";
@@ -1282,21 +1337,6 @@
return StreamHandler();
}
- auto pending_it = pending_stream_map_.find(stream_id);
- if (pending_it != pending_stream_map_.end()) {
- DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
- if (may_buffer) {
- return StreamHandler(pending_it->second.get());
- }
- // The stream limit accounting has already been taken care of
- // when the PendingStream was created, so there is no need to
- // do so here. Now we can create the actual stream from the
- // PendingStream.
- StreamHandler handler(CreateIncomingStream(std::move(*pending_it->second)));
- pending_stream_map_.erase(pending_it);
- return handler;
- }
-
// TODO(fkastenholz): If we are creating a new stream and we have
// sent a goaway, we should ignore the stream creation. Need to
// add code to A) test if goaway was sent ("if (goaway_sent_)") and
@@ -1317,18 +1357,6 @@
}
}
- if (connection_->transport_version() == QUIC_VERSION_99 && may_buffer &&
- ShouldBufferIncomingStream(stream_id)) {
- ++num_dynamic_incoming_streams_;
- // Since STREAM frames may arrive out of order, delay creating the
- // stream object until the first byte arrives. Buffer the frames and
- // handle flow control accounting in the PendingStream.
- auto pending = QuicMakeUnique<PendingStream>(stream_id, this);
- StreamHandler handler(pending.get());
- pending_stream_map_[stream_id] = std::move(pending);
- return handler;
- }
-
return StreamHandler(CreateIncomingStream(stream_id));
}