Implement QuicTransportStream, a stream that can be only accessed after client indication is received.

gfe-relnote: n/a (not used in production)
PiperOrigin-RevId: 275553939
Change-Id: I19a2bf2f8335e6408204c7b5eac6dcfe2dc0a210
diff --git a/quic/quic_transport/quic_transport_client_session.cc b/quic/quic_transport/quic_transport_client_session.cc
index 032f61d..105472f 100644
--- a/quic/quic_transport/quic_transport_client_session.cc
+++ b/quic/quic_transport/quic_transport_client_session.cc
@@ -19,6 +19,7 @@
 #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_text_utils.h"
+#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
 
 namespace quic {
 
@@ -60,6 +61,13 @@
       crypto_config, proof_handler);
 }
 
+QuicStream* QuicTransportClientSession::CreateIncomingStream(QuicStreamId id) {
+  auto stream = std::make_unique<QuicTransportStream>(id, this, this);
+  QuicTransportStream* stream_ptr = stream.get();
+  ActivateStream(std::move(stream));
+  return stream_ptr;
+}
+
 void QuicTransportClientSession::OnCryptoHandshakeEvent(
     CryptoHandshakeEvent event) {
   QuicSession::OnCryptoHandshakeEvent(event);
diff --git a/quic/quic_transport/quic_transport_client_session.h b/quic/quic_transport/quic_transport_client_session.h
index 32149df..8c90aae 100644
--- a/quic/quic_transport/quic_transport_client_session.h
+++ b/quic/quic_transport/quic_transport_client_session.h
@@ -55,6 +55,13 @@
 
   bool IsSessionReady() const override { return ready_; }
 
+  QuicStream* CreateIncomingStream(QuicStreamId id) override;
+  QuicStream* CreateIncomingStream(PendingStream* /*pending*/) override {
+    QUIC_BUG << "QuicTransportClientSession::CreateIncomingStream("
+                "PendingStream) not implemented";
+    return nullptr;
+  }
+
   void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override;
 
  protected:
diff --git a/quic/quic_transport/quic_transport_server_session.cc b/quic/quic_transport/quic_transport_server_session.cc
index 92cffcd..fe4aec1 100644
--- a/quic/quic_transport/quic_transport_server_session.cc
+++ b/quic/quic_transport/quic_transport_server_session.cc
@@ -12,8 +12,8 @@
 #include "net/third_party/quiche/src/quic/core/quic_types.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_str_cat.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
-#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_client_session.h"
 #include "net/third_party/quiche/src/quic/quic_transport/quic_transport_protocol.h"
+#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
 
 namespace quic {
 
@@ -63,9 +63,10 @@
     return indication_ptr;
   }
 
-  // TODO(vasilvv): implement incoming data streams.
-  QUIC_BUG << "Not implemented";
-  return nullptr;
+  auto stream = std::make_unique<QuicTransportStream>(id, this, this);
+  QuicTransportStream* stream_ptr = stream.get();
+  ActivateStream(std::move(stream));
+  return stream_ptr;
 }
 
 QuicTransportServerSession::ClientIndication::ClientIndication(
diff --git a/quic/quic_transport/quic_transport_stream.cc b/quic/quic_transport/quic_transport_stream.cc
new file mode 100644
index 0000000..9546119
--- /dev/null
+++ b/quic/quic_transport/quic_transport_stream.cc
@@ -0,0 +1,88 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
+
+#include <sys/types.h>
+
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/core/quic_utils.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
+
+namespace quic {
+
+QuicTransportStream::QuicTransportStream(
+    QuicStreamId id,
+    QuicSession* session,
+    QuicTransportSessionInterface* session_interface)
+    : QuicStream(id,
+                 session,
+                 /*is_static=*/false,
+                 QuicUtils::GetStreamType(id,
+                                          session->connection()->perspective(),
+                                          session->IsIncomingStream(id))),
+      session_interface_(session_interface) {}
+
+size_t QuicTransportStream::Read(char* buffer, size_t buffer_size) {
+  if (!session_interface_->IsSessionReady()) {
+    return 0;
+  }
+
+  iovec iov;
+  iov.iov_base = buffer;
+  iov.iov_len = buffer_size;
+  return sequencer()->Readv(&iov, 1);
+}
+
+bool QuicTransportStream::Write(QuicStringPiece data) {
+  if (!CanWrite()) {
+    return false;
+  }
+
+  WriteOrBufferData(data, /*fin=*/false, nullptr);
+  return true;
+}
+
+bool QuicTransportStream::SendFin() {
+  if (!CanWrite()) {
+    return false;
+  }
+
+  WriteOrBufferData(QuicStringPiece(), /*fin=*/true, nullptr);
+  return true;
+}
+
+bool QuicTransportStream::CanWrite() const {
+  return session_interface_->IsSessionReady() && CanWriteNewData();
+}
+
+size_t QuicTransportStream::ReadableBytes() const {
+  if (!session_interface_->IsSessionReady()) {
+    return 0;
+  }
+
+  return sequencer()->ReadableBytes();
+}
+
+void QuicTransportStream::OnDataAvailable() {
+  if (ReadableBytes() == 0) {
+    return;
+  }
+  if (visitor_ != nullptr) {
+    visitor_->OnCanRead();
+  }
+}
+
+void QuicTransportStream::OnCanWriteNewData() {
+  // Ensure the origin check has been completed, as the stream can be notified
+  // about being writable before that.
+  if (!CanWrite()) {
+    return;
+  }
+  if (visitor_ != nullptr) {
+    visitor_->OnCanWrite();
+  }
+}
+
+}  // namespace quic
diff --git a/quic/quic_transport/quic_transport_stream.h b/quic/quic_transport/quic_transport_stream.h
new file mode 100644
index 0000000..11ea2dc
--- /dev/null
+++ b/quic/quic_transport/quic_transport_stream.h
@@ -0,0 +1,61 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_QUIC_TRANSPORT_QUIC_TRANSPORT_STREAM_H_
+#define QUICHE_QUIC_QUIC_TRANSPORT_QUIC_TRANSPORT_STREAM_H_
+
+#include <cstddef>
+
+#include "net/third_party/quiche/src/quic/core/quic_session.h"
+#include "net/third_party/quiche/src/quic/core/quic_stream.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_macros.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
+#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_session_interface.h"
+
+namespace quic {
+
+// QuicTransportStream is an extension of QuicStream that provides I/O interface
+// that is safe to use in the QuicTransport context.  The interface ensures no
+// application data is processed before the client indication is processed.
+class QuicTransportStream : public QuicStream {
+ public:
+  class Visitor {
+   public:
+    virtual ~Visitor() {}
+    virtual void OnCanRead() = 0;
+    virtual void OnCanWrite() = 0;
+  };
+
+  QuicTransportStream(QuicStreamId id,
+                      QuicSession* session,
+                      QuicTransportSessionInterface* session_interface);
+
+  // Reads at most |buffer_size| bytes into |buffer| and returns the number of
+  // bytes actually read.
+  size_t Read(char* buffer, size_t buffer_size);
+  // Writes |data| into the stream.  Returns true on success.
+  QUIC_MUST_USE_RESULT bool Write(QuicStringPiece data);
+  // Sends the FIN on the stream.  Returns true on success.
+  QUIC_MUST_USE_RESULT bool SendFin();
+
+  // Indicates whether it is possible to write into stream right now.
+  bool CanWrite() const;
+  // Indicates the number of bytes that can be read from the stream.
+  size_t ReadableBytes() const;
+
+  // QuicSession method implementations.
+  void OnDataAvailable() override;
+  void OnCanWriteNewData() override;
+
+  void set_visitor(Visitor* visitor) { visitor_ = visitor; }
+
+ protected:
+  QuicTransportSessionInterface* session_interface_;
+  Visitor* visitor_ = nullptr;
+};
+
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_QUIC_TRANSPORT_QUIC_TRANSPORT_STREAM_H_
diff --git a/quic/quic_transport/quic_transport_stream_test.cc b/quic/quic_transport/quic_transport_stream_test.cc
new file mode 100644
index 0000000..8c5ca5b
--- /dev/null
+++ b/quic/quic_transport/quic_transport_stream_test.cc
@@ -0,0 +1,104 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_stream.h"
+#include <memory>
+
+#include "net/third_party/quiche/src/quic/core/frames/quic_window_update_frame.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_expect_bug.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+#include "net/third_party/quiche/src/quic/quic_transport/quic_transport_session_interface.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_config_peer.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h"
+
+namespace quic {
+namespace test {
+namespace {
+
+using testing::Return;
+
+ParsedQuicVersionVector GetVersions() {
+  return {ParsedQuicVersion{PROTOCOL_TLS1_3, QUIC_VERSION_99}};
+}
+
+class MockQuicTransportSessionInterface : public QuicTransportSessionInterface {
+ public:
+  MOCK_CONST_METHOD0(IsSessionReady, bool());
+};
+
+class MockVisitor : public QuicTransportStream::Visitor {
+ public:
+  MOCK_METHOD0(OnCanRead, void());
+  MOCK_METHOD0(OnCanWrite, void());
+};
+
+class QuicTransportStreamTest : public QuicTest {
+ public:
+  QuicTransportStreamTest()
+      : connection_(new MockQuicConnection(&helper_,
+                                           &alarm_factory_,
+                                           Perspective::IS_CLIENT,
+                                           GetVersions())),
+        session_(connection_) {
+    session_.Initialize();
+
+    stream_ = new QuicTransportStream(0, &session_, &interface_);
+    session_.ActivateStream(QuicWrapUnique(stream_));
+    stream_->set_visitor(&visitor_);
+  }
+
+  void ReceiveStreamData(QuicStringPiece data, QuicStreamOffset offset) {
+    QuicStreamFrame frame(0, false, offset, data);
+    stream_->OnStreamFrame(frame);
+  }
+
+ protected:
+  MockAlarmFactory alarm_factory_;
+  MockQuicConnectionHelper helper_;
+
+  MockQuicConnection* connection_;  // Owned by |session_|.
+  MockQuicSession session_;
+  MockQuicTransportSessionInterface interface_;
+  MockVisitor visitor_;
+  QuicTransportStream* stream_;  // Owned by |session_|.
+};
+
+TEST_F(QuicTransportStreamTest, NotReady) {
+  EXPECT_CALL(interface_, IsSessionReady()).WillRepeatedly(Return(false));
+  ReceiveStreamData("test", 0);
+  EXPECT_EQ(stream_->ReadableBytes(), 0u);
+  EXPECT_FALSE(stream_->CanWrite());
+}
+
+TEST_F(QuicTransportStreamTest, ReadWhenNotReady) {
+  EXPECT_CALL(interface_, IsSessionReady()).WillRepeatedly(Return(false));
+  ReceiveStreamData("test", 0);
+  char buffer[4];
+  QuicByteCount bytes_read = stream_->Read(buffer, sizeof(buffer));
+  EXPECT_EQ(bytes_read, 0u);
+}
+
+TEST_F(QuicTransportStreamTest, WriteWhenNotReady) {
+  EXPECT_CALL(interface_, IsSessionReady()).WillRepeatedly(Return(false));
+  EXPECT_FALSE(stream_->Write("test"));
+}
+
+TEST_F(QuicTransportStreamTest, Ready) {
+  EXPECT_CALL(interface_, IsSessionReady()).WillRepeatedly(Return(true));
+  ReceiveStreamData("test", 0);
+  EXPECT_EQ(stream_->ReadableBytes(), 4u);
+  EXPECT_TRUE(stream_->CanWrite());
+  EXPECT_TRUE(stream_->Write("test"));
+}
+
+TEST_F(QuicTransportStreamTest, ReceiveData) {
+  EXPECT_CALL(interface_, IsSessionReady()).WillRepeatedly(Return(true));
+  EXPECT_CALL(visitor_, OnCanRead());
+  ReceiveStreamData("test", 0);
+}
+
+}  // namespace
+}  // namespace test
+}  // namespace quic