Support HTTP/3 style of Server push.

Server push stream now has stream type of 0x01 encoded as variable length integer. It will be sent along with the first stream write. And the peer will open the incoming stream once the stream type byte is received.

Pending stream is enabled in this CL.

gfe-relnote: version 99 only. Not in production.
PiperOrigin-RevId: 249914001
Change-Id: I291d1cc98ce44f930722608f82f9829da033c213
diff --git a/quic/core/http/quic_receive_control_stream_test.cc b/quic/core/http/quic_receive_control_stream_test.cc
index 0dba0af..73e8fed 100644
--- a/quic/core/http/quic_receive_control_stream_test.cc
+++ b/quic/core/http/quic_receive_control_stream_test.cc
@@ -50,7 +50,7 @@
   std::vector<TestParams> params;
   ParsedQuicVersionVector all_supported_versions = AllSupportedVersions();
   for (const auto& version : AllSupportedVersions()) {
-    if (!VersionHasControlStreams(version.transport_version)) {
+    if (!VersionHasStreamType(version.transport_version)) {
       continue;
     }
     for (Perspective p : {Perspective::IS_SERVER, Perspective::IS_CLIENT}) {
diff --git a/quic/core/http/quic_send_control_stream_test.cc b/quic/core/http/quic_send_control_stream_test.cc
index 980cdf5..a015c5f 100644
--- a/quic/core/http/quic_send_control_stream_test.cc
+++ b/quic/core/http/quic_send_control_stream_test.cc
@@ -50,7 +50,7 @@
   std::vector<TestParams> params;
   ParsedQuicVersionVector all_supported_versions = AllSupportedVersions();
   for (const auto& version : AllSupportedVersions()) {
-    if (!VersionHasControlStreams(version.transport_version)) {
+    if (!VersionHasStreamType(version.transport_version)) {
       continue;
     }
     for (Perspective p : {Perspective::IS_SERVER, Perspective::IS_CLIENT}) {
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index 93d10e1..35b2d5e 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -51,6 +51,9 @@
 
 namespace {
 
+// TODO(renjietang): remove this once HTTP/3 error codes are adopted.
+const uint16_t kHttpUnknownStreamType = 0x0D;
+
 class HeaderTableDebugVisitor : public HpackHeaderTable::DebugVisitorInterface {
  public:
   HeaderTableDebugVisitor(const QuicClock* clock,
@@ -563,8 +566,8 @@
 }
 
 bool QuicSpdySession::UsesPendingStreams() const {
-  DCHECK(VersionHasControlStreams(connection()->transport_version()));
-  return false;
+  DCHECK(VersionHasStreamType(connection()->transport_version()));
+  return true;
 }
 
 size_t QuicSpdySession::WriteHeadersOnHeadersStreamImpl(
@@ -730,40 +733,41 @@
   return false;
 }
 
-void QuicSpdySession::ProcessPendingStream(PendingStream* pending) {
-  DCHECK(VersionHasControlStreams(connection()->transport_version()));
+bool QuicSpdySession::ProcessPendingStream(PendingStream* pending) {
+  DCHECK(VersionHasStreamType(connection()->transport_version()));
   struct iovec iov;
   if (!pending->sequencer()->GetReadableRegion(&iov)) {
-    // We don't have the first byte yet.
-    return;
+    // The first byte hasn't been received yet.
+    return false;
   }
 
   QuicDataReader reader(static_cast<char*>(iov.iov_base), iov.iov_len);
+  uint8_t stream_type_length = reader.PeekVarInt62Length();
   uint64_t stream_type = 0;
   if (!reader.ReadVarInt62(&stream_type)) {
-    return;
+    return false;
   }
-  CreateIncomingStreamFromPending(pending->id(), stream_type);
-}
+  pending->MarkConsumed(stream_type_length);
 
-void QuicSpdySession::CreateIncomingStreamFromPending(QuicStreamId id,
-                                                      uint64_t stream_type) {
   switch (stream_type) {
-    case 0x00:  // HTTP/3 control stream.
+    case kControlStream:  // HTTP/3 control stream.
       // TODO(renjietang): Create incoming control stream.
       break;
-    case 0x01:  // Push Stream.
-      break;
-    case 0x02:  // QPACK encoder stream.
+    case kServerPushStream: {  // Push Stream.
+      QuicSpdyStream* stream = CreateIncomingStream(std::move(*pending));
+      stream->SetUnblocked();
+      return true;
+    }
+    case kQpackEncoderStream:  // QPACK encoder stream.
       // TODO(bnc): Create QPACK encoder stream.
       break;
-    case 0x03:  // QPACK decoder stream.
+    case kQpackDecoderStream:  // QPACK decoder stream.
       // TODO(bnc): Create QPACK decoder stream.
       break;
     default:
-      SendStopSending(0x0D, id);
-      return;
+      SendStopSending(kHttpUnknownStreamType, pending->id());
   }
+  return false;
 }
 
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index 1935a2f..d0978c1 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -28,6 +28,12 @@
 class QuicSpdySessionPeer;
 }  // namespace test
 
+// Unidirectional stream types define by IETF HTTP/3 draft in section 3.2.
+const uint64_t kControlStream = 0;
+const uint64_t kServerPushStream = 1;
+const uint64_t kQpackEncoderStream = 2;
+const uint64_t kQpackDecoderStream = 3;
+
 // QuicHpackDebugVisitor gathers data used for understanding HPACK HoL
 // dynamics.  Specifically, it is to help predict the compression
 // penalty of avoiding HoL by chagning how the dynamic table is used.
@@ -198,9 +204,10 @@
   // Overridden to buffer incoming unidirectional streams for version 99.
   bool UsesPendingStreams() const override;
 
-  // Overridden to Process HTTP/3 stream types. No action will be taken if
-  // stream type cannot be read.
-  void ProcessPendingStream(PendingStream* pending) override;
+  // Overridden to Process HTTP/3 stream types. H/3 streams will be created from
+  // pending streams accordingly if the stream type can be read. Returns true if
+  // unidirectional streams are created.
+  bool ProcessPendingStream(PendingStream* pending) override;
 
   size_t WriteHeadersOnHeadersStreamImpl(
       QuicStreamId id,
@@ -241,10 +248,6 @@
   void set_max_uncompressed_header_bytes(
       size_t set_max_uncompressed_header_bytes);
 
-  // Creates HTTP/3 unidirectional stream of |id| and |type|. Sends
-  // STOP_SENDING frame if |type| is not supported.
-  void CreateIncomingStreamFromPending(QuicStreamId id, uint64_t type);
-
  private:
   friend class test::QuicSpdySessionPeer;
 
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 1a33fd1..d40ad73 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -417,10 +417,10 @@
                          ::testing::ValuesIn(AllSupportedVersions()));
 
 TEST_P(QuicSpdySessionTestServer, UsesPendingStreams) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
-  EXPECT_FALSE(session_.UsesPendingStreams());
+  EXPECT_TRUE(session_.UsesPendingStreams());
 }
 
 TEST_P(QuicSpdySessionTestServer, PeerAddress) {
@@ -1619,10 +1619,10 @@
                          ::testing::ValuesIn(AllSupportedVersions()));
 
 TEST_P(QuicSpdySessionTestClient, UsesPendingStreams) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
-  EXPECT_FALSE(session_.UsesPendingStreams());
+  EXPECT_TRUE(session_.UsesPendingStreams());
 }
 
 TEST_P(QuicSpdySessionTestClient, AvailableStreamsClient) {
@@ -1718,6 +1718,55 @@
   }
 }
 
+TEST_P(QuicSpdySessionTestClient, Http3ServerPush) {
+  if (!VersionHasStreamType(transport_version())) {
+    return;
+  }
+
+  char type[] = {0x01};
+  std::string data = std::string(type, 1) + "header";
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+  QuicStreamId stream_id1 =
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+  QuicStreamFrame data1(stream_id1, false, 0, QuicStringPiece(data));
+  session_.OnStreamFrame(data1);
+  EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams());
+  QuicStream* stream = session_.GetOrCreateDynamicStream(stream_id1);
+  EXPECT_EQ(1u, stream->flow_controller()->bytes_consumed());
+  EXPECT_EQ(1u, session_.flow_controller()->bytes_consumed());
+
+  char unoptimized_type[] = {0x80, 0x00, 0x00, 0x01};
+  data = std::string(unoptimized_type, 4) + "header";
+  QuicStreamId stream_id2 =
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 1);
+  QuicStreamFrame data2(stream_id2, false, 0, QuicStringPiece(data));
+  session_.OnStreamFrame(data2);
+  EXPECT_EQ(2u, session_.GetNumOpenIncomingStreams());
+  stream = session_.GetOrCreateDynamicStream(stream_id2);
+  EXPECT_EQ(4u, stream->flow_controller()->bytes_consumed());
+  EXPECT_EQ(5u, session_.flow_controller()->bytes_consumed());
+}
+
+TEST_P(QuicSpdySessionTestClient, Http3ServerPushOutofOrderFrame) {
+  if (!VersionHasStreamType(transport_version())) {
+    return;
+  }
+
+  char type[] = {0x01};
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+  QuicStreamFrame data1(
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0),
+      false, 1, QuicStringPiece("header"));
+  session_.OnStreamFrame(data1);
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+
+  QuicStreamFrame data2(
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0),
+      false, 0, QuicStringPiece(type, 1));
+  session_.OnStreamFrame(data2);
+  EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams());
+}
+
 TEST_P(QuicSpdySessionTestServer, ZombieStreams) {
   TestStream* stream2 = session_.CreateOutgoingBidirectionalStream();
   QuicStreamPeer::SetStreamBytesWritten(3, stream2);
@@ -1890,7 +1939,7 @@
 }
 
 TEST_P(QuicSpdySessionTestServer, SimplePendingStreamType) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
   PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
@@ -1909,7 +1958,7 @@
 }
 
 TEST_P(QuicSpdySessionTestServer, SimplePendingStreamTypeOutOfOrderDelivery) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
   PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
@@ -1932,7 +1981,7 @@
 
 TEST_P(QuicSpdySessionTestServer,
        MultipleBytesPendingStreamTypeOutOfOrderDelivery) {
-  if (!VersionHasControlStreams(transport_version())) {
+  if (!VersionHasStreamType(transport_version())) {
     return;
   }
   PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index 0127c5b..d424cc0 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -16,6 +16,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
 #include "net/third_party/quiche/src/quic/core/quic_write_blocked_list.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_arraysize.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
@@ -205,6 +206,23 @@
     SpdyHeaderBlock header_block,
     bool fin,
     QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
+  QuicConnection::ScopedPacketFlusher flusher(
+      spdy_session_->connection(), QuicConnection::SEND_ACK_IF_PENDING);
+  // Send stream type for server push stream
+  if (VersionHasStreamType(session()->connection()->transport_version()) &&
+      type() == WRITE_UNIDIRECTIONAL && send_buffer().stream_offset() == 0) {
+    char data[sizeof(kServerPushStream)];
+    QuicDataWriter writer(QUIC_ARRAYSIZE(data), data);
+    writer.WriteVarInt62(kServerPushStream);
+
+    // Similar to frame headers, stream type byte shouldn't be exposed to upper
+    // layer applications.
+    unacked_frame_headers_offsets_.Add(0, writer.length());
+
+    QUIC_LOG(INFO) << "Stream " << id() << " is writing type as server push";
+    WriteOrBufferData(QuicStringPiece(writer.data(), writer.length()), false,
+                      nullptr);
+  }
   size_t bytes_written =
       WriteHeadersImpl(std::move(header_block), fin, std::move(ack_listener));
   if (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index 3c5a166..dc0d6f5 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -153,6 +153,8 @@
   // Clears |header_list_|.
   void ConsumeHeaderList();
 
+  void SetUnblocked() { sequencer()->SetUnblocked(); }
+
   // This block of functions wraps the sequencer's functions of the same
   // name.  These methods return uncompressed data until that has
   // been fully processed.  Then they simply delegate to the sequencer.