Create an interface to queue up incoming streams in the client.

QuicTransport Web API requires the JavaScript application to actively pull new streams, whereas our QUIC code in general pushes the new streams onto the user.  The code added in this CL adapts one model to another.

gfe-relnote: n/a (not used in production)
PiperOrigin-RevId: 277973171
Change-Id: Ic03802de03ff2035694e6750cd6f6ec1bfa97abd
diff --git a/quic/quic_transport/quic_transport_client_session.cc b/quic/quic_transport/quic_transport_client_session.cc
index 105472f..f738371 100644
--- a/quic/quic_transport/quic_transport_client_session.cc
+++ b/quic/quic_transport/quic_transport_client_session.cc
@@ -43,13 +43,15 @@
     const ParsedQuicVersionVector& supported_versions,
     const QuicServerId& server_id,
     QuicCryptoClientConfig* crypto_config,
-    url::Origin origin)
+    url::Origin origin,
+    ClientVisitor* visitor)
     : QuicSession(connection,
                   owner,
                   config,
                   supported_versions,
                   /*num_expected_unidirectional_static_streams*/ 0),
-      origin_(origin) {
+      origin_(origin),
+      visitor_(visitor) {
   for (const ParsedQuicVersion& version : supported_versions) {
     QUIC_BUG_IF(version.handshake_protocol != PROTOCOL_TLS1_3)
         << "QuicTransport requires TLS 1.3 handshake";
@@ -62,9 +64,17 @@
 }
 
 QuicStream* QuicTransportClientSession::CreateIncomingStream(QuicStreamId id) {
+  QUIC_DVLOG(1) << "Creating incoming QuicTransport stream " << id;
   auto stream = std::make_unique<QuicTransportStream>(id, this, this);
   QuicTransportStream* stream_ptr = stream.get();
   ActivateStream(std::move(stream));
+  if (stream_ptr->type() == BIDIRECTIONAL) {
+    incoming_bidirectional_streams_.push_back(stream_ptr);
+    visitor_->OnIncomingBidirectionalStreamAvailable();
+  } else {
+    incoming_unidirectional_streams_.push_back(stream_ptr);
+    visitor_->OnIncomingUnidirectionalStreamAvailable();
+  }
   return stream_ptr;
 }
 
@@ -78,6 +88,26 @@
   SendClientIndication();
 }
 
+QuicTransportStream*
+QuicTransportClientSession::AcceptIncomingBidirectionalStream() {
+  if (incoming_bidirectional_streams_.empty()) {
+    return nullptr;
+  }
+  QuicTransportStream* stream = incoming_bidirectional_streams_.front();
+  incoming_bidirectional_streams_.pop_front();
+  return stream;
+}
+
+QuicTransportStream*
+QuicTransportClientSession::AcceptIncomingUnidirectionalStream() {
+  if (incoming_unidirectional_streams_.empty()) {
+    return nullptr;
+  }
+  QuicTransportStream* stream = incoming_unidirectional_streams_.front();
+  incoming_unidirectional_streams_.pop_front();
+  return stream;
+}
+
 std::string QuicTransportClientSession::SerializeClientIndication() {
   std::string serialized_origin = origin_.Serialize();
   if (serialized_origin.size() > std::numeric_limits<uint16_t>::max()) {
diff --git a/quic/quic_transport/quic_transport_client_session.h b/quic/quic_transport/quic_transport_client_session.h
index 2e575e1..12e5fdf 100644
--- a/quic/quic_transport/quic_transport_client_session.h
+++ b/quic/quic_transport/quic_transport_client_session.h
@@ -19,9 +19,11 @@
 #include "net/third_party/quiche/src/quic/core/quic_stream.h"
 #include "net/third_party/quiche/src/quic/core/quic_versions.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_protocol.h"
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_session_interface.h"
+#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
 
 namespace quic {
 
@@ -30,13 +32,25 @@
     : public QuicSession,
       public QuicTransportSessionInterface {
  public:
+  class QUIC_EXPORT_PRIVATE ClientVisitor {
+   public:
+    virtual ~ClientVisitor() {}
+
+    // Notifies the visitor when a new stream has been received.  The stream in
+    // question can be retrieved using AcceptIncomingBidirectionalStream() or
+    // AcceptIncomingUnidirectionalStream().
+    virtual void OnIncomingBidirectionalStreamAvailable() = 0;
+    virtual void OnIncomingUnidirectionalStreamAvailable() = 0;
+  };
+
   QuicTransportClientSession(QuicConnection* connection,
                              Visitor* owner,
                              const QuicConfig& config,
                              const ParsedQuicVersionVector& supported_versions,
                              const QuicServerId& server_id,
                              QuicCryptoClientConfig* crypto_config,
-                             url::Origin origin);
+                             url::Origin origin,
+                             ClientVisitor* visitor);
 
   std::vector<std::string> GetAlpnsToOffer() const override {
     return std::vector<std::string>({QuicTransportAlpn()});
@@ -68,6 +82,12 @@
 
   void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override;
 
+  // Return the earliest incoming stream that has been received by the session
+  // but has not been accepted.  Returns nullptr if there are no incoming
+  // streams.
+  QuicTransportStream* AcceptIncomingBidirectionalStream();
+  QuicTransportStream* AcceptIncomingUnidirectionalStream();
+
  protected:
   class QUIC_EXPORT_PRIVATE ClientIndication : public QuicStream {
    public:
@@ -88,8 +108,20 @@
 
   std::unique_ptr<QuicCryptoClientStream> crypto_stream_;
   url::Origin origin_;
+  ClientVisitor* visitor_;  // not owned
   bool client_indication_sent_ = false;
   bool ready_ = false;
+
+  // Contains all of the streams that has been received by the session but have
+  // not been processed by the application.
+  // TODO(vasilvv): currently, we always send MAX_STREAMS as long as the overall
+  // maximum number of streams for the connection has not been exceeded. We
+  // should also limit the maximum number of streams that the consuming code
+  // has not accepted to a smaller number, by checking the size of
+  // |incoming_bidirectional_streams_| and |incoming_unidirectional_streams_|
+  // before sending MAX_STREAMS.
+  QuicDeque<QuicTransportStream*> incoming_bidirectional_streams_;
+  QuicDeque<QuicTransportStream*> incoming_unidirectional_streams_;
 };
 
 }  // namespace quic
diff --git a/quic/quic_transport/quic_transport_client_session_test.cc b/quic/quic_transport/quic_transport_client_session_test.cc
index 24e1071..1d9f05e 100644
--- a/quic/quic_transport/quic_transport_client_session_test.cc
+++ b/quic/quic_transport/quic_transport_client_session_test.cc
@@ -20,6 +20,7 @@
 #include "net/third_party/quiche/src/quic/test_tools/quic_session_peer.h"
 #include "net/third_party/quiche/src/quic/test_tools/quic_stream_peer.h"
 #include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_transport_test_tools.h"
 
 namespace quic {
 namespace test {
@@ -50,32 +51,6 @@
   return result;
 }
 
-class TestClientSession : public QuicTransportClientSession {
- public:
-  using QuicTransportClientSession::QuicTransportClientSession;
-
-  class Stream : public QuicStream {
-   public:
-    using QuicStream::QuicStream;
-    void OnDataAvailable() override {}
-  };
-
-  QuicStream* CreateIncomingStream(QuicStreamId id) override {
-    auto stream = std::make_unique<Stream>(
-        id, this, /*is_static=*/false,
-        QuicUtils::GetStreamType(id, connection()->perspective(),
-                                 /*peer_initiated=*/true));
-    QuicStream* result = stream.get();
-    ActivateStream(std::move(stream));
-    return result;
-  }
-
-  QuicStream* CreateIncomingStream(PendingStream* /*pending*/) override {
-    QUIC_NOTREACHED();
-    return nullptr;
-  }
-};
-
 class QuicTransportClientSessionTest : public QuicTest {
  protected:
   QuicTransportClientSessionTest()
@@ -90,9 +65,9 @@
   }
 
   void CreateSession(url::Origin origin) {
-    session_ = std::make_unique<TestClientSession>(
+    session_ = std::make_unique<QuicTransportClientSession>(
         &connection_, nullptr, DefaultQuicConfig(), GetVersions(), server_id_,
-        &crypto_config_, origin);
+        &crypto_config_, origin, &visitor_);
     session_->Initialize();
     crypto_stream_ = static_cast<QuicCryptoClientStream*>(
         session_->GetMutableCryptoStream());
@@ -112,7 +87,8 @@
   PacketSavingConnection connection_;
   QuicServerId server_id_;
   QuicCryptoClientConfig crypto_config_;
-  std::unique_ptr<TestClientSession> session_;
+  MockClientVisitor visitor_;
+  std::unique_ptr<QuicTransportClientSession> session_;
   QuicCryptoClientStream* crypto_stream_;
 };
 
@@ -144,6 +120,23 @@
   EXPECT_QUIC_BUG(Connect(), "Client origin too long");
 }
 
+TEST_F(QuicTransportClientSessionTest, ReceiveNewStreams) {
+  Connect();
+  ASSERT_TRUE(session_->IsSessionReady());
+  ASSERT_TRUE(session_->AcceptIncomingUnidirectionalStream() == nullptr);
+
+  const QuicStreamId id = GetNthServerInitiatedUnidirectionalStreamId(
+      session_->transport_version(), 0);
+  QuicStreamFrame frame(id, /*fin=*/false, /*offset=*/0, "test");
+  EXPECT_CALL(visitor_, OnIncomingUnidirectionalStreamAvailable()).Times(1);
+  session_->OnStreamFrame(frame);
+
+  QuicTransportStream* stream = session_->AcceptIncomingUnidirectionalStream();
+  ASSERT_TRUE(stream != nullptr);
+  EXPECT_EQ(stream->ReadableBytes(), 4u);
+  EXPECT_EQ(stream->id(), id);
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/quic_transport/quic_transport_integration_test.cc b/quic/quic_transport/quic_transport_integration_test.cc
index f4c6a2a..9936830 100644
--- a/quic/quic_transport/quic_transport_integration_test.cc
+++ b/quic/quic_transport/quic_transport_integration_test.cc
@@ -20,6 +20,7 @@
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_server_session.h"
 #include "net/third_party/quiche/src/quic/test_tools/crypto_test_utils.h"
 #include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_transport_test_tools.h"
 #include "net/third_party/quiche/src/quic/test_tools/simulator/link.h"
 #include "net/third_party/quiche/src/quic/test_tools/simulator/quic_endpoint_base.h"
 #include "net/third_party/quiche/src/quic/test_tools/simulator/simulator.h"
@@ -78,7 +79,8 @@
                  GetVersions(),
                  QuicServerId("test.example.com", 443),
                  &crypto_config_,
-                 origin) {
+                 origin,
+                 &visitor_) {
     session_.Initialize();
   }
 
@@ -86,14 +88,10 @@
 
  private:
   QuicCryptoClientConfig crypto_config_;
+  MockClientVisitor visitor_;
   QuicTransportClientSession session_;
 };
 
-class MockServerVisitor : public QuicTransportServerSession::ServerVisitor {
- public:
-  MOCK_METHOD1(CheckOrigin, bool(url::Origin));
-};
-
 class QuicTransportServerEndpoint : public QuicTransportEndpointBase {
  public:
   QuicTransportServerEndpoint(Simulator* simulator,
diff --git a/quic/quic_transport/quic_transport_server_session_test.cc b/quic/quic_transport/quic_transport_server_session_test.cc
index 818c08d..42c3eef 100644
--- a/quic/quic_transport/quic_transport_server_session_test.cc
+++ b/quic/quic_transport/quic_transport_server_session_test.cc
@@ -21,6 +21,7 @@
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_protocol.h"
 #include "net/third_party/quiche/src/quic/test_tools/crypto_test_utils.h"
 #include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_transport_test_tools.h"
 
 namespace quic {
 namespace test {
@@ -48,11 +49,6 @@
   return {ParsedQuicVersion{PROTOCOL_TLS1_3, QUIC_VERSION_99}};
 }
 
-class MockVisitor : public QuicTransportServerSession::ServerVisitor {
- public:
-  MOCK_METHOD1(CheckOrigin, bool(url::Origin));
-};
-
 class QuicTransportServerSessionTest : public QuicTest {
  public:
   QuicTransportServerSessionTest()
@@ -111,7 +107,7 @@
   QuicCryptoServerConfig crypto_config_;
   std::unique_ptr<QuicTransportServerSession> session_;
   QuicCompressedCertsCache compressed_certs_cache_;
-  testing::StrictMock<MockVisitor> visitor_;
+  testing::StrictMock<MockServerVisitor> visitor_;
   QuicCryptoServerStream* crypto_stream_;
 };
 
diff --git a/quic/test_tools/quic_transport_test_tools.h b/quic/test_tools/quic_transport_test_tools.h
new file mode 100644
index 0000000..bf2dc0e
--- /dev/null
+++ b/quic/test_tools/quic_transport_test_tools.h
@@ -0,0 +1,29 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_TEST_TOOLS_QUIC_TRANSPORT_TEST_TOOLS_H_
+#define QUICHE_QUIC_TEST_TOOLS_QUIC_TRANSPORT_TEST_TOOLS_H_
+
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_client_session.h"
+#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_server_session.h"
+
+namespace quic {
+namespace test {
+
+class MockClientVisitor : public QuicTransportClientSession::ClientVisitor {
+ public:
+  MOCK_METHOD0(OnIncomingBidirectionalStreamAvailable, void());
+  MOCK_METHOD0(OnIncomingUnidirectionalStreamAvailable, void());
+};
+
+class MockServerVisitor : public QuicTransportServerSession::ServerVisitor {
+ public:
+  MOCK_METHOD1(CheckOrigin, bool(url::Origin));
+};
+
+}  // namespace test
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_TEST_TOOLS_QUIC_TRANSPORT_TEST_TOOLS_H_