Automated g4 rollback of changelist 330572491. *** Reason for rollback *** The qbone client should continue sending the use messages flag until arnolds have rolled out with the change to only ever use messages. *** Original change description *** Remove support for ephemeral streams in QBONE. These have been unused for months (since switching to message frames). *** PiperOrigin-RevId: 330801096 Change-Id: I0ca42ebdf3734056c296ba4acc1720810f3e945a
diff --git a/quic/qbone/qbone_client.cc b/quic/qbone/qbone_client.cc index c3d8f73..a2ba117 100644 --- a/quic/qbone/qbone_client.cc +++ b/quic/qbone/qbone_client.cc
@@ -10,6 +10,7 @@ #include "net/third_party/quiche/src/quic/core/quic_epoll_connection_helper.h" #include "net/third_party/quiche/src/quic/platform/api/quic_epoll.h" #include "net/third_party/quiche/src/quic/platform/api/quic_exported_stats.h" +#include "net/third_party/quiche/src/quic/qbone/qbone_stream.h" #include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h" namespace quic {
diff --git a/quic/qbone/qbone_client_test.cc b/quic/qbone/qbone_client_test.cc index a9b9994..cfe6c46 100644 --- a/quic/qbone/qbone_client_test.cc +++ b/quic/qbone/qbone_client_test.cc
@@ -7,6 +7,7 @@ #include "net/third_party/quiche/src/quic/qbone/qbone_client.h" #include "net/third_party/quiche/src/quic/core/quic_alarm_factory.h" +#include "net/third_party/quiche/src/quic/core/quic_default_packet_writer.h" #include "net/third_party/quiche/src/quic/core/quic_dispatcher.h" #include "net/third_party/quiche/src/quic/core/quic_epoll_alarm_factory.h" #include "net/third_party/quiche/src/quic/core/quic_epoll_connection_helper.h" @@ -58,7 +59,7 @@ public: void WritePacketToNetwork(const char* packet, size_t size) override { QuicWriterMutexLock lock(&mu_); - data_.emplace_back(std::string(packet, size)); + data_.push_back(std::string(packet, size)); } std::vector<std::string> data() {
diff --git a/quic/qbone/qbone_constants.h b/quic/qbone/qbone_constants.h index 9925952..eeb6171 100644 --- a/quic/qbone/qbone_constants.h +++ b/quic/qbone/qbone_constants.h
@@ -16,7 +16,7 @@ // QBONE's ALPN static constexpr char kQboneAlpn[] = "qbone"; // The maximum number of bytes allowed in a QBONE packet. - static const QuicByteCount kMaxQbonePacketBytes = 1280; + static const QuicByteCount kMaxQbonePacketBytes = 2000; // The table id for QBONE's routing table. 'bone' in ascii. static const uint32_t kQboneRouteTableId = 0x626F6E65; // The stream ID of the control channel.
diff --git a/quic/qbone/qbone_session_base.cc b/quic/qbone/qbone_session_base.cc index 1f8a512..e534398 100644 --- a/quic/qbone/qbone_session_base.cc +++ b/quic/qbone/qbone_session_base.cc
@@ -10,6 +10,7 @@ #include <utility> #include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h" +#include "net/third_party/quiche/src/quic/core/quic_data_reader.h" #include "net/third_party/quiche/src/quic/core/quic_types.h" #include "net/third_party/quiche/src/quic/platform/api/quic_exported_stats.h" #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h" @@ -17,8 +18,17 @@ #include "net/third_party/quiche/src/quic/qbone/qbone_constants.h" #include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h" +ABSL_FLAG( + bool, + qbone_close_ephemeral_frames, + true, + "If true, we'll call CloseStream even when we receive ephemeral frames."); + namespace quic { +#define ENDPOINT \ + (perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ") + QboneSessionBase::QboneSessionBase( QuicConnection* connection, Visitor* owner, @@ -60,14 +70,34 @@ return crypto_stream_.get(); } +QuicStream* QboneSessionBase::CreateOutgoingStream() { + return ActivateDataStream( + CreateDataStream(GetNextOutgoingUnidirectionalStreamId())); +} + +void QboneSessionBase::OnStreamFrame(const QuicStreamFrame& frame) { + if (frame.offset == 0 && frame.fin && frame.data_length > 0) { + ++num_ephemeral_packets_; + ProcessPacketFromPeer( + quiche::QuicheStringPiece(frame.data_buffer, frame.data_length)); + flow_controller()->AddBytesConsumed(frame.data_length); + // TODO(b/147817422): Add a counter for how many streams were actually + // closed here. + if (GetQuicFlag(FLAGS_qbone_close_ephemeral_frames)) { + ResetStream(frame.stream_id, QUIC_STREAM_CANCELLED); + } + return; + } + QuicSession::OnStreamFrame(frame); +} + void QboneSessionBase::OnMessageReceived(quiche::QuicheStringPiece message) { ++num_message_packets_; ProcessPacketFromPeer(message); } QuicStream* QboneSessionBase::CreateIncomingStream(QuicStreamId id) { - QUIC_NOTREACHED(); - return nullptr; + return ActivateDataStream(CreateDataStream(id)); } QuicStream* QboneSessionBase::CreateIncomingStream(PendingStream* /*pending*/) { @@ -80,60 +110,108 @@ return true; } +std::unique_ptr<QuicStream> QboneSessionBase::CreateDataStream( + QuicStreamId id) { + if (crypto_stream_ == nullptr || !crypto_stream_->encryption_established()) { + // Encryption not active so no stream created + return nullptr; + } + + if (IsIncomingStream(id)) { + ++num_streamed_packets_; + return std::make_unique<QboneReadOnlyStream>(id, this); + } + + return std::make_unique<QboneWriteOnlyStream>(id, this); +} + +QuicStream* QboneSessionBase::ActivateDataStream( + std::unique_ptr<QuicStream> stream) { + // Transfer ownership of the data stream to the session via ActivateStream(). + QuicStream* raw = stream.get(); + if (stream) { + // Make QuicSession take ownership of the stream. + ActivateStream(std::move(stream)); + } + return raw; +} + void QboneSessionBase::SendPacketToPeer(quiche::QuicheStringPiece packet) { if (crypto_stream_ == nullptr) { QUIC_BUG << "Attempting to send packet before encryption established"; return; } - QuicUniqueBufferPtr buffer = MakeUniqueBuffer( - connection()->helper()->GetStreamSendBufferAllocator(), packet.size()); - memcpy(buffer.get(), packet.data(), packet.size()); - QuicMemSlice slice(std::move(buffer), packet.size()); - switch (SendMessage(QuicMemSliceSpan(&slice), /*flush=*/true).status) { - case MESSAGE_STATUS_SUCCESS: - break; - case MESSAGE_STATUS_TOO_LARGE: { - if (packet.size() < sizeof(ip6_hdr)) { - QUIC_BUG << "Dropped malformed packet: IPv6 header too short"; + if (send_packets_as_messages_) { + QuicUniqueBufferPtr buffer = MakeUniqueBuffer( + connection()->helper()->GetStreamSendBufferAllocator(), packet.size()); + memcpy(buffer.get(), packet.data(), packet.size()); + QuicMemSlice slice(std::move(buffer), packet.size()); + switch (SendMessage(QuicMemSliceSpan(&slice), /*flush=*/true).status) { + case MESSAGE_STATUS_SUCCESS: + break; + case MESSAGE_STATUS_TOO_LARGE: { + if (packet.size() < sizeof(ip6_hdr)) { + QUIC_BUG << "Dropped malformed packet: IPv6 header too short"; + break; + } + auto* header = reinterpret_cast<const ip6_hdr*>(packet.begin()); + icmp6_hdr icmp_header{}; + icmp_header.icmp6_type = ICMP6_PACKET_TOO_BIG; + icmp_header.icmp6_mtu = + connection()->GetGuaranteedLargestMessagePayload(); + + CreateIcmpPacket(header->ip6_dst, header->ip6_src, icmp_header, packet, + [this](quiche::QuicheStringPiece icmp_packet) { + writer_->WritePacketToNetwork(icmp_packet.data(), + icmp_packet.size()); + }); break; } - auto* header = reinterpret_cast<const ip6_hdr*>(packet.begin()); - icmp6_hdr icmp_header{}; - icmp_header.icmp6_type = ICMP6_PACKET_TOO_BIG; - icmp_header.icmp6_mtu = - connection()->GetGuaranteedLargestMessagePayload(); - - CreateIcmpPacket(header->ip6_dst, header->ip6_src, icmp_header, packet, - [this](quiche::QuicheStringPiece icmp_packet) { - writer_->WritePacketToNetwork(icmp_packet.data(), - icmp_packet.size()); - }); - break; + case MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED: + QUIC_BUG << "MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED"; + break; + case MESSAGE_STATUS_UNSUPPORTED: + QUIC_BUG << "MESSAGE_STATUS_UNSUPPORTED"; + break; + case MESSAGE_STATUS_BLOCKED: + QUIC_BUG << "MESSAGE_STATUS_BLOCKED"; + break; + case MESSAGE_STATUS_INTERNAL_ERROR: + QUIC_BUG << "MESSAGE_STATUS_INTERNAL_ERROR"; + break; } - case MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED: - QUIC_BUG << "MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED"; - break; - case MESSAGE_STATUS_UNSUPPORTED: - QUIC_BUG << "MESSAGE_STATUS_UNSUPPORTED"; - break; - case MESSAGE_STATUS_BLOCKED: - QUIC_BUG << "MESSAGE_STATUS_BLOCKED"; - break; - case MESSAGE_STATUS_INTERNAL_ERROR: - QUIC_BUG << "MESSAGE_STATUS_INTERNAL_ERROR"; - break; + return; } + + // QBONE streams are ephemeral. + QuicStream* stream = CreateOutgoingStream(); + if (!stream) { + QUIC_BUG << "Failed to create an outgoing QBONE stream."; + return; + } + + QboneWriteOnlyStream* qbone_stream = + static_cast<QboneWriteOnlyStream*>(stream); + qbone_stream->WritePacketToQuicStream(packet); } uint64_t QboneSessionBase::GetNumEphemeralPackets() const { return num_ephemeral_packets_; } +uint64_t QboneSessionBase::GetNumStreamedPackets() const { + return num_streamed_packets_; +} + uint64_t QboneSessionBase::GetNumMessagePackets() const { return num_message_packets_; } +uint64_t QboneSessionBase::GetNumFallbackToStream() const { + return num_fallback_to_stream_; +} + void QboneSessionBase::set_writer(QbonePacketWriter* writer) { writer_ = writer; testing::testvalue::Adjust("quic_QbonePacketWriter", &writer_);
diff --git a/quic/qbone/qbone_session_base.h b/quic/qbone/qbone_session_base.h index 4160ef2..4b95130 100644 --- a/quic/qbone/qbone_session_base.h +++ b/quic/qbone/qbone_session_base.h
@@ -12,6 +12,7 @@ #include "net/third_party/quiche/src/quic/platform/api/quic_containers.h" #include "net/third_party/quiche/src/quic/platform/api/quic_export.h" #include "net/third_party/quiche/src/quic/qbone/qbone_packet_writer.h" +#include "net/third_party/quiche/src/quic/qbone/qbone_stream.h" #include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h" namespace quic { @@ -30,6 +31,8 @@ // Overrides from QuicSession. // This will ensure that the crypto session is created. void Initialize() override; + // This will check if the packet is wholly contained. + void OnStreamFrame(const QuicStreamFrame& frame) override; // Called whenever a MESSAGE frame is received. void OnMessageReceived(quiche::QuicheStringPiece message) override; @@ -41,11 +44,22 @@ // a QboneReadOnlyStream. uint64_t GetNumEphemeralPackets() const; + // Returns the number of QBONE network packets that were via + // multiple packets, requiring the creation of a QboneReadOnlyStream. + uint64_t GetNumStreamedPackets() const; + // Returns the number of QBONE network packets that were received using QUIC // MESSAGE frame. uint64_t GetNumMessagePackets() const; + // Returns the number of times sending a MESSAGE frame failed, and the session + // used an ephemeral stream instead. + uint64_t GetNumFallbackToStream() const; + void set_writer(QbonePacketWriter* writer); + void set_send_packets_as_messages(bool send_packets_as_messages) { + send_packets_as_messages_ = send_packets_as_messages; + } protected: virtual std::unique_ptr<QuicCryptoStream> CreateCryptoStream() = 0; @@ -61,6 +75,12 @@ return true; } + QuicStream* CreateOutgoingStream(); + std::unique_ptr<QuicStream> CreateDataStream(QuicStreamId id); + // Activates a QuicStream. The session takes ownership of the stream, but + // returns an unowned pointer to the stream for convenience. + QuicStream* ActivateDataStream(std::unique_ptr<QuicStream> stream); + // Accepts a given packet from the network and writes it out // to the QUIC stream. This will create an ephemeral stream per // packet. This function will return true if a stream was created @@ -70,6 +90,11 @@ QbonePacketWriter* writer_; + // If true, MESSAGE frames are used for short datagrams. If false, ephemeral + // streams are used instead. Note that receiving MESSAGE frames is always + // supported. + bool send_packets_as_messages_ = false; + private: // Used for the crypto handshake. std::unique_ptr<QuicCryptoStream> crypto_stream_;
diff --git a/quic/qbone/qbone_session_test.cc b/quic/qbone/qbone_session_test.cc index 493034f..243a5c5 100644 --- a/quic/qbone/qbone_session_test.cc +++ b/quic/qbone/qbone_session_test.cc
@@ -190,7 +190,7 @@ class DataSavingQbonePacketWriter : public QbonePacketWriter { public: void WritePacketToNetwork(const char* packet, size_t size) override { - data_.emplace_back(std::string(packet, size)); + data_.push_back(std::string(packet, size)); } const std::vector<std::string>& data() { return data_; } @@ -425,7 +425,7 @@ // Test handshake establishment and sending/receiving of data for two // directions. - void TestConnection() { + void TestStreamConnection(bool use_messages) { ASSERT_TRUE(server_peer_->OneRttKeysAvailable()); ASSERT_TRUE(client_peer_->OneRttKeysAvailable()); ASSERT_TRUE(server_peer_->IsEncryptionEstablished()); @@ -457,18 +457,22 @@ EXPECT_EQ(0u, server_peer_->GetNumActiveStreams()); EXPECT_EQ(0u, client_peer_->GetNumActiveStreams()); - const QuicPacketLength max_packet_length = std::max( - server_peer_->connection()->GetGuaranteedLargestMessagePayload(), - client_peer_->connection()->GetGuaranteedLargestMessagePayload()); - - std::string long_data(max_packet_length + 1, 'A'); + // Try to send long payloads that are larger than the QUIC MTU but + // smaller than the QBONE max size. + // This should trigger the non-ephemeral stream code path. + std::string long_data( + QboneConstants::kMaxQbonePacketBytes - sizeof(ip6_hdr) - 1, 'A'); QUIC_LOG(INFO) << "Sending server -> client long data"; server_peer_->ProcessPacketFromNetwork(TestPacketIn(long_data)); runner_.Run(); - ExpectICMPTooBigResponse( - server_writer_->data(), - server_peer_->connection()->GetGuaranteedLargestMessagePayload(), - TestPacketOut(long_data)); + if (use_messages) { + ExpectICMPTooBigResponse( + server_writer_->data(), + server_peer_->connection()->GetGuaranteedLargestMessagePayload(), + TestPacketOut(long_data)); + } else { + EXPECT_THAT(client_writer_->data(), Contains(TestPacketOut(long_data))); + } EXPECT_THAT(server_writer_->data(), Not(Contains(TestPacketOut(long_data)))); EXPECT_EQ(0u, server_peer_->GetNumActiveStreams()); @@ -477,18 +481,34 @@ QUIC_LOG(INFO) << "Sending client -> server long data"; client_peer_->ProcessPacketFromNetwork(TestPacketIn(long_data)); runner_.Run(); - ExpectICMPTooBigResponse( - client_writer_->data(), - client_peer_->connection()->GetGuaranteedLargestMessagePayload(), - TestPacketIn(long_data)); + if (use_messages) { + ExpectICMPTooBigResponse( + client_writer_->data(), + client_peer_->connection()->GetGuaranteedLargestMessagePayload(), + TestPacketIn(long_data)); + } else { + EXPECT_THAT(server_writer_->data(), Contains(TestPacketOut(long_data))); + } EXPECT_FALSE(client_peer_->EarlyDataAccepted()); EXPECT_FALSE(client_peer_->ReceivedInchoateReject()); EXPECT_THAT(client_peer_->GetNumReceivedServerConfigUpdates(), Eq(0)); - EXPECT_THAT(client_peer_->GetNumEphemeralPackets(), Eq(0)); - EXPECT_THAT(server_peer_->GetNumEphemeralPackets(), Eq(0)); - EXPECT_THAT(client_peer_->GetNumMessagePackets(), Eq(2)); - EXPECT_THAT(server_peer_->GetNumMessagePackets(), Eq(2)); + if (!use_messages) { + EXPECT_THAT(client_peer_->GetNumStreamedPackets(), Eq(1)); + EXPECT_THAT(server_peer_->GetNumStreamedPackets(), Eq(1)); + } + + if (use_messages) { + EXPECT_THAT(client_peer_->GetNumEphemeralPackets(), Eq(0)); + EXPECT_THAT(server_peer_->GetNumEphemeralPackets(), Eq(0)); + EXPECT_THAT(client_peer_->GetNumMessagePackets(), Eq(2)); + EXPECT_THAT(server_peer_->GetNumMessagePackets(), Eq(2)); + } else { + EXPECT_THAT(client_peer_->GetNumEphemeralPackets(), Eq(2)); + EXPECT_THAT(server_peer_->GetNumEphemeralPackets(), Eq(2)); + EXPECT_THAT(client_peer_->GetNumMessagePackets(), Eq(0)); + EXPECT_THAT(server_peer_->GetNumMessagePackets(), Eq(0)); + } // All streams are ephemeral and should be gone. EXPECT_EQ(0u, server_peer_->GetNumActiveStreams()); @@ -532,10 +552,20 @@ ::testing::ValuesIn(GetTestParams()), ::testing::PrintToStringParamName()); +TEST_P(QboneSessionTest, StreamConnection) { + CreateClientAndServerSessions(); + client_peer_->set_send_packets_as_messages(false); + server_peer_->set_send_packets_as_messages(false); + StartHandshake(); + TestStreamConnection(false); +} + TEST_P(QboneSessionTest, Messages) { CreateClientAndServerSessions(); + client_peer_->set_send_packets_as_messages(true); + server_peer_->set_send_packets_as_messages(true); StartHandshake(); - TestConnection(); + TestStreamConnection(true); } TEST_P(QboneSessionTest, ClientRejection) {
diff --git a/quic/qbone/qbone_stream.cc b/quic/qbone/qbone_stream.cc new file mode 100644 index 0000000..4d18889 --- /dev/null +++ b/quic/qbone/qbone_stream.cc
@@ -0,0 +1,63 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/qbone/qbone_stream.h" + +#include "net/third_party/quiche/src/quic/core/quic_data_reader.h" +#include "net/third_party/quiche/src/quic/core/quic_data_writer.h" +#include "net/third_party/quiche/src/quic/core/quic_types.h" +#include "net/third_party/quiche/src/quic/qbone/qbone_constants.h" +#include "net/third_party/quiche/src/quic/qbone/qbone_session_base.h" +#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h" + +ABSL_FLAG(int, qbone_stream_ttl_secs, 3, "The QBONE Stream TTL in seconds."); + +namespace quic { + +QboneWriteOnlyStream::QboneWriteOnlyStream(QuicStreamId id, + QuicSession* session) + : QuicStream(id, session, /*is_static=*/false, WRITE_UNIDIRECTIONAL) { + // QBONE uses a LIFO queue to try to always make progress. An individual + // packet may persist for upto to qbone_stream_ttl_secs seconds in memory. + MaybeSetTtl( + QuicTime::Delta::FromSeconds(GetQuicFlag(FLAGS_qbone_stream_ttl_secs))); +} + +void QboneWriteOnlyStream::WritePacketToQuicStream( + quiche::QuicheStringPiece packet) { + // Streams are one way and ephemeral. This function should only be + // called once. + WriteOrBufferData(packet, /* fin= */ true, nullptr); +} + +QboneReadOnlyStream::QboneReadOnlyStream(QuicStreamId id, + QboneSessionBase* session) + : QuicStream(id, + session, + /*is_static=*/false, + READ_UNIDIRECTIONAL), + session_(session) { + // QBONE uses a LIFO queue to try to always make progress. An individual + // packet may persist for upto to qbone_stream_ttl_secs seconds in memory. + MaybeSetTtl( + QuicTime::Delta::FromSeconds(GetQuicFlag(FLAGS_qbone_stream_ttl_secs))); +} + +void QboneReadOnlyStream::OnDataAvailable() { + // Read in data and buffer it, attempt to frame to see if there's a packet. + sequencer()->Read(&buffer_); + if (sequencer()->IsClosed()) { + session_->ProcessPacketFromPeer(buffer_); + OnFinRead(); + return; + } + if (buffer_.size() > QboneConstants::kMaxQbonePacketBytes) { + if (!rst_sent()) { + Reset(QUIC_BAD_APPLICATION_PAYLOAD); + } + StopReading(); + } +} + +} // namespace quic
diff --git a/quic/qbone/qbone_stream.h b/quic/qbone/qbone_stream.h new file mode 100644 index 0000000..8a6313b --- /dev/null +++ b/quic/qbone/qbone_stream.h
@@ -0,0 +1,56 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_QBONE_QBONE_STREAM_H_ +#define QUICHE_QUIC_QBONE_QBONE_STREAM_H_ + +#include "net/third_party/quiche/src/quic/core/quic_session.h" +#include "net/third_party/quiche/src/quic/core/quic_stream.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_export.h" +#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h" + +namespace quic { + +class QboneSessionBase; + +// QboneWriteOnlyStream is responsible for sending data for a single +// packet to the other side. +// Note that the stream will be created HalfClosed (reads will be closed). +class QUIC_EXPORT_PRIVATE QboneWriteOnlyStream : public QuicStream { + public: + QboneWriteOnlyStream(QuicStreamId id, QuicSession* session); + + // QuicStream implementation. QBONE writers are ephemeral and don't + // read any data. + void OnDataAvailable() override {} + + // Write a network packet over the quic stream. + void WritePacketToQuicStream(quiche::QuicheStringPiece packet); +}; + +// QboneReadOnlyStream will be used if we find an incoming stream that +// isn't fully contained. It will buffer the data when available and +// attempt to parse it as a packet to send to the network when a FIN +// is found. +// Note that the stream will be created HalfClosed (writes will be closed). +class QUIC_EXPORT_PRIVATE QboneReadOnlyStream : public QuicStream { + public: + QboneReadOnlyStream(QuicStreamId id, QboneSessionBase* session); + + ~QboneReadOnlyStream() override = default; + + // QuicStream overrides. + // OnDataAvailable is called when there is data in the quic stream buffer. + // This will copy the buffer locally and attempt to parse it to write out + // packets to the network. + void OnDataAvailable() override; + + private: + std::string buffer_; + QboneSessionBase* session_; +}; + +} // namespace quic + +#endif // QUICHE_QUIC_QBONE_QBONE_STREAM_H_
diff --git a/quic/qbone/qbone_stream_test.cc b/quic/qbone/qbone_stream_test.cc new file mode 100644 index 0000000..cfe9645 --- /dev/null +++ b/quic/qbone/qbone_stream_test.cc
@@ -0,0 +1,262 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/qbone/qbone_stream.h" + +#include <utility> + +#include "net/third_party/quiche/src/quic/core/crypto/quic_random.h" +#include "net/third_party/quiche/src/quic/core/quic_session.h" +#include "net/third_party/quiche/src/quic/core/quic_simple_buffer_allocator.h" +#include "net/third_party/quiche/src/quic/core/quic_utils.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_test.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_test_loopback.h" +#include "net/third_party/quiche/src/quic/qbone/qbone_constants.h" +#include "net/third_party/quiche/src/quic/qbone/qbone_session_base.h" +#include "net/third_party/quiche/src/quic/test_tools/mock_clock.h" +#include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h" +#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h" +#include "net/third_party/quiche/src/spdy/core/spdy_protocol.h" + +namespace quic { + +namespace { + +using ::testing::_; +using ::testing::StrictMock; + +// MockQuicSession that does not create streams and writes data from +// QuicStream to a string. +class MockQuicSession : public QboneSessionBase { + public: + MockQuicSession(QuicConnection* connection, const QuicConfig& config) + : QboneSessionBase(connection, + nullptr /*visitor*/, + config, + CurrentSupportedVersions(), + nullptr /*writer*/) {} + + ~MockQuicSession() override {} + + // Writes outgoing data from QuicStream to a string. + QuicConsumedData WritevData( + QuicStreamId id, + size_t write_length, + QuicStreamOffset offset, + StreamSendingState state, + TransmissionType type, + quiche::QuicheOptional<EncryptionLevel> level) override { + if (!writable_) { + return QuicConsumedData(0, false); + } + + return QuicConsumedData(write_length, state != StreamSendingState::NO_FIN); + } + + QboneReadOnlyStream* CreateIncomingStream(QuicStreamId id) override { + return nullptr; + } + + const QuicCryptoStream* GetCryptoStream() const override { return nullptr; } + QuicCryptoStream* GetMutableCryptoStream() override { return nullptr; } + + // Called by QuicStream when they want to close stream. + MOCK_METHOD(void, + SendRstStream, + (QuicStreamId, QuicRstStreamErrorCode, QuicStreamOffset, bool), + (override)); + + // Sets whether data is written to buffer, or else if this is write blocked. + void set_writable(bool writable) { writable_ = writable; } + + // Tracks whether the stream is write blocked and its priority. + void RegisterReliableStream(QuicStreamId stream_id) { + // The priority effectively does not matter. Put all streams on the same + // priority. + write_blocked_streams()->RegisterStream( + stream_id, + /*is_static_stream=*/false, + /* precedence= */ spdy::SpdyStreamPrecedence(3)); + } + + // The session take ownership of the stream. + void ActivateReliableStream(std::unique_ptr<QuicStream> stream) { + ActivateStream(std::move(stream)); + } + + std::unique_ptr<QuicCryptoStream> CreateCryptoStream() override { + return nullptr; + } + + MOCK_METHOD(void, + ProcessPacketFromPeer, + (quiche::QuicheStringPiece), + (override)); + MOCK_METHOD(void, + ProcessPacketFromNetwork, + (quiche::QuicheStringPiece), + (override)); + + private: + // Whether data is written to write_buffer_. + bool writable_ = true; +}; + +// Packet writer that does nothing. This is required for QuicConnection but +// isn't used for writing data. +class DummyPacketWriter : public QuicPacketWriter { + public: + DummyPacketWriter() {} + + // QuicPacketWriter overrides. + WriteResult WritePacket(const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + PerPacketOptions* options) override { + return WriteResult(WRITE_STATUS_ERROR, 0); + } + + bool IsWriteBlocked() const override { return false; }; + + void SetWritable() override {} + + QuicByteCount GetMaxPacketSize( + const QuicSocketAddress& peer_address) const override { + return 0; + } + + bool SupportsReleaseTime() const override { return false; } + + bool IsBatchMode() const override { return false; } + + QuicPacketBuffer GetNextWriteLocation( + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address) override { + return {nullptr, nullptr}; + } + + WriteResult Flush() override { return WriteResult(WRITE_STATUS_OK, 0); } +}; + +class QboneReadOnlyStreamTest : public ::testing::Test, + public QuicConnectionHelperInterface { + public: + void CreateReliableQuicStream() { + // Arbitrary values for QuicConnection. + Perspective perspective = Perspective::IS_SERVER; + bool owns_writer = true; + + alarm_factory_ = std::make_unique<test::MockAlarmFactory>(); + + connection_.reset(new QuicConnection( + test::TestConnectionId(0), QuicSocketAddress(TestLoopback(), 0), + this /*QuicConnectionHelperInterface*/, alarm_factory_.get(), + new DummyPacketWriter(), owns_writer, perspective, + ParsedVersionOfIndex(CurrentSupportedVersions(), 0))); + clock_.AdvanceTime(QuicTime::Delta::FromSeconds(1)); + session_ = std::make_unique<StrictMock<MockQuicSession>>(connection_.get(), + QuicConfig()); + stream_ = new QboneReadOnlyStream(kStreamId, session_.get()); + session_->ActivateReliableStream( + std::unique_ptr<QboneReadOnlyStream>(stream_)); + } + + ~QboneReadOnlyStreamTest() override {} + + const QuicClock* GetClock() const override { return &clock_; } + + QuicRandom* GetRandomGenerator() override { + return QuicRandom::GetInstance(); + } + + QuicBufferAllocator* GetStreamSendBufferAllocator() override { + return &buffer_allocator_; + } + + protected: + // The QuicSession will take the ownership. + QboneReadOnlyStream* stream_; + std::unique_ptr<StrictMock<MockQuicSession>> session_; + std::unique_ptr<QuicAlarmFactory> alarm_factory_; + std::unique_ptr<QuicConnection> connection_; + // Used to implement the QuicConnectionHelperInterface. + SimpleBufferAllocator buffer_allocator_; + MockClock clock_; + const QuicStreamId kStreamId = QuicUtils::GetFirstUnidirectionalStreamId( + CurrentSupportedVersions()[0].transport_version, + Perspective::IS_CLIENT); +}; + +// Read an entire string. +TEST_F(QboneReadOnlyStreamTest, ReadDataWhole) { + std::string packet = "Stuff"; + CreateReliableQuicStream(); + QuicStreamFrame frame(kStreamId, true, 0, packet); + EXPECT_CALL(*session_, ProcessPacketFromPeer("Stuff")); + stream_->OnStreamFrame(frame); +} + +// Test buffering. +TEST_F(QboneReadOnlyStreamTest, ReadBuffered) { + CreateReliableQuicStream(); + std::string packet = "Stuf"; + { + QuicStreamFrame frame(kStreamId, false, 0, packet); + stream_->OnStreamFrame(frame); + } + // We didn't write 5 bytes yet... + + packet = "f"; + EXPECT_CALL(*session_, ProcessPacketFromPeer("Stuff")); + { + QuicStreamFrame frame(kStreamId, true, 4, packet); + stream_->OnStreamFrame(frame); + } +} + +TEST_F(QboneReadOnlyStreamTest, ReadOutOfOrder) { + CreateReliableQuicStream(); + std::string packet = "f"; + { + QuicStreamFrame frame(kStreamId, true, 4, packet); + stream_->OnStreamFrame(frame); + } + + packet = "S"; + { + QuicStreamFrame frame(kStreamId, false, 0, packet); + stream_->OnStreamFrame(frame); + } + + packet = "tuf"; + EXPECT_CALL(*session_, ProcessPacketFromPeer("Stuff")); + { + QuicStreamFrame frame(kStreamId, false, 1, packet); + stream_->OnStreamFrame(frame); + } +} + +// Test buffering too many bytes. +TEST_F(QboneReadOnlyStreamTest, ReadBufferedTooLarge) { + CreateReliableQuicStream(); + std::string packet = "0123456789"; + int iterations = (QboneConstants::kMaxQbonePacketBytes / packet.size()) + 2; + EXPECT_CALL(*session_, + SendRstStream(kStreamId, QUIC_BAD_APPLICATION_PAYLOAD, _, _)); + for (int i = 0; i < iterations; ++i) { + QuicStreamFrame frame(kStreamId, i == (iterations - 1), i * packet.size(), + packet); + if (!stream_->reading_stopped()) { + stream_->OnStreamFrame(frame); + } + } + // We should have nothing written to the network and the stream + // should have stopped reading. + EXPECT_TRUE(stream_->reading_stopped()); +} + +} // namespace + +} // namespace quic