diff --git a/quic/core/http/quic_receive_control_stream_test.cc b/quic/core/http/quic_receive_control_stream_test.cc
index 0dba0af..73e8fed 100644
--- a/quic/core/http/quic_receive_control_stream_test.cc
+++ b/quic/core/http/quic_receive_control_stream_test.cc
@@ -50,7 +50,7 @@
   std::vector<TestParams> params;
   ParsedQuicVersionVector all_supported_versions = AllSupportedVersions();
   for (const auto& version : AllSupportedVersions()) {
-    if (!VersionHasControlStreams(version.transport_version)) {
+    if (!VersionHasStreamType(version.transport_version)) {
       continue;
     }
     for (Perspective p : {Perspective::IS_SERVER, Perspective::IS_CLIENT}) {
diff --git a/quic/core/http/quic_send_control_stream_test.cc b/quic/core/http/quic_send_control_stream_test.cc
index 980cdf5..a015c5f 100644
--- a/quic/core/http/quic_send_control_stream_test.cc
+++ b/quic/core/http/quic_send_control_stream_test.cc
@@ -50,7 +50,7 @@
   std::vector<TestParams> params;
   ParsedQuicVersionVector all_supported_versions = AllSupportedVersions();
   for (const auto& version : AllSupportedVersions()) {
-    if (!VersionHasControlStreams(version.transport_version)) {
+    if (!VersionHasStreamType(version.transport_version)) {
       continue;
     }
     for (Perspective p : {Perspective::IS_SERVER, Perspective::IS_CLIENT}) {
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index 93d10e1..35b2d5e 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -51,6 +51,9 @@
 
 namespace {
 
+// TODO(renjietang): remove this once HTTP/3 error codes are adopted.
+const uint16_t kHttpUnknownStreamType = 0x0D;
+
 class HeaderTableDebugVisitor : public HpackHeaderTable::DebugVisitorInterface {
  public:
   HeaderTableDebugVisitor(const QuicClock* clock,
@@ -563,8 +566,8 @@
 }
 
 bool QuicSpdySession::UsesPendingStreams() const {
-  DCHECK(VersionHasControlStreams(connection()->transport_version()));
-  return false;
+  DCHECK(VersionHasStreamType(connection()->transport_version()));
+  return true;
 }
 
 size_t QuicSpdySession::WriteHeadersOnHeadersStreamImpl(
@@ -730,40 +733,41 @@
   return false;
 }
 
-void QuicSpdySession::ProcessPendingStream(PendingStream* pending) {
-  DCHECK(VersionHasControlStreams(connection()->transport_version()));
+bool QuicSpdySession::ProcessPendingStream(PendingStream* pending) {
+  DCHECK(VersionHasStreamType(connection()->transport_version()));
   struct iovec iov;
   if (!pending->sequencer()->GetReadableRegion(&iov)) {
-    // We don't have the first byte yet.
-    return;
+    // The first byte hasn't been received yet.
+    return false;
   }
 
   QuicDataReader reader(static_cast<char*>(iov.iov_base), iov.iov_len);
+  uint8_t stream_type_length = reader.PeekVarInt62Length();
   uint64_t stream_type = 0;
   if (!reader.ReadVarInt62(&stream_type)) {
-    return;
+    return false;
   }
-  CreateIncomingStreamFromPending(pending->id(), stream_type);
-}
+  pending->MarkConsumed(stream_type_length);
 
-void QuicSpdySession::CreateIncomingStreamFromPending(QuicStreamId id,
-                                                      uint64_t stream_type) {
   switch (stream_type) {
-    case 0x00:  // HTTP/3 control stream.
+    case kControlStream:  // HTTP/3 control stream.
       // TODO(renjietang): Create incoming control stream.
       break;
-    case 0x01:  // Push Stream.
-      break;
-    case 0x02:  // QPACK encoder stream.
+    case kServerPushStream: {  // Push Stream.
+      QuicSpdyStream* stream = CreateIncomingStream(std::move(*pending));
+      stream->SetUnblocked();
+      return true;
+    }
+    case kQpackEncoderStream:  // QPACK encoder stream.
       // TODO(bnc): Create QPACK encoder stream.
       break;
-    case 0x03:  // QPACK decoder stream.
+    case kQpackDecoderStream:  // QPACK decoder stream.
       // TODO(bnc): Create QPACK decoder stream.
       break;
     default:
-      SendStopSending(0x0D, id);
-      return;
+      SendStopSending(kHttpUnknownStreamType, pending->id());
   }
+  return false;
 }
 
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index 1935a2f..d0978c1 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -28,6 +28,12 @@
 class QuicSpdySessionPeer;
 }  // namespace test
 
+// Unidirectional stream types define by IETF HTTP/3 draft in section 3.2.
+const uint64_t kControlStream = 0;
+const uint64_t kServerPushStream = 1;
+const uint64_t kQpackEncoderStream = 2;
+const uint64_t kQpackDecoderStream = 3;
+
 // QuicHpackDebugVisitor gathers data used for understanding HPACK HoL
 // dynamics.  Specifically, it is to help predict the compression
 // penalty of avoiding HoL by chagning how the dynamic table is used.
@@ -198,9 +204,10 @@
   // Overridden to buffer incoming unidirectional streams for version 99.
   bool UsesPendingStreams() const override;
 
-  // Overridden to Process HTTP/3 stream types. No action will be taken if
-  // stream type cannot be read.
-  void ProcessPendingStream(PendingStream* pending) override;
+  // Overridden to Process HTTP/3 stream types. H/3 streams will be created from
+  // pending streams accordingly if the stream type can be read. Returns true if
+  // unidirectional streams are created.
+  bool ProcessPendingStream(PendingStream* pending) override;
 
   size_t WriteHeadersOnHeadersStreamImpl(
       QuicStreamId id,
@@ -241,10 +248,6 @@
   void set_max_uncompressed_header_bytes(
       size_t set_max_uncompressed_header_bytes);
 
-  // Creates HTTP/3 unidirectional stream of |id| and |type|. Sends
-  // STOP_SENDING frame if |type| is not supported.
-  void CreateIncomingStreamFromPending(QuicStreamId id, uint64_t type);
-
  private:
   friend class test::QuicSpdySessionPeer;
 
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 1a33fd1..d40ad73 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -417,10 +417,10 @@
                          ::testing::ValuesIn(AllSupportedVersions()));
 
 TEST_P(QuicSpdySessionTestServer, UsesPendingStreams) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
-  EXPECT_FALSE(session_.UsesPendingStreams());
+  EXPECT_TRUE(session_.UsesPendingStreams());
 }
 
 TEST_P(QuicSpdySessionTestServer, PeerAddress) {
@@ -1619,10 +1619,10 @@
                          ::testing::ValuesIn(AllSupportedVersions()));
 
 TEST_P(QuicSpdySessionTestClient, UsesPendingStreams) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
-  EXPECT_FALSE(session_.UsesPendingStreams());
+  EXPECT_TRUE(session_.UsesPendingStreams());
 }
 
 TEST_P(QuicSpdySessionTestClient, AvailableStreamsClient) {
@@ -1718,6 +1718,55 @@
   }
 }
 
+TEST_P(QuicSpdySessionTestClient, Http3ServerPush) {
+  if (!VersionHasStreamType(transport_version())) {
+    return;
+  }
+
+  char type[] = {0x01};
+  std::string data = std::string(type, 1) + "header";
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+  QuicStreamId stream_id1 =
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+  QuicStreamFrame data1(stream_id1, false, 0, QuicStringPiece(data));
+  session_.OnStreamFrame(data1);
+  EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams());
+  QuicStream* stream = session_.GetOrCreateDynamicStream(stream_id1);
+  EXPECT_EQ(1u, stream->flow_controller()->bytes_consumed());
+  EXPECT_EQ(1u, session_.flow_controller()->bytes_consumed());
+
+  char unoptimized_type[] = {0x80, 0x00, 0x00, 0x01};
+  data = std::string(unoptimized_type, 4) + "header";
+  QuicStreamId stream_id2 =
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 1);
+  QuicStreamFrame data2(stream_id2, false, 0, QuicStringPiece(data));
+  session_.OnStreamFrame(data2);
+  EXPECT_EQ(2u, session_.GetNumOpenIncomingStreams());
+  stream = session_.GetOrCreateDynamicStream(stream_id2);
+  EXPECT_EQ(4u, stream->flow_controller()->bytes_consumed());
+  EXPECT_EQ(5u, session_.flow_controller()->bytes_consumed());
+}
+
+TEST_P(QuicSpdySessionTestClient, Http3ServerPushOutofOrderFrame) {
+  if (!VersionHasStreamType(transport_version())) {
+    return;
+  }
+
+  char type[] = {0x01};
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+  QuicStreamFrame data1(
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0),
+      false, 1, QuicStringPiece("header"));
+  session_.OnStreamFrame(data1);
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+
+  QuicStreamFrame data2(
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0),
+      false, 0, QuicStringPiece(type, 1));
+  session_.OnStreamFrame(data2);
+  EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams());
+}
+
 TEST_P(QuicSpdySessionTestServer, ZombieStreams) {
   TestStream* stream2 = session_.CreateOutgoingBidirectionalStream();
   QuicStreamPeer::SetStreamBytesWritten(3, stream2);
@@ -1890,7 +1939,7 @@
 }
 
 TEST_P(QuicSpdySessionTestServer, SimplePendingStreamType) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
   PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
@@ -1909,7 +1958,7 @@
 }
 
 TEST_P(QuicSpdySessionTestServer, SimplePendingStreamTypeOutOfOrderDelivery) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
   PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
@@ -1932,7 +1981,7 @@
 
 TEST_P(QuicSpdySessionTestServer,
        MultipleBytesPendingStreamTypeOutOfOrderDelivery) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
   PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index 0127c5b..d424cc0 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -16,6 +16,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
 #include "net/third_party/quiche/src/quic/core/quic_write_blocked_list.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_arraysize.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
@@ -205,6 +206,23 @@
     SpdyHeaderBlock header_block,
     bool fin,
     QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
+  QuicConnection::ScopedPacketFlusher flusher(
+      spdy_session_->connection(), QuicConnection::SEND_ACK_IF_PENDING);
+  // Send stream type for server push stream
+  if (VersionHasStreamType(session()->connection()->transport_version()) &&
+      type() == WRITE_UNIDIRECTIONAL && send_buffer().stream_offset() == 0) {
+    char data[sizeof(kServerPushStream)];
+    QuicDataWriter writer(QUIC_ARRAYSIZE(data), data);
+    writer.WriteVarInt62(kServerPushStream);
+
+    // Similar to frame headers, stream type byte shouldn't be exposed to upper
+    // layer applications.
+    unacked_frame_headers_offsets_.Add(0, writer.length());
+
+    QUIC_LOG(INFO) << "Stream " << id() << " is writing type as server push";
+    WriteOrBufferData(QuicStringPiece(writer.data(), writer.length()), false,
+                      nullptr);
+  }
   size_t bytes_written =
       WriteHeadersImpl(std::move(header_block), fin, std::move(ack_listener));
   if (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index 3c5a166..dc0d6f5 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -153,6 +153,8 @@
   // Clears |header_list_|.
   void ConsumeHeaderList();
 
+  void SetUnblocked() { sequencer()->SetUnblocked(); }
+
   // This block of functions wraps the sequencer's functions of the same
   // name.  These methods return uncompressed data until that has
   // been fully processed.  Then they simply delegate to the sequencer.
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index f605e56..89e08ef 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -157,7 +157,7 @@
 }
 
 void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
-  DCHECK(VersionHasControlStreams(connection()->transport_version()));
+  DCHECK(VersionHasStreamType(connection()->transport_version()));
   QuicStreamId stream_id = frame.stream_id;
 
   PendingStream* pending = GetOrCreatePendingStream(stream_id);
@@ -171,7 +171,12 @@
   }
 
   pending->OnStreamFrame(frame);
-  ProcessPendingStream(pending);
+  if (ProcessPendingStream(pending)) {
+    // The pending stream should now be in the scope of normal streams.
+    DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
+        << "Stream " << stream_id << " not created";
+    pending_stream_map_.erase(stream_id);
+  }
 }
 
 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
@@ -192,7 +197,7 @@
     return;
   }
 
-  if (VersionHasControlStreams(connection()->transport_version()) &&
+  if (VersionHasStreamType(connection()->transport_version()) &&
       UsesPendingStreams() &&
       QuicUtils::GetStreamType(stream_id, perspective(),
                                IsIncomingStream(stream_id)) ==
@@ -327,7 +332,7 @@
 }
 
 void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {
-  DCHECK(VersionHasControlStreams(connection()->transport_version()));
+  DCHECK(VersionHasStreamType(connection()->transport_version()));
   QuicStreamId stream_id = frame.stream_id;
 
   PendingStream* pending = GetOrCreatePendingStream(stream_id);
@@ -362,7 +367,7 @@
     visitor_->OnRstStreamReceived(frame);
   }
 
-  if (VersionHasControlStreams(connection()->transport_version()) &&
+  if (VersionHasStreamType(connection()->transport_version()) &&
       UsesPendingStreams() &&
       QuicUtils::GetStreamType(stream_id, perspective(),
                                IsIncomingStream(stream_id)) ==
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index 5e527f3..513ed9b 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -558,8 +558,9 @@
   }
 
   // Processes the stream type information of |pending| depending on
-  // different kinds of sessions' own rules.
-  virtual void ProcessPendingStream(PendingStream* pending) {}
+  // different kinds of sessions' own rules. Returns true if the pending stream
+  // is converted into a normal stream.
+  virtual bool ProcessPendingStream(PendingStream* pending) { return false; }
 
   bool eliminate_static_stream_map() const {
     return eliminate_static_stream_map_;
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index 5b86bdd..e2996db 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -219,12 +219,14 @@
   // QuicSession doesn't do anything in this method. So it's overridden here to
   // test that the session handles pending streams correctly in terms of
   // receiving stream frames.
-  void ProcessPendingStream(PendingStream* pending) override {
+  bool ProcessPendingStream(PendingStream* pending) override {
     struct iovec iov;
     if (pending->sequencer()->GetReadableRegion(&iov)) {
       // Create TestStream once the first byte is received.
       CreateIncomingStream(std::move(*pending));
+      return true;
     }
+    return false;
   }
 
   bool IsClosedStream(QuicStreamId id) {
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 32b168d..94cffd4 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -68,8 +68,9 @@
 }
 
 void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
-  QUIC_BUG << "AddBytesConsumed should not be called.";
-  CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Unexpected bytes consumed");
+  // It will be called when the metadata of the stream is consumed.
+  flow_controller_.AddBytesConsumed(bytes);
+  connection_flow_controller_->AddBytesConsumed(bytes);
 }
 
 void PendingStream::Reset(QuicRstStreamErrorCode error) {
@@ -168,6 +169,10 @@
   return true;
 }
 
+void PendingStream::MarkConsumed(size_t num_bytes) {
+  sequencer_.MarkConsumed(num_bytes);
+}
+
 QuicStream::QuicStream(PendingStream pending, StreamType type, bool is_static)
     : QuicStream(pending.id_,
                  pending.session_,
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index df1f7f8..80ee4e4 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -76,6 +76,8 @@
 
   const QuicStreamSequencer* sequencer() const { return &sequencer_; }
 
+  void MarkConsumed(size_t num_bytes);
+
  private:
   friend class QuicStream;
 
diff --git a/quic/core/quic_versions.h b/quic/core/quic_versions.h
index fb811cf..939ee36 100644
--- a/quic/core/quic_versions.h
+++ b/quic/core/quic_versions.h
@@ -382,8 +382,8 @@
   return transport_version == QUIC_VERSION_99;
 }
 
-// Returns whether |transport_version| has HTTP/3 Control stream.
-QUIC_EXPORT_PRIVATE inline bool VersionHasControlStreams(
+// Returns whether |transport_version| has HTTP/3 stream type.
+QUIC_EXPORT_PRIVATE inline bool VersionHasStreamType(
     QuicTransportVersion transport_version) {
   return transport_version == QUIC_VERSION_99;
 }
