Use PeekRegion() instead of PrefetchNextRegion() in HTTP/3 streams.

Use PeekRegion() instead of PrefetchNextRegion() in QuicSpdyStream and
QuicReceiveControlStream.  Since offset is incremented after the ProcessInput()
call, not before, reentrancy has to be avoided in QuicSpdyStream.  (No such
issue in QuicReceiveControlStream because no HttpDecoder::Visitor implementation
methods can result in OnDataAvailable() being re-entered.

Unfortunately this is neither pretty nor simple, but necessary for blocked
decoding.

gfe-relnote: n/a, change in QUIC v99-only code path.
PiperOrigin-RevId: 255633598
Change-Id: I2d6d438c1e09837138e9301b10b2366f34e80790
diff --git a/quic/core/http/quic_receive_control_stream.cc b/quic/core/http/quic_receive_control_stream.cc
index 85585bd..17a3c82 100644
--- a/quic/core/http/quic_receive_control_stream.cc
+++ b/quic/core/http/quic_receive_control_stream.cc
@@ -123,7 +123,8 @@
 QuicReceiveControlStream::QuicReceiveControlStream(PendingStream* pending)
     : QuicStream(pending, READ_UNIDIRECTIONAL, /*is_static=*/true),
       received_settings_length_(0),
-      http_decoder_visitor_(new HttpDecoderVisitor(this)) {
+      http_decoder_visitor_(new HttpDecoderVisitor(this)),
+      sequencer_offset_(sequencer()->NumBytesConsumed()) {
   decoder_.set_visitor(http_decoder_visitor_.get());
   sequencer()->set_level_triggered(true);
 }
@@ -141,10 +142,16 @@
 
 void QuicReceiveControlStream::OnDataAvailable() {
   iovec iov;
-  while (!reading_stopped() && decoder_.error() == QUIC_NO_ERROR &&
-         sequencer()->PrefetchNextRegion(&iov)) {
-    decoder_.ProcessInput(reinterpret_cast<const char*>(iov.iov_base),
-                          iov.iov_len);
+  while (!reading_stopped() && decoder_.error() == QUIC_NO_ERROR) {
+    DCHECK_GE(sequencer_offset_, sequencer()->NumBytesConsumed());
+    if (!sequencer()->PeekRegion(sequencer_offset_, &iov)) {
+      break;
+    }
+
+    DCHECK(!sequencer()->IsClosed());
+    QuicByteCount processed_bytes = decoder_.ProcessInput(
+        reinterpret_cast<const char*>(iov.iov_base), iov.iov_len);
+    sequencer_offset_ += processed_bytes;
   }
 }
 
diff --git a/quic/core/http/quic_receive_control_stream.h b/quic/core/http/quic_receive_control_stream.h
index 79964ba..e4e9c20 100644
--- a/quic/core/http/quic_receive_control_stream.h
+++ b/quic/core/http/quic_receive_control_stream.h
@@ -46,6 +46,12 @@
 
   // HttpDecoder's visitor.
   std::unique_ptr<HttpDecoderVisitor> http_decoder_visitor_;
+
+  // Sequencer offset keeping track of how much data HttpDecoder has processed.
+  // Initial value is sequencer()->NumBytesConsumed() at time of
+  // QuicReceiveControlStream construction: that is the length of the
+  // unidirectional stream type at the beginning of the stream.
+  QuicStreamOffset sequencer_offset_;
 };
 
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index 6db96fa..f02eaeb 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -171,9 +171,12 @@
       headers_bytes_to_be_marked_consumed_(0),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
       body_buffer_(sequencer()),
+      sequencer_offset_(0),
+      is_decoder_processing_input_(false),
       ack_listener_(nullptr) {
   DCHECK(!QuicUtils::IsCryptoStreamId(
       spdy_session->connection()->transport_version(), id));
+  DCHECK_EQ(0u, sequencer()->NumBytesConsumed());
   // If headers are sent on the headers stream, then do not receive any
   // callbacks from the sequencer until headers are complete.
   if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
@@ -202,6 +205,8 @@
       headers_bytes_to_be_marked_consumed_(0),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
       body_buffer_(sequencer()),
+      sequencer_offset_(sequencer()->NumBytesConsumed()),
+      is_decoder_processing_input_(false),
       ack_listener_(nullptr) {
   DCHECK(!QuicUtils::IsCryptoStreamId(
       spdy_session->connection()->transport_version(), id()));
@@ -621,12 +626,24 @@
     return;
   }
 
+  if (is_decoder_processing_input_) {
+    // Let the outermost nested OnDataAvailable() call do the work.
+    return;
+  }
+
   iovec iov;
-  while (!reading_stopped() && decoder_.error() == QUIC_NO_ERROR &&
-         sequencer()->PrefetchNextRegion(&iov)) {
+  while (!reading_stopped() && decoder_.error() == QUIC_NO_ERROR) {
+    DCHECK_GE(sequencer_offset_, sequencer()->NumBytesConsumed());
+    if (!sequencer()->PeekRegion(sequencer_offset_, &iov)) {
+      break;
+    }
+
     DCHECK(!sequencer()->IsClosed());
-    decoder_.ProcessInput(reinterpret_cast<const char*>(iov.iov_base),
-                          iov.iov_len);
+    is_decoder_processing_input_ = true;
+    QuicByteCount processed_bytes = decoder_.ProcessInput(
+        reinterpret_cast<const char*>(iov.iov_base), iov.iov_len);
+    is_decoder_processing_input_ = false;
+    sequencer_offset_ += processed_bytes;
   }
 
   // Do not call OnBodyAvailable() until headers are consumed.
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index 57f2fe0..b6b46fe 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -291,6 +291,16 @@
   // Buffer that contains decoded data of the stream.
   QuicSpdyStreamBodyBuffer body_buffer_;
 
+  // Sequencer offset keeping track of how much data HttpDecoder has processed.
+  // Initial value is zero for fresh streams, or sequencer()->NumBytesConsumed()
+  // at time of construction if a PendingStream is converted to account for the
+  // length of the unidirectional stream type at the beginning of the stream.
+  QuicStreamOffset sequencer_offset_;
+
+  // True when inside an HttpDecoder::ProcessInput() call.
+  // Used for detecting reentrancy.
+  bool is_decoder_processing_input_;
+
   // Ack listener of this stream, and it is notified when any of written bytes
   // are acked or retransmitted.
   QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener_;