Add QpackStreams into QuicSpdySession.

Currently our QuicStreamIdManager is still in a gQUIC style, which doesn't support creating static streams later in a session. Thus in this CL, all QPACK send streams are created right at QuicSpdySession initialization.

For the same reason, QPACK stream types are sent on the wire once handshake is confirmed. In the future we can delay it until we send data on those streams.

gfe-relnote: v99 only, not used in prod.
PiperOrigin-RevId: 263776677
Change-Id: If710bde79ea2698f68710d7ac36ca6b039556260
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index 4d7d1df..b3ea094 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -15,6 +15,7 @@
 #include "net/third_party/quiche/src/quic/core/http/http_constants.h"
 #include "net/third_party/quiche/src/quic/core/http/quic_spdy_client_stream.h"
 #include "net/third_party/quiche/src/quic/core/qpack/qpack_encoder_test_utils.h"
+#include "net/third_party/quiche/src/quic/core/quic_data_writer.h"
 #include "net/third_party/quiche/src/quic/core/quic_epoll_connection_helper.h"
 #include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
 #include "net/third_party/quiche/src/quic/core/quic_framer.h"
@@ -1670,8 +1671,8 @@
 
 TEST_P(EndToEndTest, SetIndependentMaxIncomingDynamicStreamsLimits) {
   // Each endpoint can set max incoming dynamic streams independently.
-  const uint32_t kClientMaxIncomingDynamicStreams = 2;
-  const uint32_t kServerMaxIncomingDynamicStreams = 1;
+  const uint32_t kClientMaxIncomingDynamicStreams = 4;
+  const uint32_t kServerMaxIncomingDynamicStreams = 3;
   client_config_.SetMaxIncomingBidirectionalStreamsToSend(
       kClientMaxIncomingDynamicStreams);
   server_config_.SetMaxIncomingBidirectionalStreamsToSend(
@@ -1739,6 +1740,7 @@
             server_max_open_outgoing_bidirectional_streams);
   EXPECT_EQ(kClientMaxIncomingDynamicStreams,
             server_max_open_outgoing_unidirectional_streams);
+
   server_thread_->Resume();
 }
 
@@ -2280,7 +2282,10 @@
     // below, the settings frame might not be received.
     HttpEncoder encoder;
     SettingsFrame settings;
-    settings.values[6] = kDefaultMaxUncompressedHeaderSize;
+    settings.values[SETTINGS_MAX_HEADER_LIST_SIZE] =
+        kDefaultMaxUncompressedHeaderSize;
+    settings.values[SETTINGS_QPACK_MAX_TABLE_CAPACITY] =
+        kDefaultQpackMaxDynamicTableCapacity;
     std::unique_ptr<char[]> buffer;
     auto header_length = encoder.SerializeSettingsFrame(settings, &buffer);
     QuicByteCount win_difference1 = QuicFlowControllerPeer::ReceiveWindowSize(
@@ -2294,11 +2299,15 @@
     EXPECT_TRUE(win_difference1 == 0 ||
                 win_difference1 ==
                     header_length +
-                        QuicDataWriter::GetVarInt62Len(kControlStream));
+                        QuicDataWriter::GetVarInt62Len(kControlStream) +
+                        QuicDataWriter::GetVarInt62Len(kQpackEncoderStream) +
+                        QuicDataWriter::GetVarInt62Len(kQpackDecoderStream));
     EXPECT_TRUE(win_difference2 == 0 ||
                 win_difference2 ==
                     header_length +
-                        QuicDataWriter::GetVarInt62Len(kControlStream));
+                        QuicDataWriter::GetVarInt62Len(kControlStream) +
+                        QuicDataWriter::GetVarInt62Len(kQpackEncoderStream) +
+                        QuicDataWriter::GetVarInt62Len(kQpackDecoderStream));
     // The test returns early because in this version, headers stream no longer
     // sends settings.
     return;
diff --git a/quic/core/http/quic_send_control_stream.cc b/quic/core/http/quic_send_control_stream.cc
index e033b61..ab02ae9 100644
--- a/quic/core/http/quic_send_control_stream.cc
+++ b/quic/core/http/quic_send_control_stream.cc
@@ -42,6 +42,8 @@
   SettingsFrame settings;
   settings.values[SETTINGS_MAX_HEADER_LIST_SIZE] =
       max_inbound_header_list_size_;
+  settings.values[SETTINGS_QPACK_MAX_TABLE_CAPACITY] =
+      kDefaultQpackMaxDynamicTableCapacity;
   std::unique_ptr<char[]> buffer;
   QuicByteCount frame_length =
       encoder_.SerializeSettingsFrame(settings, &buffer);
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index 1da419e..7fbccad 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -309,6 +309,10 @@
     : QuicSession(connection, visitor, config, supported_versions),
       send_control_stream_(nullptr),
       receive_control_stream_(nullptr),
+      qpack_encoder_receive_stream_(nullptr),
+      qpack_decoder_receive_stream_(nullptr),
+      qpack_encoder_send_stream_(nullptr),
+      qpack_decoder_send_stream_(nullptr),
       max_inbound_header_list_size_(kDefaultMaxUncompressedHeaderSize),
       max_outbound_header_list_size_(kDefaultMaxUncompressedHeaderSize),
       server_push_enabled_(true),
@@ -364,13 +368,10 @@
                          /*stream_already_counted = */ false);
   } else {
     qpack_encoder_ = QuicMakeUnique<QpackEncoder>(this);
-    qpack_encoder_->set_qpack_stream_sender_delegate(
-        &encoder_stream_sender_delegate_);
     qpack_decoder_ =
         QuicMakeUnique<QpackDecoder>(kDefaultQpackMaxDynamicTableCapacity,
                                      /* maximum_blocked_streams = */ 0, this);
-    qpack_decoder_->set_qpack_stream_sender_delegate(
-        &decoder_stream_sender_delegate_);
+    MaybeInitializeHttp3UnidirectionalStreams();
     // TODO(b/112770235): Set sensible limit on maximum number of blocked
     // streams.
     // TODO(b/112770235): Send SETTINGS_QPACK_MAX_TABLE_CAPACITY with value
@@ -378,10 +379,6 @@
     // with limit on maximum number of blocked streams.
   }
 
-  if (VersionHasStreamType(connection()->transport_version())) {
-    MaybeInitializeHttp3UnidirectionalStreams();
-  }
-
   spdy_framer_visitor_->set_max_header_list_size(max_inbound_header_list_size_);
 
   // Limit HPACK buffering to 2x header list size limit.
@@ -548,6 +545,10 @@
 void QuicSpdySession::SendMaxHeaderListSize(size_t value) {
   if (VersionHasStreamType(connection()->transport_version())) {
     send_control_stream_->SendSettingsFrame();
+    // TODO(renjietang): Remove this once stream id manager can take dynamically
+    // created HTTP/3 unidirectional streams.
+    qpack_encoder_send_stream_->SendStreamType();
+    qpack_decoder_send_stream_->SendStreamType();
     return;
   }
   SpdySettingsIR settings_frame;
@@ -893,6 +894,7 @@
       RegisterStaticStream(std::move(receive_stream),
                            /*stream_already_counted = */ true);
       receive_control_stream_->SetUnblocked();
+      QUIC_DVLOG(1) << "Receive Control stream is created";
       return true;
     }
     case kServerPushStream: {  // Push Stream.
@@ -900,12 +902,26 @@
       stream->SetUnblocked();
       return true;
     }
-    case kQpackEncoderStream:  // QPACK encoder stream.
-      // TODO(bnc): Create QPACK encoder stream.
-      break;
-    case kQpackDecoderStream:  // QPACK decoder stream.
-      // TODO(bnc): Create QPACK decoder stream.
-      break;
+    case kQpackEncoderStream: {  // QPACK encoder stream.
+      auto encoder_receive = QuicMakeUnique<QpackReceiveStream>(
+          pending, qpack_decoder_->encoder_stream_receiver());
+      qpack_encoder_receive_stream_ = encoder_receive.get();
+      RegisterStaticStream(std::move(encoder_receive),
+                           /*stream_already_counted = */ true);
+      qpack_encoder_receive_stream_->SetUnblocked();
+      QUIC_DVLOG(1) << "Receive QPACK Encoder stream is created";
+      return true;
+    }
+    case kQpackDecoderStream: {  // QPACK decoder stream.
+      auto decoder_receive = QuicMakeUnique<QpackReceiveStream>(
+          pending, qpack_encoder_->decoder_stream_receiver());
+      qpack_decoder_receive_stream_ = decoder_receive.get();
+      RegisterStaticStream(std::move(decoder_receive),
+                           /*stream_already_counted = */ true);
+      qpack_decoder_receive_stream_->SetUnblocked();
+      QUIC_DVLOG(1) << "Receive Qpack Decoder stream is created";
+      return true;
+    }
     default:
       SendStopSending(kHttpUnknownStreamType, pending->id());
       pending->StopReading();
@@ -923,6 +939,28 @@
     RegisterStaticStream(std::move(send_control),
                          /*stream_already_counted = */ false);
   }
+
+  if (!qpack_decoder_send_stream_ &&
+      CanOpenNextOutgoingUnidirectionalStream()) {
+    auto decoder_send = QuicMakeUnique<QpackSendStream>(
+        GetNextOutgoingUnidirectionalStreamId(), this, kQpackDecoderStream);
+    qpack_decoder_send_stream_ = decoder_send.get();
+    RegisterStaticStream(std::move(decoder_send),
+                         /*stream_already_counted = */ false);
+    qpack_decoder_->set_qpack_stream_sender_delegate(
+        qpack_decoder_send_stream_);
+  }
+
+  if (!qpack_encoder_send_stream_ &&
+      CanOpenNextOutgoingUnidirectionalStream()) {
+    auto encoder_send = QuicMakeUnique<QpackSendStream>(
+        GetNextOutgoingUnidirectionalStreamId(), this, kQpackEncoderStream);
+    qpack_encoder_send_stream_ = encoder_send.get();
+    RegisterStaticStream(std::move(encoder_send),
+                         /*stream_already_counted = */ false);
+    qpack_encoder_->set_qpack_stream_sender_delegate(
+        qpack_encoder_send_stream_);
+  }
 }
 
 void QuicSpdySession::OnCanCreateNewOutgoingStream(bool unidirectional) {
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index ce8fff0..bab74d3 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -18,6 +18,8 @@
 #include "net/third_party/quiche/src/quic/core/qpack/qpack_decoder_stream_sender.h"
 #include "net/third_party/quiche/src/quic/core/qpack/qpack_encoder.h"
 #include "net/third_party/quiche/src/quic/core/qpack/qpack_encoder_stream_sender.h"
+#include "net/third_party/quiche/src/quic/core/qpack/qpack_receive_stream.h"
+#include "net/third_party/quiche/src/quic/core/qpack/qpack_send_stream.h"
 #include "net/third_party/quiche/src/quic/core/qpack/qpack_utils.h"
 #include "net/third_party/quiche/src/quic/core/quic_session.h"
 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
@@ -284,11 +286,17 @@
   // Pointer to the header stream in stream_map_.
   QuicHeadersStream* headers_stream_;
 
-  // HTTP/3 control streams. They are owned by QuicSession inside dynamic
+  // HTTP/3 control streams. They are owned by QuicSession inside
   // stream map, and can be accessed by those unowned pointers below.
   QuicSendControlStream* send_control_stream_;
   QuicReceiveControlStream* receive_control_stream_;
 
+  // Pointers to HTTP/3 QPACK streams in stream map.
+  QpackReceiveStream* qpack_encoder_receive_stream_;
+  QpackReceiveStream* qpack_decoder_receive_stream_;
+  QpackSendStream* qpack_encoder_send_stream_;
+  QpackSendStream* qpack_decoder_send_stream_;
+
   // The maximum size of a header block that will be accepted from the peer,
   // defined per spec as key + value + overhead per field (uncompressed).
   size_t max_inbound_header_list_size_;
@@ -313,10 +321,6 @@
   spdy::SpdyFramer spdy_framer_;
   http2::Http2DecoderAdapter h2_deframer_;
   std::unique_ptr<SpdyFramerVisitor> spdy_framer_visitor_;
-
-  // TODO(renjietang): Replace these two members with actual QPACK send streams.
-  NoopQpackStreamSenderDelegate encoder_stream_sender_delegate_;
-  NoopQpackStreamSenderDelegate decoder_stream_sender_delegate_;
   QuicStreamId max_allowed_push_id_;
 };
 
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index e58a651..9ee89bf 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -17,6 +17,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_packets.h"
 #include "net/third_party/quiche/src/quic/core/quic_stream.h"
 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
+#include "net/third_party/quiche/src/quic/core/quic_versions.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_arraysize.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_expect_bug.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
@@ -693,13 +694,16 @@
         .WillRepeatedly(Invoke(
             this, &QuicSpdySessionTestServer::ClearMaxStreamsControlFrame));
   }
+
   // Encryption needs to be established before data can be sent.
   CryptoHandshakeMessage msg;
   MockPacketWriter* writer = static_cast<MockPacketWriter*>(
       QuicConnectionPeer::GetWriter(session_.connection()));
   EXPECT_CALL(*writer, WritePacket(_, _, _, _, _))
-      .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0)));
+      .Times(testing::AnyNumber())
+      .WillRepeatedly(Return(WriteResult(WRITE_STATUS_OK, 0)));
   session_.GetMutableCryptoStream()->OnHandshakeMessage(msg);
+  testing::Mock::VerifyAndClearExpectations(writer);
 
   // Drive congestion control manually.
   MockSendAlgorithm* send_algorithm = new StrictMock<MockSendAlgorithm>;
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc
index ef9d6ff..f247df4 100644
--- a/quic/core/http/quic_spdy_stream_test.cc
+++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -1905,8 +1905,23 @@
     return;
   }
 
+  if (GetParam().handshake_protocol == PROTOCOL_TLS1_3) {
+    // TODO(nharper, b/112643533): Figure out why this test fails when TLS is
+    // enabled and fix it.
+    return;
+  }
+
+  testing::InSequence s;
   Initialize(kShouldProcessData);
 
+  auto decoder_send_stream =
+      QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
+
+  // The stream byte will be written in the first byte.
+  EXPECT_CALL(*session_, WritevData(decoder_send_stream,
+                                    decoder_send_stream->id(), 1, 0, _));
+  EXPECT_CALL(*session_, WritevData(decoder_send_stream,
+                                    decoder_send_stream->id(), _, _, _));
   // Deliver dynamic table entry to decoder.
   session_->qpack_decoder()->OnInsertWithoutNameReference("foo", "bar");
 
@@ -1927,6 +1942,8 @@
                                          headers.length(), data));
   EXPECT_EQ(kDataFramePayload, stream_->data());
 
+  EXPECT_CALL(*session_, WritevData(decoder_send_stream,
+                                    decoder_send_stream->id(), _, _, _));
   // Deliver second dynamic table entry to decoder.
   session_->qpack_decoder()->OnInsertWithoutNameReference("trailing", "foobar");
 
@@ -1950,6 +1967,13 @@
     return;
   }
 
+  if (GetParam().handshake_protocol == PROTOCOL_TLS1_3) {
+    // TODO(nharper, b/112643533): Figure out why this test fails when TLS is
+    // enabled and fix it.
+    return;
+  }
+
+  testing::InSequence s;
   Initialize(kShouldProcessData);
 
   // HEADERS frame referencing first dynamic table entry.
@@ -1959,6 +1983,14 @@
   // Decoding is blocked because dynamic table entry has not been received yet.
   EXPECT_FALSE(stream_->headers_decompressed());
 
+  auto decoder_send_stream =
+      QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
+
+  // The stream byte will be written in the first byte.
+  EXPECT_CALL(*session_, WritevData(decoder_send_stream,
+                                    decoder_send_stream->id(), 1, 0, _));
+  EXPECT_CALL(*session_, WritevData(decoder_send_stream,
+                                    decoder_send_stream->id(), _, _, _));
   // Deliver dynamic table entry to decoder.
   session_->qpack_decoder()->OnInsertWithoutNameReference("foo", "bar");
   EXPECT_TRUE(stream_->headers_decompressed());
@@ -1982,6 +2014,8 @@
   // Decoding is blocked because dynamic table entry has not been received yet.
   EXPECT_FALSE(stream_->trailers_decompressed());
 
+  EXPECT_CALL(*session_, WritevData(decoder_send_stream,
+                                    decoder_send_stream->id(), _, _, _));
   // Deliver second dynamic table entry to decoder.
   session_->qpack_decoder()->OnInsertWithoutNameReference("trailing", "foobar");
   EXPECT_TRUE(stream_->trailers_decompressed());
@@ -2028,6 +2062,13 @@
     return;
   }
 
+  if (GetParam().handshake_protocol == PROTOCOL_TLS1_3) {
+    // TODO(nharper, b/112643533): Figure out why this test fails when TLS is
+    // enabled and fix it.
+    return;
+  }
+
+  testing::InSequence s;
   Initialize(kShouldProcessData);
 
   // HEADERS frame referencing first dynamic table entry.
@@ -2037,6 +2078,14 @@
   // Decoding is blocked because dynamic table entry has not been received yet.
   EXPECT_FALSE(stream_->headers_decompressed());
 
+  auto decoder_send_stream =
+      QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
+
+  // The stream byte will be written in the first byte.
+  EXPECT_CALL(*session_, WritevData(decoder_send_stream,
+                                    decoder_send_stream->id(), 1, 0, _));
+  EXPECT_CALL(*session_, WritevData(decoder_send_stream,
+                                    decoder_send_stream->id(), _, _, _));
   // Deliver dynamic table entry to decoder.
   session_->qpack_decoder()->OnInsertWithoutNameReference("foo", "bar");
   EXPECT_TRUE(stream_->headers_decompressed());