Automated g4 rollback of changelist 330572491.
*** Reason for rollback ***
The qbone client should continue sending the use messages flag until arnolds have rolled out with the change to only ever use messages.
*** Original change description ***
Remove support for ephemeral streams in QBONE. These have been unused for
months (since switching to message frames).
***
PiperOrigin-RevId: 330801096
Change-Id: I0ca42ebdf3734056c296ba4acc1720810f3e945a
diff --git a/quic/qbone/qbone_client.cc b/quic/qbone/qbone_client.cc
index c3d8f73..a2ba117 100644
--- a/quic/qbone/qbone_client.cc
+++ b/quic/qbone/qbone_client.cc
@@ -10,6 +10,7 @@
#include "net/third_party/quiche/src/quic/core/quic_epoll_connection_helper.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_epoll.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_exported_stats.h"
+#include "net/third_party/quiche/src/quic/qbone/qbone_stream.h"
#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
namespace quic {
diff --git a/quic/qbone/qbone_client_test.cc b/quic/qbone/qbone_client_test.cc
index a9b9994..cfe6c46 100644
--- a/quic/qbone/qbone_client_test.cc
+++ b/quic/qbone/qbone_client_test.cc
@@ -7,6 +7,7 @@
#include "net/third_party/quiche/src/quic/qbone/qbone_client.h"
#include "net/third_party/quiche/src/quic/core/quic_alarm_factory.h"
+#include "net/third_party/quiche/src/quic/core/quic_default_packet_writer.h"
#include "net/third_party/quiche/src/quic/core/quic_dispatcher.h"
#include "net/third_party/quiche/src/quic/core/quic_epoll_alarm_factory.h"
#include "net/third_party/quiche/src/quic/core/quic_epoll_connection_helper.h"
@@ -58,7 +59,7 @@
public:
void WritePacketToNetwork(const char* packet, size_t size) override {
QuicWriterMutexLock lock(&mu_);
- data_.emplace_back(std::string(packet, size));
+ data_.push_back(std::string(packet, size));
}
std::vector<std::string> data() {
diff --git a/quic/qbone/qbone_constants.h b/quic/qbone/qbone_constants.h
index 9925952..eeb6171 100644
--- a/quic/qbone/qbone_constants.h
+++ b/quic/qbone/qbone_constants.h
@@ -16,7 +16,7 @@
// QBONE's ALPN
static constexpr char kQboneAlpn[] = "qbone";
// The maximum number of bytes allowed in a QBONE packet.
- static const QuicByteCount kMaxQbonePacketBytes = 1280;
+ static const QuicByteCount kMaxQbonePacketBytes = 2000;
// The table id for QBONE's routing table. 'bone' in ascii.
static const uint32_t kQboneRouteTableId = 0x626F6E65;
// The stream ID of the control channel.
diff --git a/quic/qbone/qbone_session_base.cc b/quic/qbone/qbone_session_base.cc
index 1f8a512..e534398 100644
--- a/quic/qbone/qbone_session_base.cc
+++ b/quic/qbone/qbone_session_base.cc
@@ -10,6 +10,7 @@
#include <utility>
#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
+#include "net/third_party/quiche/src/quic/core/quic_data_reader.h"
#include "net/third_party/quiche/src/quic/core/quic_types.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_exported_stats.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
@@ -17,8 +18,17 @@
#include "net/third_party/quiche/src/quic/qbone/qbone_constants.h"
#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
+ABSL_FLAG(
+ bool,
+ qbone_close_ephemeral_frames,
+ true,
+ "If true, we'll call CloseStream even when we receive ephemeral frames.");
+
namespace quic {
+#define ENDPOINT \
+ (perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
+
QboneSessionBase::QboneSessionBase(
QuicConnection* connection,
Visitor* owner,
@@ -60,14 +70,34 @@
return crypto_stream_.get();
}
+QuicStream* QboneSessionBase::CreateOutgoingStream() {
+ return ActivateDataStream(
+ CreateDataStream(GetNextOutgoingUnidirectionalStreamId()));
+}
+
+void QboneSessionBase::OnStreamFrame(const QuicStreamFrame& frame) {
+ if (frame.offset == 0 && frame.fin && frame.data_length > 0) {
+ ++num_ephemeral_packets_;
+ ProcessPacketFromPeer(
+ quiche::QuicheStringPiece(frame.data_buffer, frame.data_length));
+ flow_controller()->AddBytesConsumed(frame.data_length);
+ // TODO(b/147817422): Add a counter for how many streams were actually
+ // closed here.
+ if (GetQuicFlag(FLAGS_qbone_close_ephemeral_frames)) {
+ ResetStream(frame.stream_id, QUIC_STREAM_CANCELLED);
+ }
+ return;
+ }
+ QuicSession::OnStreamFrame(frame);
+}
+
void QboneSessionBase::OnMessageReceived(quiche::QuicheStringPiece message) {
++num_message_packets_;
ProcessPacketFromPeer(message);
}
QuicStream* QboneSessionBase::CreateIncomingStream(QuicStreamId id) {
- QUIC_NOTREACHED();
- return nullptr;
+ return ActivateDataStream(CreateDataStream(id));
}
QuicStream* QboneSessionBase::CreateIncomingStream(PendingStream* /*pending*/) {
@@ -80,60 +110,108 @@
return true;
}
+std::unique_ptr<QuicStream> QboneSessionBase::CreateDataStream(
+ QuicStreamId id) {
+ if (crypto_stream_ == nullptr || !crypto_stream_->encryption_established()) {
+ // Encryption not active so no stream created
+ return nullptr;
+ }
+
+ if (IsIncomingStream(id)) {
+ ++num_streamed_packets_;
+ return std::make_unique<QboneReadOnlyStream>(id, this);
+ }
+
+ return std::make_unique<QboneWriteOnlyStream>(id, this);
+}
+
+QuicStream* QboneSessionBase::ActivateDataStream(
+ std::unique_ptr<QuicStream> stream) {
+ // Transfer ownership of the data stream to the session via ActivateStream().
+ QuicStream* raw = stream.get();
+ if (stream) {
+ // Make QuicSession take ownership of the stream.
+ ActivateStream(std::move(stream));
+ }
+ return raw;
+}
+
void QboneSessionBase::SendPacketToPeer(quiche::QuicheStringPiece packet) {
if (crypto_stream_ == nullptr) {
QUIC_BUG << "Attempting to send packet before encryption established";
return;
}
- QuicUniqueBufferPtr buffer = MakeUniqueBuffer(
- connection()->helper()->GetStreamSendBufferAllocator(), packet.size());
- memcpy(buffer.get(), packet.data(), packet.size());
- QuicMemSlice slice(std::move(buffer), packet.size());
- switch (SendMessage(QuicMemSliceSpan(&slice), /*flush=*/true).status) {
- case MESSAGE_STATUS_SUCCESS:
- break;
- case MESSAGE_STATUS_TOO_LARGE: {
- if (packet.size() < sizeof(ip6_hdr)) {
- QUIC_BUG << "Dropped malformed packet: IPv6 header too short";
+ if (send_packets_as_messages_) {
+ QuicUniqueBufferPtr buffer = MakeUniqueBuffer(
+ connection()->helper()->GetStreamSendBufferAllocator(), packet.size());
+ memcpy(buffer.get(), packet.data(), packet.size());
+ QuicMemSlice slice(std::move(buffer), packet.size());
+ switch (SendMessage(QuicMemSliceSpan(&slice), /*flush=*/true).status) {
+ case MESSAGE_STATUS_SUCCESS:
+ break;
+ case MESSAGE_STATUS_TOO_LARGE: {
+ if (packet.size() < sizeof(ip6_hdr)) {
+ QUIC_BUG << "Dropped malformed packet: IPv6 header too short";
+ break;
+ }
+ auto* header = reinterpret_cast<const ip6_hdr*>(packet.begin());
+ icmp6_hdr icmp_header{};
+ icmp_header.icmp6_type = ICMP6_PACKET_TOO_BIG;
+ icmp_header.icmp6_mtu =
+ connection()->GetGuaranteedLargestMessagePayload();
+
+ CreateIcmpPacket(header->ip6_dst, header->ip6_src, icmp_header, packet,
+ [this](quiche::QuicheStringPiece icmp_packet) {
+ writer_->WritePacketToNetwork(icmp_packet.data(),
+ icmp_packet.size());
+ });
break;
}
- auto* header = reinterpret_cast<const ip6_hdr*>(packet.begin());
- icmp6_hdr icmp_header{};
- icmp_header.icmp6_type = ICMP6_PACKET_TOO_BIG;
- icmp_header.icmp6_mtu =
- connection()->GetGuaranteedLargestMessagePayload();
-
- CreateIcmpPacket(header->ip6_dst, header->ip6_src, icmp_header, packet,
- [this](quiche::QuicheStringPiece icmp_packet) {
- writer_->WritePacketToNetwork(icmp_packet.data(),
- icmp_packet.size());
- });
- break;
+ case MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED:
+ QUIC_BUG << "MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED";
+ break;
+ case MESSAGE_STATUS_UNSUPPORTED:
+ QUIC_BUG << "MESSAGE_STATUS_UNSUPPORTED";
+ break;
+ case MESSAGE_STATUS_BLOCKED:
+ QUIC_BUG << "MESSAGE_STATUS_BLOCKED";
+ break;
+ case MESSAGE_STATUS_INTERNAL_ERROR:
+ QUIC_BUG << "MESSAGE_STATUS_INTERNAL_ERROR";
+ break;
}
- case MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED:
- QUIC_BUG << "MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED";
- break;
- case MESSAGE_STATUS_UNSUPPORTED:
- QUIC_BUG << "MESSAGE_STATUS_UNSUPPORTED";
- break;
- case MESSAGE_STATUS_BLOCKED:
- QUIC_BUG << "MESSAGE_STATUS_BLOCKED";
- break;
- case MESSAGE_STATUS_INTERNAL_ERROR:
- QUIC_BUG << "MESSAGE_STATUS_INTERNAL_ERROR";
- break;
+ return;
}
+
+ // QBONE streams are ephemeral.
+ QuicStream* stream = CreateOutgoingStream();
+ if (!stream) {
+ QUIC_BUG << "Failed to create an outgoing QBONE stream.";
+ return;
+ }
+
+ QboneWriteOnlyStream* qbone_stream =
+ static_cast<QboneWriteOnlyStream*>(stream);
+ qbone_stream->WritePacketToQuicStream(packet);
}
uint64_t QboneSessionBase::GetNumEphemeralPackets() const {
return num_ephemeral_packets_;
}
+uint64_t QboneSessionBase::GetNumStreamedPackets() const {
+ return num_streamed_packets_;
+}
+
uint64_t QboneSessionBase::GetNumMessagePackets() const {
return num_message_packets_;
}
+uint64_t QboneSessionBase::GetNumFallbackToStream() const {
+ return num_fallback_to_stream_;
+}
+
void QboneSessionBase::set_writer(QbonePacketWriter* writer) {
writer_ = writer;
testing::testvalue::Adjust("quic_QbonePacketWriter", &writer_);
diff --git a/quic/qbone/qbone_session_base.h b/quic/qbone/qbone_session_base.h
index 4160ef2..4b95130 100644
--- a/quic/qbone/qbone_session_base.h
+++ b/quic/qbone/qbone_session_base.h
@@ -12,6 +12,7 @@
#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_export.h"
#include "net/third_party/quiche/src/quic/qbone/qbone_packet_writer.h"
+#include "net/third_party/quiche/src/quic/qbone/qbone_stream.h"
#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
namespace quic {
@@ -30,6 +31,8 @@
// Overrides from QuicSession.
// This will ensure that the crypto session is created.
void Initialize() override;
+ // This will check if the packet is wholly contained.
+ void OnStreamFrame(const QuicStreamFrame& frame) override;
// Called whenever a MESSAGE frame is received.
void OnMessageReceived(quiche::QuicheStringPiece message) override;
@@ -41,11 +44,22 @@
// a QboneReadOnlyStream.
uint64_t GetNumEphemeralPackets() const;
+ // Returns the number of QBONE network packets that were via
+ // multiple packets, requiring the creation of a QboneReadOnlyStream.
+ uint64_t GetNumStreamedPackets() const;
+
// Returns the number of QBONE network packets that were received using QUIC
// MESSAGE frame.
uint64_t GetNumMessagePackets() const;
+ // Returns the number of times sending a MESSAGE frame failed, and the session
+ // used an ephemeral stream instead.
+ uint64_t GetNumFallbackToStream() const;
+
void set_writer(QbonePacketWriter* writer);
+ void set_send_packets_as_messages(bool send_packets_as_messages) {
+ send_packets_as_messages_ = send_packets_as_messages;
+ }
protected:
virtual std::unique_ptr<QuicCryptoStream> CreateCryptoStream() = 0;
@@ -61,6 +75,12 @@
return true;
}
+ QuicStream* CreateOutgoingStream();
+ std::unique_ptr<QuicStream> CreateDataStream(QuicStreamId id);
+ // Activates a QuicStream. The session takes ownership of the stream, but
+ // returns an unowned pointer to the stream for convenience.
+ QuicStream* ActivateDataStream(std::unique_ptr<QuicStream> stream);
+
// Accepts a given packet from the network and writes it out
// to the QUIC stream. This will create an ephemeral stream per
// packet. This function will return true if a stream was created
@@ -70,6 +90,11 @@
QbonePacketWriter* writer_;
+ // If true, MESSAGE frames are used for short datagrams. If false, ephemeral
+ // streams are used instead. Note that receiving MESSAGE frames is always
+ // supported.
+ bool send_packets_as_messages_ = false;
+
private:
// Used for the crypto handshake.
std::unique_ptr<QuicCryptoStream> crypto_stream_;
diff --git a/quic/qbone/qbone_session_test.cc b/quic/qbone/qbone_session_test.cc
index 493034f..243a5c5 100644
--- a/quic/qbone/qbone_session_test.cc
+++ b/quic/qbone/qbone_session_test.cc
@@ -190,7 +190,7 @@
class DataSavingQbonePacketWriter : public QbonePacketWriter {
public:
void WritePacketToNetwork(const char* packet, size_t size) override {
- data_.emplace_back(std::string(packet, size));
+ data_.push_back(std::string(packet, size));
}
const std::vector<std::string>& data() { return data_; }
@@ -425,7 +425,7 @@
// Test handshake establishment and sending/receiving of data for two
// directions.
- void TestConnection() {
+ void TestStreamConnection(bool use_messages) {
ASSERT_TRUE(server_peer_->OneRttKeysAvailable());
ASSERT_TRUE(client_peer_->OneRttKeysAvailable());
ASSERT_TRUE(server_peer_->IsEncryptionEstablished());
@@ -457,18 +457,22 @@
EXPECT_EQ(0u, server_peer_->GetNumActiveStreams());
EXPECT_EQ(0u, client_peer_->GetNumActiveStreams());
- const QuicPacketLength max_packet_length = std::max(
- server_peer_->connection()->GetGuaranteedLargestMessagePayload(),
- client_peer_->connection()->GetGuaranteedLargestMessagePayload());
-
- std::string long_data(max_packet_length + 1, 'A');
+ // Try to send long payloads that are larger than the QUIC MTU but
+ // smaller than the QBONE max size.
+ // This should trigger the non-ephemeral stream code path.
+ std::string long_data(
+ QboneConstants::kMaxQbonePacketBytes - sizeof(ip6_hdr) - 1, 'A');
QUIC_LOG(INFO) << "Sending server -> client long data";
server_peer_->ProcessPacketFromNetwork(TestPacketIn(long_data));
runner_.Run();
- ExpectICMPTooBigResponse(
- server_writer_->data(),
- server_peer_->connection()->GetGuaranteedLargestMessagePayload(),
- TestPacketOut(long_data));
+ if (use_messages) {
+ ExpectICMPTooBigResponse(
+ server_writer_->data(),
+ server_peer_->connection()->GetGuaranteedLargestMessagePayload(),
+ TestPacketOut(long_data));
+ } else {
+ EXPECT_THAT(client_writer_->data(), Contains(TestPacketOut(long_data)));
+ }
EXPECT_THAT(server_writer_->data(),
Not(Contains(TestPacketOut(long_data))));
EXPECT_EQ(0u, server_peer_->GetNumActiveStreams());
@@ -477,18 +481,34 @@
QUIC_LOG(INFO) << "Sending client -> server long data";
client_peer_->ProcessPacketFromNetwork(TestPacketIn(long_data));
runner_.Run();
- ExpectICMPTooBigResponse(
- client_writer_->data(),
- client_peer_->connection()->GetGuaranteedLargestMessagePayload(),
- TestPacketIn(long_data));
+ if (use_messages) {
+ ExpectICMPTooBigResponse(
+ client_writer_->data(),
+ client_peer_->connection()->GetGuaranteedLargestMessagePayload(),
+ TestPacketIn(long_data));
+ } else {
+ EXPECT_THAT(server_writer_->data(), Contains(TestPacketOut(long_data)));
+ }
EXPECT_FALSE(client_peer_->EarlyDataAccepted());
EXPECT_FALSE(client_peer_->ReceivedInchoateReject());
EXPECT_THAT(client_peer_->GetNumReceivedServerConfigUpdates(), Eq(0));
- EXPECT_THAT(client_peer_->GetNumEphemeralPackets(), Eq(0));
- EXPECT_THAT(server_peer_->GetNumEphemeralPackets(), Eq(0));
- EXPECT_THAT(client_peer_->GetNumMessagePackets(), Eq(2));
- EXPECT_THAT(server_peer_->GetNumMessagePackets(), Eq(2));
+ if (!use_messages) {
+ EXPECT_THAT(client_peer_->GetNumStreamedPackets(), Eq(1));
+ EXPECT_THAT(server_peer_->GetNumStreamedPackets(), Eq(1));
+ }
+
+ if (use_messages) {
+ EXPECT_THAT(client_peer_->GetNumEphemeralPackets(), Eq(0));
+ EXPECT_THAT(server_peer_->GetNumEphemeralPackets(), Eq(0));
+ EXPECT_THAT(client_peer_->GetNumMessagePackets(), Eq(2));
+ EXPECT_THAT(server_peer_->GetNumMessagePackets(), Eq(2));
+ } else {
+ EXPECT_THAT(client_peer_->GetNumEphemeralPackets(), Eq(2));
+ EXPECT_THAT(server_peer_->GetNumEphemeralPackets(), Eq(2));
+ EXPECT_THAT(client_peer_->GetNumMessagePackets(), Eq(0));
+ EXPECT_THAT(server_peer_->GetNumMessagePackets(), Eq(0));
+ }
// All streams are ephemeral and should be gone.
EXPECT_EQ(0u, server_peer_->GetNumActiveStreams());
@@ -532,10 +552,20 @@
::testing::ValuesIn(GetTestParams()),
::testing::PrintToStringParamName());
+TEST_P(QboneSessionTest, StreamConnection) {
+ CreateClientAndServerSessions();
+ client_peer_->set_send_packets_as_messages(false);
+ server_peer_->set_send_packets_as_messages(false);
+ StartHandshake();
+ TestStreamConnection(false);
+}
+
TEST_P(QboneSessionTest, Messages) {
CreateClientAndServerSessions();
+ client_peer_->set_send_packets_as_messages(true);
+ server_peer_->set_send_packets_as_messages(true);
StartHandshake();
- TestConnection();
+ TestStreamConnection(true);
}
TEST_P(QboneSessionTest, ClientRejection) {
diff --git a/quic/qbone/qbone_stream.cc b/quic/qbone/qbone_stream.cc
new file mode 100644
index 0000000..4d18889
--- /dev/null
+++ b/quic/qbone/qbone_stream.cc
@@ -0,0 +1,63 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/qbone/qbone_stream.h"
+
+#include "net/third_party/quiche/src/quic/core/quic_data_reader.h"
+#include "net/third_party/quiche/src/quic/core/quic_data_writer.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/qbone/qbone_constants.h"
+#include "net/third_party/quiche/src/quic/qbone/qbone_session_base.h"
+#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
+
+ABSL_FLAG(int, qbone_stream_ttl_secs, 3, "The QBONE Stream TTL in seconds.");
+
+namespace quic {
+
+QboneWriteOnlyStream::QboneWriteOnlyStream(QuicStreamId id,
+ QuicSession* session)
+ : QuicStream(id, session, /*is_static=*/false, WRITE_UNIDIRECTIONAL) {
+ // QBONE uses a LIFO queue to try to always make progress. An individual
+ // packet may persist for upto to qbone_stream_ttl_secs seconds in memory.
+ MaybeSetTtl(
+ QuicTime::Delta::FromSeconds(GetQuicFlag(FLAGS_qbone_stream_ttl_secs)));
+}
+
+void QboneWriteOnlyStream::WritePacketToQuicStream(
+ quiche::QuicheStringPiece packet) {
+ // Streams are one way and ephemeral. This function should only be
+ // called once.
+ WriteOrBufferData(packet, /* fin= */ true, nullptr);
+}
+
+QboneReadOnlyStream::QboneReadOnlyStream(QuicStreamId id,
+ QboneSessionBase* session)
+ : QuicStream(id,
+ session,
+ /*is_static=*/false,
+ READ_UNIDIRECTIONAL),
+ session_(session) {
+ // QBONE uses a LIFO queue to try to always make progress. An individual
+ // packet may persist for upto to qbone_stream_ttl_secs seconds in memory.
+ MaybeSetTtl(
+ QuicTime::Delta::FromSeconds(GetQuicFlag(FLAGS_qbone_stream_ttl_secs)));
+}
+
+void QboneReadOnlyStream::OnDataAvailable() {
+ // Read in data and buffer it, attempt to frame to see if there's a packet.
+ sequencer()->Read(&buffer_);
+ if (sequencer()->IsClosed()) {
+ session_->ProcessPacketFromPeer(buffer_);
+ OnFinRead();
+ return;
+ }
+ if (buffer_.size() > QboneConstants::kMaxQbonePacketBytes) {
+ if (!rst_sent()) {
+ Reset(QUIC_BAD_APPLICATION_PAYLOAD);
+ }
+ StopReading();
+ }
+}
+
+} // namespace quic
diff --git a/quic/qbone/qbone_stream.h b/quic/qbone/qbone_stream.h
new file mode 100644
index 0000000..8a6313b
--- /dev/null
+++ b/quic/qbone/qbone_stream.h
@@ -0,0 +1,56 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_QBONE_QBONE_STREAM_H_
+#define QUICHE_QUIC_QBONE_QBONE_STREAM_H_
+
+#include "net/third_party/quiche/src/quic/core/quic_session.h"
+#include "net/third_party/quiche/src/quic/core/quic_stream.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_export.h"
+#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
+
+namespace quic {
+
+class QboneSessionBase;
+
+// QboneWriteOnlyStream is responsible for sending data for a single
+// packet to the other side.
+// Note that the stream will be created HalfClosed (reads will be closed).
+class QUIC_EXPORT_PRIVATE QboneWriteOnlyStream : public QuicStream {
+ public:
+ QboneWriteOnlyStream(QuicStreamId id, QuicSession* session);
+
+ // QuicStream implementation. QBONE writers are ephemeral and don't
+ // read any data.
+ void OnDataAvailable() override {}
+
+ // Write a network packet over the quic stream.
+ void WritePacketToQuicStream(quiche::QuicheStringPiece packet);
+};
+
+// QboneReadOnlyStream will be used if we find an incoming stream that
+// isn't fully contained. It will buffer the data when available and
+// attempt to parse it as a packet to send to the network when a FIN
+// is found.
+// Note that the stream will be created HalfClosed (writes will be closed).
+class QUIC_EXPORT_PRIVATE QboneReadOnlyStream : public QuicStream {
+ public:
+ QboneReadOnlyStream(QuicStreamId id, QboneSessionBase* session);
+
+ ~QboneReadOnlyStream() override = default;
+
+ // QuicStream overrides.
+ // OnDataAvailable is called when there is data in the quic stream buffer.
+ // This will copy the buffer locally and attempt to parse it to write out
+ // packets to the network.
+ void OnDataAvailable() override;
+
+ private:
+ std::string buffer_;
+ QboneSessionBase* session_;
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_QBONE_QBONE_STREAM_H_
diff --git a/quic/qbone/qbone_stream_test.cc b/quic/qbone/qbone_stream_test.cc
new file mode 100644
index 0000000..cfe9645
--- /dev/null
+++ b/quic/qbone/qbone_stream_test.cc
@@ -0,0 +1,262 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/qbone/qbone_stream.h"
+
+#include <utility>
+
+#include "net/third_party/quiche/src/quic/core/crypto/quic_random.h"
+#include "net/third_party/quiche/src/quic/core/quic_session.h"
+#include "net/third_party/quiche/src/quic/core/quic_simple_buffer_allocator.h"
+#include "net/third_party/quiche/src/quic/core/quic_utils.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test_loopback.h"
+#include "net/third_party/quiche/src/quic/qbone/qbone_constants.h"
+#include "net/third_party/quiche/src/quic/qbone/qbone_session_base.h"
+#include "net/third_party/quiche/src/quic/test_tools/mock_clock.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h"
+#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
+#include "net/third_party/quiche/src/spdy/core/spdy_protocol.h"
+
+namespace quic {
+
+namespace {
+
+using ::testing::_;
+using ::testing::StrictMock;
+
+// MockQuicSession that does not create streams and writes data from
+// QuicStream to a string.
+class MockQuicSession : public QboneSessionBase {
+ public:
+ MockQuicSession(QuicConnection* connection, const QuicConfig& config)
+ : QboneSessionBase(connection,
+ nullptr /*visitor*/,
+ config,
+ CurrentSupportedVersions(),
+ nullptr /*writer*/) {}
+
+ ~MockQuicSession() override {}
+
+ // Writes outgoing data from QuicStream to a string.
+ QuicConsumedData WritevData(
+ QuicStreamId id,
+ size_t write_length,
+ QuicStreamOffset offset,
+ StreamSendingState state,
+ TransmissionType type,
+ quiche::QuicheOptional<EncryptionLevel> level) override {
+ if (!writable_) {
+ return QuicConsumedData(0, false);
+ }
+
+ return QuicConsumedData(write_length, state != StreamSendingState::NO_FIN);
+ }
+
+ QboneReadOnlyStream* CreateIncomingStream(QuicStreamId id) override {
+ return nullptr;
+ }
+
+ const QuicCryptoStream* GetCryptoStream() const override { return nullptr; }
+ QuicCryptoStream* GetMutableCryptoStream() override { return nullptr; }
+
+ // Called by QuicStream when they want to close stream.
+ MOCK_METHOD(void,
+ SendRstStream,
+ (QuicStreamId, QuicRstStreamErrorCode, QuicStreamOffset, bool),
+ (override));
+
+ // Sets whether data is written to buffer, or else if this is write blocked.
+ void set_writable(bool writable) { writable_ = writable; }
+
+ // Tracks whether the stream is write blocked and its priority.
+ void RegisterReliableStream(QuicStreamId stream_id) {
+ // The priority effectively does not matter. Put all streams on the same
+ // priority.
+ write_blocked_streams()->RegisterStream(
+ stream_id,
+ /*is_static_stream=*/false,
+ /* precedence= */ spdy::SpdyStreamPrecedence(3));
+ }
+
+ // The session take ownership of the stream.
+ void ActivateReliableStream(std::unique_ptr<QuicStream> stream) {
+ ActivateStream(std::move(stream));
+ }
+
+ std::unique_ptr<QuicCryptoStream> CreateCryptoStream() override {
+ return nullptr;
+ }
+
+ MOCK_METHOD(void,
+ ProcessPacketFromPeer,
+ (quiche::QuicheStringPiece),
+ (override));
+ MOCK_METHOD(void,
+ ProcessPacketFromNetwork,
+ (quiche::QuicheStringPiece),
+ (override));
+
+ private:
+ // Whether data is written to write_buffer_.
+ bool writable_ = true;
+};
+
+// Packet writer that does nothing. This is required for QuicConnection but
+// isn't used for writing data.
+class DummyPacketWriter : public QuicPacketWriter {
+ public:
+ DummyPacketWriter() {}
+
+ // QuicPacketWriter overrides.
+ WriteResult WritePacket(const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ PerPacketOptions* options) override {
+ return WriteResult(WRITE_STATUS_ERROR, 0);
+ }
+
+ bool IsWriteBlocked() const override { return false; };
+
+ void SetWritable() override {}
+
+ QuicByteCount GetMaxPacketSize(
+ const QuicSocketAddress& peer_address) const override {
+ return 0;
+ }
+
+ bool SupportsReleaseTime() const override { return false; }
+
+ bool IsBatchMode() const override { return false; }
+
+ QuicPacketBuffer GetNextWriteLocation(
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address) override {
+ return {nullptr, nullptr};
+ }
+
+ WriteResult Flush() override { return WriteResult(WRITE_STATUS_OK, 0); }
+};
+
+class QboneReadOnlyStreamTest : public ::testing::Test,
+ public QuicConnectionHelperInterface {
+ public:
+ void CreateReliableQuicStream() {
+ // Arbitrary values for QuicConnection.
+ Perspective perspective = Perspective::IS_SERVER;
+ bool owns_writer = true;
+
+ alarm_factory_ = std::make_unique<test::MockAlarmFactory>();
+
+ connection_.reset(new QuicConnection(
+ test::TestConnectionId(0), QuicSocketAddress(TestLoopback(), 0),
+ this /*QuicConnectionHelperInterface*/, alarm_factory_.get(),
+ new DummyPacketWriter(), owns_writer, perspective,
+ ParsedVersionOfIndex(CurrentSupportedVersions(), 0)));
+ clock_.AdvanceTime(QuicTime::Delta::FromSeconds(1));
+ session_ = std::make_unique<StrictMock<MockQuicSession>>(connection_.get(),
+ QuicConfig());
+ stream_ = new QboneReadOnlyStream(kStreamId, session_.get());
+ session_->ActivateReliableStream(
+ std::unique_ptr<QboneReadOnlyStream>(stream_));
+ }
+
+ ~QboneReadOnlyStreamTest() override {}
+
+ const QuicClock* GetClock() const override { return &clock_; }
+
+ QuicRandom* GetRandomGenerator() override {
+ return QuicRandom::GetInstance();
+ }
+
+ QuicBufferAllocator* GetStreamSendBufferAllocator() override {
+ return &buffer_allocator_;
+ }
+
+ protected:
+ // The QuicSession will take the ownership.
+ QboneReadOnlyStream* stream_;
+ std::unique_ptr<StrictMock<MockQuicSession>> session_;
+ std::unique_ptr<QuicAlarmFactory> alarm_factory_;
+ std::unique_ptr<QuicConnection> connection_;
+ // Used to implement the QuicConnectionHelperInterface.
+ SimpleBufferAllocator buffer_allocator_;
+ MockClock clock_;
+ const QuicStreamId kStreamId = QuicUtils::GetFirstUnidirectionalStreamId(
+ CurrentSupportedVersions()[0].transport_version,
+ Perspective::IS_CLIENT);
+};
+
+// Read an entire string.
+TEST_F(QboneReadOnlyStreamTest, ReadDataWhole) {
+ std::string packet = "Stuff";
+ CreateReliableQuicStream();
+ QuicStreamFrame frame(kStreamId, true, 0, packet);
+ EXPECT_CALL(*session_, ProcessPacketFromPeer("Stuff"));
+ stream_->OnStreamFrame(frame);
+}
+
+// Test buffering.
+TEST_F(QboneReadOnlyStreamTest, ReadBuffered) {
+ CreateReliableQuicStream();
+ std::string packet = "Stuf";
+ {
+ QuicStreamFrame frame(kStreamId, false, 0, packet);
+ stream_->OnStreamFrame(frame);
+ }
+ // We didn't write 5 bytes yet...
+
+ packet = "f";
+ EXPECT_CALL(*session_, ProcessPacketFromPeer("Stuff"));
+ {
+ QuicStreamFrame frame(kStreamId, true, 4, packet);
+ stream_->OnStreamFrame(frame);
+ }
+}
+
+TEST_F(QboneReadOnlyStreamTest, ReadOutOfOrder) {
+ CreateReliableQuicStream();
+ std::string packet = "f";
+ {
+ QuicStreamFrame frame(kStreamId, true, 4, packet);
+ stream_->OnStreamFrame(frame);
+ }
+
+ packet = "S";
+ {
+ QuicStreamFrame frame(kStreamId, false, 0, packet);
+ stream_->OnStreamFrame(frame);
+ }
+
+ packet = "tuf";
+ EXPECT_CALL(*session_, ProcessPacketFromPeer("Stuff"));
+ {
+ QuicStreamFrame frame(kStreamId, false, 1, packet);
+ stream_->OnStreamFrame(frame);
+ }
+}
+
+// Test buffering too many bytes.
+TEST_F(QboneReadOnlyStreamTest, ReadBufferedTooLarge) {
+ CreateReliableQuicStream();
+ std::string packet = "0123456789";
+ int iterations = (QboneConstants::kMaxQbonePacketBytes / packet.size()) + 2;
+ EXPECT_CALL(*session_,
+ SendRstStream(kStreamId, QUIC_BAD_APPLICATION_PAYLOAD, _, _));
+ for (int i = 0; i < iterations; ++i) {
+ QuicStreamFrame frame(kStreamId, i == (iterations - 1), i * packet.size(),
+ packet);
+ if (!stream_->reading_stopped()) {
+ stream_->OnStreamFrame(frame);
+ }
+ }
+ // We should have nothing written to the network and the stream
+ // should have stopped reading.
+ EXPECT_TRUE(stream_->reading_stopped());
+}
+
+} // namespace
+
+} // namespace quic