Unify the way in which streams are marked unblocked after being converted from the pending status.

This does not change anything for existing code, but it does switch the order between SetUnblocked() and erase() on pending streams map, which is needed for a later WebTransport CL.

PiperOrigin-RevId: 364835817
Change-Id: I88f1a80271527c886dfe632aeff6430221ee1a1f
diff --git a/quic/core/http/quic_receive_control_stream.h b/quic/core/http/quic_receive_control_stream.h
index 326dba3..71f0e05 100644
--- a/quic/core/http/quic_receive_control_stream.h
+++ b/quic/core/http/quic_receive_control_stream.h
@@ -64,8 +64,6 @@
   bool OnUnknownFramePayload(absl::string_view payload) override;
   bool OnUnknownFrameEnd() override;
 
-  void SetUnblocked() { sequencer()->SetUnblocked(); }
-
   QuicSpdySession* spdy_session() { return spdy_session_; }
 
  private:
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index 2a2df67..1d20a9c 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -1348,13 +1348,13 @@
   return GetNumActiveStreams() + num_draining_streams() > 0;
 }
 
-bool QuicSpdySession::ProcessPendingStream(PendingStream* pending) {
+QuicStream* QuicSpdySession::ProcessPendingStream(PendingStream* pending) {
   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
   QUICHE_DCHECK(connection()->connected());
   struct iovec iov;
   if (!pending->sequencer()->GetReadableRegion(&iov)) {
     // The first byte hasn't been received yet.
-    return false;
+    return nullptr;
   }
 
   QuicDataReader reader(static_cast<char*>(iov.iov_base), iov.iov_len);
@@ -1367,7 +1367,7 @@
       // Mark all bytes consumed in order to close stream.
       pending->MarkConsumed(pending->sequencer()->close_offset());
     }
-    return false;
+    return nullptr;
   }
   pending->MarkConsumed(stream_type_length);
 
@@ -1375,58 +1375,54 @@
     case kControlStream: {  // HTTP/3 control stream.
       if (receive_control_stream_) {
         CloseConnectionOnDuplicateHttp3UnidirectionalStreams("Control");
-        return false;
+        return nullptr;
       }
       auto receive_stream =
           std::make_unique<QuicReceiveControlStream>(pending, this);
       receive_control_stream_ = receive_stream.get();
       ActivateStream(std::move(receive_stream));
-      receive_control_stream_->SetUnblocked();
       QUIC_DVLOG(1) << ENDPOINT << "Receive Control stream is created";
       if (debug_visitor_ != nullptr) {
         debug_visitor_->OnPeerControlStreamCreated(
             receive_control_stream_->id());
       }
-      return true;
+      return receive_control_stream_;
     }
     case kServerPushStream: {  // Push Stream.
       QuicSpdyStream* stream = CreateIncomingStream(pending);
-      stream->SetUnblocked();
-      return true;
+      return stream;
     }
     case kQpackEncoderStream: {  // QPACK encoder stream.
       if (qpack_encoder_receive_stream_) {
         CloseConnectionOnDuplicateHttp3UnidirectionalStreams("QPACK encoder");
-        return false;
+        return nullptr;
       }
       auto encoder_receive = std::make_unique<QpackReceiveStream>(
           pending, this, qpack_decoder_->encoder_stream_receiver());
       qpack_encoder_receive_stream_ = encoder_receive.get();
       ActivateStream(std::move(encoder_receive));
-      qpack_encoder_receive_stream_->SetUnblocked();
       QUIC_DVLOG(1) << ENDPOINT << "Receive QPACK Encoder stream is created";
       if (debug_visitor_ != nullptr) {
         debug_visitor_->OnPeerQpackEncoderStreamCreated(
             qpack_encoder_receive_stream_->id());
       }
-      return true;
+      return qpack_encoder_receive_stream_;
     }
     case kQpackDecoderStream: {  // QPACK decoder stream.
       if (qpack_decoder_receive_stream_) {
         CloseConnectionOnDuplicateHttp3UnidirectionalStreams("QPACK decoder");
-        return false;
+        return nullptr;
       }
       auto decoder_receive = std::make_unique<QpackReceiveStream>(
           pending, this, qpack_encoder_->decoder_stream_receiver());
       qpack_decoder_receive_stream_ = decoder_receive.get();
       ActivateStream(std::move(decoder_receive));
-      qpack_decoder_receive_stream_->SetUnblocked();
       QUIC_DVLOG(1) << ENDPOINT << "Receive QPACK Decoder stream is created";
       if (debug_visitor_ != nullptr) {
         debug_visitor_->OnPeerQpackDecoderStreamCreated(
             qpack_decoder_receive_stream_->id());
       }
-      return true;
+      return qpack_decoder_receive_stream_;
     }
     default:
       if (GetQuicReloadableFlag(quic_unify_stop_sending)) {
@@ -1440,7 +1436,7 @@
       }
       pending->StopReading();
   }
-  return false;
+  return nullptr;
 }
 
 void QuicSpdySession::MaybeInitializeHttp3UnidirectionalStreams() {
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index 596e9cb..a3fda3f 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -476,10 +476,10 @@
   // Overridden to buffer incoming unidirectional streams for version 99.
   bool UsesPendingStreams() const override;
 
-  // Overridden to Process HTTP/3 stream types. H/3 streams will be created from
-  // pending streams accordingly if the stream type can be read. Returns true if
-  // unidirectional streams are created.
-  bool ProcessPendingStream(PendingStream* pending) override;
+  // Processes incoming unidirectional streams; parses the stream type, and
+  // creates a new stream of the corresponding type.  Returns the pointer to the
+  // newly created stream, or nullptr if the stream type is not yet available.
+  QuicStream* ProcessPendingStream(PendingStream* pending) override;
 
   size_t WriteHeadersOnHeadersStreamImpl(
       QuicStreamId id,
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index f3ec720..61d4697 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -168,8 +168,6 @@
   // Clears |header_list_|.
   void ConsumeHeaderList();
 
-  void SetUnblocked() { sequencer()->SetUnblocked(); }
-
   // This block of functions wraps the sequencer's functions of the same
   // name.  These methods return uncompressed data until that has
   // been fully processed.  Then they simply delegate to the sequencer.
diff --git a/quic/core/qpack/qpack_receive_stream.h b/quic/core/qpack/qpack_receive_stream.h
index c9d2a77..6c04b94 100644
--- a/quic/core/qpack/qpack_receive_stream.h
+++ b/quic/core/qpack/qpack_receive_stream.h
@@ -33,8 +33,6 @@
   // Implementation of QuicStream.
   void OnDataAvailable() override;
 
-  void SetUnblocked() { sequencer()->SetUnblocked(); }
-
  private:
   QpackStreamReceiver* receiver_;
 };
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 7b912e9..b7622f5 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -182,11 +182,13 @@
   if (!connection()->connected()) {
     return;
   }
-  if (ProcessPendingStream(pending)) {
+  QuicStream* stream = ProcessPendingStream(pending);
+  if (stream != nullptr) {
     // The pending stream should now be in the scope of normal streams.
     QUICHE_DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
         << "Stream " << stream_id << " not created";
     pending_stream_map_.erase(stream_id);
+    stream->OnStreamCreatedFromPendingStream();
     return;
   }
   if (pending->sequencer()->IsClosed()) {
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index cd496cf..f225f26 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -745,10 +745,11 @@
   size_t num_draining_streams() const { return num_draining_streams_; }
 
   // Processes the stream type information of |pending| depending on
-  // different kinds of sessions' own rules. Returns true if the pending stream
-  // is converted into a normal stream.
-  virtual bool ProcessPendingStream(PendingStream* /*pending*/) {
-    return false;
+  // different kinds of sessions' own rules. If the pending stream has been
+  // converted to a normal stream, returns a pointer to the new stream;
+  // otherwise, returns nullptr.
+  virtual QuicStream* ProcessPendingStream(PendingStream* /*pending*/) {
+    return nullptr;
   }
 
   // Called by applications to perform |action| on active streams.
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index 7daec2b..5f4d097 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -293,14 +293,13 @@
   // QuicSession doesn't do anything in this method. So it's overridden here to
   // test that the session handles pending streams correctly in terms of
   // receiving stream frames.
-  bool ProcessPendingStream(PendingStream* pending) override {
+  QuicStream* ProcessPendingStream(PendingStream* pending) override {
     struct iovec iov;
     if (pending->sequencer()->GetReadableRegion(&iov)) {
       // Create TestStream once the first byte is received.
-      CreateIncomingStream(pending);
-      return true;
+      return CreateIncomingStream(pending);
     }
-    return false;
+    return nullptr;
   }
 
   bool IsClosedStream(QuicStreamId id) {
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 5e2ebd4..3f1b7d9 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -1454,4 +1454,8 @@
              : absl::nullopt;
 }
 
+void QuicStream::OnStreamCreatedFromPendingStream() {
+  sequencer()->SetUnblocked();
+}
+
 }  // namespace quic
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index 822ec18..6e6e623 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -379,6 +379,10 @@
   // True if buffered data in send buffer is below buffered_data_threshold_.
   bool CanWriteNewData() const;
 
+  // Called immediately after the stream is created from a pending stream,
+  // indicating it can start processing data.
+  void OnStreamCreatedFromPendingStream();
+
  protected:
   // Called when data of [offset, offset + data_length] is buffered in send
   // buffer.