Support HTTP/3 style of Server push.

Server push stream now has stream type of 0x01 encoded as variable length integer. It will be sent along with the first stream write. And the peer will open the incoming stream once the stream type byte is received.

Pending stream is enabled in this CL.

gfe-relnote: version 99 only. Not in production.
PiperOrigin-RevId: 249914001
Change-Id: I291d1cc98ce44f930722608f82f9829da033c213
diff --git a/quic/core/http/quic_receive_control_stream_test.cc b/quic/core/http/quic_receive_control_stream_test.cc
index 0dba0af..73e8fed 100644
--- a/quic/core/http/quic_receive_control_stream_test.cc
+++ b/quic/core/http/quic_receive_control_stream_test.cc
@@ -50,7 +50,7 @@
   std::vector<TestParams> params;
   ParsedQuicVersionVector all_supported_versions = AllSupportedVersions();
   for (const auto& version : AllSupportedVersions()) {
-    if (!VersionHasControlStreams(version.transport_version)) {
+    if (!VersionHasStreamType(version.transport_version)) {
       continue;
     }
     for (Perspective p : {Perspective::IS_SERVER, Perspective::IS_CLIENT}) {
diff --git a/quic/core/http/quic_send_control_stream_test.cc b/quic/core/http/quic_send_control_stream_test.cc
index 980cdf5..a015c5f 100644
--- a/quic/core/http/quic_send_control_stream_test.cc
+++ b/quic/core/http/quic_send_control_stream_test.cc
@@ -50,7 +50,7 @@
   std::vector<TestParams> params;
   ParsedQuicVersionVector all_supported_versions = AllSupportedVersions();
   for (const auto& version : AllSupportedVersions()) {
-    if (!VersionHasControlStreams(version.transport_version)) {
+    if (!VersionHasStreamType(version.transport_version)) {
       continue;
     }
     for (Perspective p : {Perspective::IS_SERVER, Perspective::IS_CLIENT}) {
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index 93d10e1..35b2d5e 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -51,6 +51,9 @@
 
 namespace {
 
+// TODO(renjietang): remove this once HTTP/3 error codes are adopted.
+const uint16_t kHttpUnknownStreamType = 0x0D;
+
 class HeaderTableDebugVisitor : public HpackHeaderTable::DebugVisitorInterface {
  public:
   HeaderTableDebugVisitor(const QuicClock* clock,
@@ -563,8 +566,8 @@
 }
 
 bool QuicSpdySession::UsesPendingStreams() const {
-  DCHECK(VersionHasControlStreams(connection()->transport_version()));
-  return false;
+  DCHECK(VersionHasStreamType(connection()->transport_version()));
+  return true;
 }
 
 size_t QuicSpdySession::WriteHeadersOnHeadersStreamImpl(
@@ -730,40 +733,41 @@
   return false;
 }
 
-void QuicSpdySession::ProcessPendingStream(PendingStream* pending) {
-  DCHECK(VersionHasControlStreams(connection()->transport_version()));
+bool QuicSpdySession::ProcessPendingStream(PendingStream* pending) {
+  DCHECK(VersionHasStreamType(connection()->transport_version()));
   struct iovec iov;
   if (!pending->sequencer()->GetReadableRegion(&iov)) {
-    // We don't have the first byte yet.
-    return;
+    // The first byte hasn't been received yet.
+    return false;
   }
 
   QuicDataReader reader(static_cast<char*>(iov.iov_base), iov.iov_len);
+  uint8_t stream_type_length = reader.PeekVarInt62Length();
   uint64_t stream_type = 0;
   if (!reader.ReadVarInt62(&stream_type)) {
-    return;
+    return false;
   }
-  CreateIncomingStreamFromPending(pending->id(), stream_type);
-}
+  pending->MarkConsumed(stream_type_length);
 
-void QuicSpdySession::CreateIncomingStreamFromPending(QuicStreamId id,
-                                                      uint64_t stream_type) {
   switch (stream_type) {
-    case 0x00:  // HTTP/3 control stream.
+    case kControlStream:  // HTTP/3 control stream.
       // TODO(renjietang): Create incoming control stream.
       break;
-    case 0x01:  // Push Stream.
-      break;
-    case 0x02:  // QPACK encoder stream.
+    case kServerPushStream: {  // Push Stream.
+      QuicSpdyStream* stream = CreateIncomingStream(std::move(*pending));
+      stream->SetUnblocked();
+      return true;
+    }
+    case kQpackEncoderStream:  // QPACK encoder stream.
       // TODO(bnc): Create QPACK encoder stream.
       break;
-    case 0x03:  // QPACK decoder stream.
+    case kQpackDecoderStream:  // QPACK decoder stream.
       // TODO(bnc): Create QPACK decoder stream.
       break;
     default:
-      SendStopSending(0x0D, id);
-      return;
+      SendStopSending(kHttpUnknownStreamType, pending->id());
   }
+  return false;
 }
 
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index 1935a2f..d0978c1 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -28,6 +28,12 @@
 class QuicSpdySessionPeer;
 }  // namespace test
 
+// Unidirectional stream types define by IETF HTTP/3 draft in section 3.2.
+const uint64_t kControlStream = 0;
+const uint64_t kServerPushStream = 1;
+const uint64_t kQpackEncoderStream = 2;
+const uint64_t kQpackDecoderStream = 3;
+
 // QuicHpackDebugVisitor gathers data used for understanding HPACK HoL
 // dynamics.  Specifically, it is to help predict the compression
 // penalty of avoiding HoL by chagning how the dynamic table is used.
@@ -198,9 +204,10 @@
   // Overridden to buffer incoming unidirectional streams for version 99.
   bool UsesPendingStreams() const override;
 
-  // Overridden to Process HTTP/3 stream types. No action will be taken if
-  // stream type cannot be read.
-  void ProcessPendingStream(PendingStream* pending) override;
+  // Overridden to Process HTTP/3 stream types. H/3 streams will be created from
+  // pending streams accordingly if the stream type can be read. Returns true if
+  // unidirectional streams are created.
+  bool ProcessPendingStream(PendingStream* pending) override;
 
   size_t WriteHeadersOnHeadersStreamImpl(
       QuicStreamId id,
@@ -241,10 +248,6 @@
   void set_max_uncompressed_header_bytes(
       size_t set_max_uncompressed_header_bytes);
 
-  // Creates HTTP/3 unidirectional stream of |id| and |type|. Sends
-  // STOP_SENDING frame if |type| is not supported.
-  void CreateIncomingStreamFromPending(QuicStreamId id, uint64_t type);
-
  private:
   friend class test::QuicSpdySessionPeer;
 
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 1a33fd1..d40ad73 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -417,10 +417,10 @@
                          ::testing::ValuesIn(AllSupportedVersions()));
 
 TEST_P(QuicSpdySessionTestServer, UsesPendingStreams) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
-  EXPECT_FALSE(session_.UsesPendingStreams());
+  EXPECT_TRUE(session_.UsesPendingStreams());
 }
 
 TEST_P(QuicSpdySessionTestServer, PeerAddress) {
@@ -1619,10 +1619,10 @@
                          ::testing::ValuesIn(AllSupportedVersions()));
 
 TEST_P(QuicSpdySessionTestClient, UsesPendingStreams) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
-  EXPECT_FALSE(session_.UsesPendingStreams());
+  EXPECT_TRUE(session_.UsesPendingStreams());
 }
 
 TEST_P(QuicSpdySessionTestClient, AvailableStreamsClient) {
@@ -1718,6 +1718,55 @@
   }
 }
 
+TEST_P(QuicSpdySessionTestClient, Http3ServerPush) {
+  if (!VersionHasStreamType(transport_version())) {
+    return;
+  }
+
+  char type[] = {0x01};
+  std::string data = std::string(type, 1) + "header";
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+  QuicStreamId stream_id1 =
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+  QuicStreamFrame data1(stream_id1, false, 0, QuicStringPiece(data));
+  session_.OnStreamFrame(data1);
+  EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams());
+  QuicStream* stream = session_.GetOrCreateDynamicStream(stream_id1);
+  EXPECT_EQ(1u, stream->flow_controller()->bytes_consumed());
+  EXPECT_EQ(1u, session_.flow_controller()->bytes_consumed());
+
+  char unoptimized_type[] = {0x80, 0x00, 0x00, 0x01};
+  data = std::string(unoptimized_type, 4) + "header";
+  QuicStreamId stream_id2 =
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 1);
+  QuicStreamFrame data2(stream_id2, false, 0, QuicStringPiece(data));
+  session_.OnStreamFrame(data2);
+  EXPECT_EQ(2u, session_.GetNumOpenIncomingStreams());
+  stream = session_.GetOrCreateDynamicStream(stream_id2);
+  EXPECT_EQ(4u, stream->flow_controller()->bytes_consumed());
+  EXPECT_EQ(5u, session_.flow_controller()->bytes_consumed());
+}
+
+TEST_P(QuicSpdySessionTestClient, Http3ServerPushOutofOrderFrame) {
+  if (!VersionHasStreamType(transport_version())) {
+    return;
+  }
+
+  char type[] = {0x01};
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+  QuicStreamFrame data1(
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0),
+      false, 1, QuicStringPiece("header"));
+  session_.OnStreamFrame(data1);
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+
+  QuicStreamFrame data2(
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0),
+      false, 0, QuicStringPiece(type, 1));
+  session_.OnStreamFrame(data2);
+  EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams());
+}
+
 TEST_P(QuicSpdySessionTestServer, ZombieStreams) {
   TestStream* stream2 = session_.CreateOutgoingBidirectionalStream();
   QuicStreamPeer::SetStreamBytesWritten(3, stream2);
@@ -1890,7 +1939,7 @@
 }
 
 TEST_P(QuicSpdySessionTestServer, SimplePendingStreamType) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
   PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
@@ -1909,7 +1958,7 @@
 }
 
 TEST_P(QuicSpdySessionTestServer, SimplePendingStreamTypeOutOfOrderDelivery) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
   PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
@@ -1932,7 +1981,7 @@
 
 TEST_P(QuicSpdySessionTestServer,
        MultipleBytesPendingStreamTypeOutOfOrderDelivery) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
   PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index 0127c5b..d424cc0 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -16,6 +16,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
 #include "net/third_party/quiche/src/quic/core/quic_write_blocked_list.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_arraysize.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
@@ -205,6 +206,23 @@
     SpdyHeaderBlock header_block,
     bool fin,
     QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
+  QuicConnection::ScopedPacketFlusher flusher(
+      spdy_session_->connection(), QuicConnection::SEND_ACK_IF_PENDING);
+  // Send stream type for server push stream
+  if (VersionHasStreamType(session()->connection()->transport_version()) &&
+      type() == WRITE_UNIDIRECTIONAL && send_buffer().stream_offset() == 0) {
+    char data[sizeof(kServerPushStream)];
+    QuicDataWriter writer(QUIC_ARRAYSIZE(data), data);
+    writer.WriteVarInt62(kServerPushStream);
+
+    // Similar to frame headers, stream type byte shouldn't be exposed to upper
+    // layer applications.
+    unacked_frame_headers_offsets_.Add(0, writer.length());
+
+    QUIC_LOG(INFO) << "Stream " << id() << " is writing type as server push";
+    WriteOrBufferData(QuicStringPiece(writer.data(), writer.length()), false,
+                      nullptr);
+  }
   size_t bytes_written =
       WriteHeadersImpl(std::move(header_block), fin, std::move(ack_listener));
   if (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index 3c5a166..dc0d6f5 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -153,6 +153,8 @@
   // Clears |header_list_|.
   void ConsumeHeaderList();
 
+  void SetUnblocked() { sequencer()->SetUnblocked(); }
+
   // This block of functions wraps the sequencer's functions of the same
   // name.  These methods return uncompressed data until that has
   // been fully processed.  Then they simply delegate to the sequencer.
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index f605e56..89e08ef 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -157,7 +157,7 @@
 }
 
 void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
-  DCHECK(VersionHasControlStreams(connection()->transport_version()));
+  DCHECK(VersionHasStreamType(connection()->transport_version()));
   QuicStreamId stream_id = frame.stream_id;
 
   PendingStream* pending = GetOrCreatePendingStream(stream_id);
@@ -171,7 +171,12 @@
   }
 
   pending->OnStreamFrame(frame);
-  ProcessPendingStream(pending);
+  if (ProcessPendingStream(pending)) {
+    // The pending stream should now be in the scope of normal streams.
+    DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
+        << "Stream " << stream_id << " not created";
+    pending_stream_map_.erase(stream_id);
+  }
 }
 
 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
@@ -192,7 +197,7 @@
     return;
   }
 
-  if (VersionHasControlStreams(connection()->transport_version()) &&
+  if (VersionHasStreamType(connection()->transport_version()) &&
       UsesPendingStreams() &&
       QuicUtils::GetStreamType(stream_id, perspective(),
                                IsIncomingStream(stream_id)) ==
@@ -327,7 +332,7 @@
 }
 
 void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {
-  DCHECK(VersionHasControlStreams(connection()->transport_version()));
+  DCHECK(VersionHasStreamType(connection()->transport_version()));
   QuicStreamId stream_id = frame.stream_id;
 
   PendingStream* pending = GetOrCreatePendingStream(stream_id);
@@ -362,7 +367,7 @@
     visitor_->OnRstStreamReceived(frame);
   }
 
-  if (VersionHasControlStreams(connection()->transport_version()) &&
+  if (VersionHasStreamType(connection()->transport_version()) &&
       UsesPendingStreams() &&
       QuicUtils::GetStreamType(stream_id, perspective(),
                                IsIncomingStream(stream_id)) ==
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index 5e527f3..513ed9b 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -558,8 +558,9 @@
   }
 
   // Processes the stream type information of |pending| depending on
-  // different kinds of sessions' own rules.
-  virtual void ProcessPendingStream(PendingStream* pending) {}
+  // different kinds of sessions' own rules. Returns true if the pending stream
+  // is converted into a normal stream.
+  virtual bool ProcessPendingStream(PendingStream* pending) { return false; }
 
   bool eliminate_static_stream_map() const {
     return eliminate_static_stream_map_;
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index 5b86bdd..e2996db 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -219,12 +219,14 @@
   // QuicSession doesn't do anything in this method. So it's overridden here to
   // test that the session handles pending streams correctly in terms of
   // receiving stream frames.
-  void ProcessPendingStream(PendingStream* pending) override {
+  bool ProcessPendingStream(PendingStream* pending) override {
     struct iovec iov;
     if (pending->sequencer()->GetReadableRegion(&iov)) {
       // Create TestStream once the first byte is received.
       CreateIncomingStream(std::move(*pending));
+      return true;
     }
+    return false;
   }
 
   bool IsClosedStream(QuicStreamId id) {
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 32b168d..94cffd4 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -68,8 +68,9 @@
 }
 
 void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
-  QUIC_BUG << "AddBytesConsumed should not be called.";
-  CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Unexpected bytes consumed");
+  // It will be called when the metadata of the stream is consumed.
+  flow_controller_.AddBytesConsumed(bytes);
+  connection_flow_controller_->AddBytesConsumed(bytes);
 }
 
 void PendingStream::Reset(QuicRstStreamErrorCode error) {
@@ -168,6 +169,10 @@
   return true;
 }
 
+void PendingStream::MarkConsumed(size_t num_bytes) {
+  sequencer_.MarkConsumed(num_bytes);
+}
+
 QuicStream::QuicStream(PendingStream pending, StreamType type, bool is_static)
     : QuicStream(pending.id_,
                  pending.session_,
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index df1f7f8..80ee4e4 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -76,6 +76,8 @@
 
   const QuicStreamSequencer* sequencer() const { return &sequencer_; }
 
+  void MarkConsumed(size_t num_bytes);
+
  private:
   friend class QuicStream;
 
diff --git a/quic/core/quic_versions.h b/quic/core/quic_versions.h
index fb811cf..939ee36 100644
--- a/quic/core/quic_versions.h
+++ b/quic/core/quic_versions.h
@@ -382,8 +382,8 @@
   return transport_version == QUIC_VERSION_99;
 }
 
-// Returns whether |transport_version| has HTTP/3 Control stream.
-QUIC_EXPORT_PRIVATE inline bool VersionHasControlStreams(
+// Returns whether |transport_version| has HTTP/3 stream type.
+QUIC_EXPORT_PRIVATE inline bool VersionHasStreamType(
     QuicTransportVersion transport_version) {
   return transport_version == QUIC_VERSION_99;
 }
diff --git a/quic/tools/quic_simple_server_session_test.cc b/quic/tools/quic_simple_server_session_test.cc
index e08af9a..58c98dd 100644
--- a/quic/tools/quic_simple_server_session_test.cc
+++ b/quic/tools/quic_simple_server_session_test.cc
@@ -648,6 +648,12 @@
         // Since flow control window is smaller than response body, not the
         // whole body will be sent.
         QuicStreamOffset offset = 0;
+        if (VersionHasStreamType(connection_->transport_version())) {
+          EXPECT_CALL(*connection_,
+                      SendStreamData(stream_id, 1, offset, NO_FIN));
+          offset++;
+        }
+
         if (VersionUsesQpack(connection_->transport_version())) {
           EXPECT_CALL(*connection_,
                       SendStreamData(stream_id, kHeadersFrameHeaderLength,
@@ -713,6 +719,11 @@
   // After an open stream is marked draining, a new stream is expected to be
   // created and a response sent on the stream.
   QuicStreamOffset offset = 0;
+  if (VersionHasStreamType(connection_->transport_version())) {
+    EXPECT_CALL(*connection_,
+                SendStreamData(next_out_going_stream_id, 1, offset, NO_FIN));
+    offset++;
+  }
   if (VersionUsesQpack(connection_->transport_version())) {
     EXPECT_CALL(*connection_,
                 SendStreamData(next_out_going_stream_id,
@@ -790,6 +801,11 @@
       GetNthServerInitiatedUnidirectionalId(kMaxStreamsForTest);
   InSequence s;
   QuicStreamOffset offset = 0;
+  if (VersionHasStreamType(connection_->transport_version())) {
+    EXPECT_CALL(*connection_,
+                SendStreamData(stream_not_reset, 1, offset, NO_FIN));
+    offset++;
+  }
   if (VersionUsesQpack(connection_->transport_version())) {
     EXPECT_CALL(*connection_,
                 SendStreamData(stream_not_reset, kHeadersFrameHeaderLength,
@@ -853,6 +869,11 @@
                 OnStreamReset(stream_got_reset, QUIC_RST_ACKNOWLEDGEMENT));
   }
   QuicStreamOffset offset = 0;
+  if (VersionHasStreamType(connection_->transport_version())) {
+    EXPECT_CALL(*connection_,
+                SendStreamData(stream_to_open, 1, offset, NO_FIN));
+    offset++;
+  }
   if (VersionUsesQpack(connection_->transport_version())) {
     EXPECT_CALL(*connection_,
                 SendStreamData(stream_to_open, kHeadersFrameHeaderLength,