blob: 68343a5c578fd70091414054b37873470ca9bd08 [file] [log] [blame]
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/core/io/event_loop_tcp_client_socket.h"
#include <functional>
#include <memory>
#include <utility>
#include <vector>
#include "absl/functional/bind_front.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "quiche/quic/core/connecting_client_socket.h"
#include "quiche/quic/core/io/event_loop_socket_factory.h"
#include "quiche/quic/core/io/quic_default_event_loop.h"
#include "quiche/quic/core/io/quic_event_loop.h"
#include "quiche/quic/core/io/socket.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/platform/api/quic_ip_address_family.h"
#include "quiche/quic/platform/api/quic_socket_address.h"
#include "quiche/quic/test_tools/mock_clock.h"
#include "quiche/quic/test_tools/quic_test_utils.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/platform/api/quiche_mem_slice.h"
#include "quiche/common/platform/api/quiche_mutex.h"
#include "quiche/common/platform/api/quiche_test.h"
#include "quiche/common/platform/api/quiche_test_loopback.h"
#include "quiche/common/platform/api/quiche_thread.h"
#include "quiche/common/simple_buffer_allocator.h"
namespace quic::test {
namespace {
bool CreateListeningServerSocket(SocketFd* out_socket_descriptor,
QuicSocketAddress* out_socket_address) {
QUICHE_CHECK(out_socket_descriptor);
QUICHE_CHECK(out_socket_address);
absl::StatusOr<SocketFd> socket = socket_api::CreateSocket(
quiche::TestLoopback().address_family(), socket_api::SocketProtocol::kTcp,
/*blocking=*/true);
QUICHE_CHECK(socket.ok());
// Set an extremely small receive buffer size to increase the odds of buffers
// filling up when testing asynchronous writes.
static const QuicByteCount kReceiveBufferSize = 2;
absl::Status result =
socket_api::SetReceiveBufferSize(socket.value(), kReceiveBufferSize);
QUICHE_CHECK(result.ok());
QuicSocketAddress bind_address(quiche::TestLoopback(), /*port=*/0);
result = socket_api::Bind(socket.value(), bind_address);
QUICHE_CHECK(result.ok());
absl::StatusOr<QuicSocketAddress> socket_address =
socket_api::GetSocketAddress(socket.value());
QUICHE_CHECK(socket_address.ok());
result = socket_api::Listen(socket.value(), /*backlog=*/1);
QUICHE_CHECK(result.ok());
*out_socket_descriptor = socket.value();
*out_socket_address = std::move(socket_address).value();
return true;
}
class TestTcpServerSocketRunner : public quiche::QuicheThread {
public:
using SocketBehavior = std::function<void(SocketFd connected_socket)>;
// On construction, spins a separate thread to accept a connection from
// `server_socket_descriptor`, runs `behavior` with that connection, and then
// closes the accepted connection socket. If `allow_accept_failure` is true,
// will silently stop if an error is encountered accepting the connection.
TestTcpServerSocketRunner(SocketFd server_socket_descriptor,
SocketBehavior behavior,
bool allow_accept_failure = false)
: QuicheThread("TestTcpServerSocketRunner"),
server_socket_descriptor_(server_socket_descriptor),
behavior_(std::move(behavior)),
allow_accept_failure_(allow_accept_failure) {
Start();
}
~TestTcpServerSocketRunner() override { WaitForCompletion(); }
void WaitForCompletion() { completion_notification_.WaitForNotification(); }
protected:
void Run() override {
if (AcceptSocket()) {
behavior_(connection_socket_descriptor_);
CloseSocket();
} else {
QUICHE_CHECK(allow_accept_failure_);
}
completion_notification_.Notify();
}
private:
bool AcceptSocket() {
absl::StatusOr<socket_api::AcceptResult> connection_socket =
socket_api::Accept(server_socket_descriptor_, /*blocking=*/true);
if (connection_socket.ok()) {
connection_socket_descriptor_ = connection_socket.value().fd;
}
return connection_socket.ok();
}
void CloseSocket() {
QUICHE_CHECK(socket_api::Close(connection_socket_descriptor_).ok());
}
const SocketFd server_socket_descriptor_;
const SocketBehavior behavior_;
const bool allow_accept_failure_;
SocketFd connection_socket_descriptor_;
quiche::QuicheNotification completion_notification_;
};
class EventLoopTcpClientSocketTest
: public quiche::test::QuicheTestWithParam<QuicEventLoopFactory*>,
public ConnectingClientSocket::AsyncVisitor {
public:
void SetUp() override {
QUICHE_CHECK(CreateListeningServerSocket(&server_socket_descriptor_,
&server_socket_address_));
}
void TearDown() override {
if (server_socket_descriptor_ != kInvalidSocketFd) {
QUICHE_CHECK(socket_api::Close(server_socket_descriptor_).ok());
}
}
void ConnectComplete(absl::Status status) override {
QUICHE_CHECK(!connect_result_.has_value());
connect_result_ = std::move(status);
}
void ReceiveComplete(absl::StatusOr<quiche::QuicheMemSlice> data) override {
QUICHE_CHECK(!receive_result_.has_value());
receive_result_ = std::move(data);
}
void SendComplete(absl::Status status) override {
QUICHE_CHECK(!send_result_.has_value());
send_result_ = std::move(status);
}
protected:
SocketFd server_socket_descriptor_ = kInvalidSocketFd;
QuicSocketAddress server_socket_address_;
MockClock clock_;
std::unique_ptr<QuicEventLoop> event_loop_ = GetParam()->Create(&clock_);
EventLoopSocketFactory socket_factory_{event_loop_.get(),
quiche::SimpleBufferAllocator::Get()};
absl::optional<absl::Status> connect_result_;
absl::optional<absl::StatusOr<quiche::QuicheMemSlice>> receive_result_;
absl::optional<absl::Status> send_result_;
};
std::string GetTestParamName(
::testing::TestParamInfo<QuicEventLoopFactory*> info) {
return EscapeTestParamName(info.param->GetName());
}
INSTANTIATE_TEST_SUITE_P(EventLoopTcpClientSocketTests,
EventLoopTcpClientSocketTest,
::testing::ValuesIn(GetAllSupportedEventLoops()),
&GetTestParamName);
TEST_P(EventLoopTcpClientSocketTest, Connect) {
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/nullptr);
// No socket runner to accept the connection for the server, but that is not
// expected to be necessary for the connection to complete from the client.
EXPECT_TRUE(socket->ConnectBlocking().ok());
socket->Disconnect();
}
TEST_P(EventLoopTcpClientSocketTest, ConnectAsync) {
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/this);
socket->ConnectAsync();
// Synchronous completion not normally expected, but since there is no known
// way to delay the server side of the connection (the OS does not wait for
// an accept() call), cannot be gauranteed that the connection will always
// complete asynchronously. If connecting asynchronously (normal behavior),
// expect completion once signalled by the event loop.
if (!connect_result_.has_value()) {
event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
ASSERT_TRUE(connect_result_.has_value());
}
EXPECT_TRUE(connect_result_.value().ok());
connect_result_.reset();
socket->Disconnect();
EXPECT_FALSE(connect_result_.has_value());
}
TEST_P(EventLoopTcpClientSocketTest, ErrorBeforeConnectAsync) {
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/this);
// Close the server socket.
EXPECT_TRUE(socket_api::Close(server_socket_descriptor_).ok());
server_socket_descriptor_ = kInvalidSocketFd;
socket->ConnectAsync();
if (!connect_result_.has_value()) {
event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
ASSERT_TRUE(connect_result_.has_value());
}
// Expect an error because server socket was closed before connection.
EXPECT_FALSE(connect_result_.value().ok());
}
TEST_P(EventLoopTcpClientSocketTest, ErrorDuringConnectAsync) {
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/this);
socket->ConnectAsync();
if (connect_result_.has_value()) {
// Not typical, but theoretically nothing to stop the connection from
// completing before the server socket is closed to trigger the error.
EXPECT_TRUE(connect_result_.value().ok());
return;
}
// Close the server socket.
EXPECT_TRUE(socket_api::Close(server_socket_descriptor_).ok());
server_socket_descriptor_ = kInvalidSocketFd;
// Expect an error once signalled.
EXPECT_FALSE(connect_result_.has_value());
event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
ASSERT_TRUE(connect_result_.has_value());
EXPECT_FALSE(connect_result_.value().ok());
}
TEST_P(EventLoopTcpClientSocketTest, Disconnect) {
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/nullptr);
ASSERT_TRUE(socket->ConnectBlocking().ok());
socket->Disconnect();
}
TEST_P(EventLoopTcpClientSocketTest, DisconnectCancelsConnectAsync) {
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/this);
socket->ConnectAsync();
if (connect_result_.has_value()) {
// Not typical, but theoretically nothing to stop the connection from
// completing before the server socket is closed to trigger the error.
EXPECT_TRUE(connect_result_.value().ok());
return;
}
socket->Disconnect();
// Expect immediate cancelled error.
ASSERT_TRUE(connect_result_.has_value());
EXPECT_TRUE(absl::IsCancelled(connect_result_.value()));
}
TEST_P(EventLoopTcpClientSocketTest, ConnectAndReconnect) {
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/nullptr);
ASSERT_TRUE(socket->ConnectBlocking().ok());
socket->Disconnect();
// Expect `socket` can reconnect now that it has been disconnected.
EXPECT_TRUE(socket->ConnectBlocking().ok());
socket->Disconnect();
}
void SendDataOnSocket(absl::string_view data, SocketFd connected_socket) {
while (!data.empty()) {
absl::StatusOr<absl::string_view> remainder =
socket_api::Send(connected_socket, data);
if (!remainder.ok()) {
return;
}
data = remainder.value();
}
}
TEST_P(EventLoopTcpClientSocketTest, Receive) {
std::string expected = {1, 2, 3, 4, 5, 6, 7, 8};
TestTcpServerSocketRunner runner(
server_socket_descriptor_, absl::bind_front(&SendDataOnSocket, expected));
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/nullptr);
ASSERT_TRUE(socket->ConnectBlocking().ok());
std::string received;
absl::StatusOr<quiche::QuicheMemSlice> data;
do {
data = socket->ReceiveBlocking(100);
ASSERT_TRUE(data.ok());
received.append(data.value().data(), data.value().length());
} while (!data.value().empty());
EXPECT_EQ(received, expected);
socket->Disconnect();
}
TEST_P(EventLoopTcpClientSocketTest, ReceiveAsync) {
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/this);
ASSERT_TRUE(socket->ConnectBlocking().ok());
// Start an async receive. Expect no immediate results because runner not yet
// setup to accept and send.
socket->ReceiveAsync(100);
EXPECT_FALSE(receive_result_.has_value());
// Send data from server.
std::string expected = {1, 2, 3, 4, 5, 6, 7, 8};
TestTcpServerSocketRunner runner(
server_socket_descriptor_, absl::bind_front(&SendDataOnSocket, expected));
EXPECT_FALSE(receive_result_.has_value());
for (int i = 0; i < 5 && !receive_result_.has_value(); ++i) {
event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
}
// Expect to receive at least some of the sent data.
ASSERT_TRUE(receive_result_.has_value());
ASSERT_TRUE(receive_result_.value().ok());
EXPECT_FALSE(receive_result_.value().value().empty());
std::string received(receive_result_.value().value().data(),
receive_result_.value().value().length());
// Get any remaining data via blocking calls.
absl::StatusOr<quiche::QuicheMemSlice> data;
do {
data = socket->ReceiveBlocking(100);
ASSERT_TRUE(data.ok());
received.append(data.value().data(), data.value().length());
} while (!data.value().empty());
EXPECT_EQ(received, expected);
receive_result_.reset();
socket->Disconnect();
EXPECT_FALSE(receive_result_.has_value());
}
TEST_P(EventLoopTcpClientSocketTest, DisconnectCancelsReceiveAsync) {
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/this);
ASSERT_TRUE(socket->ConnectBlocking().ok());
// Start an asynchronous read, expecting no completion because server never
// sends any data.
socket->ReceiveAsync(100);
EXPECT_FALSE(receive_result_.has_value());
// Disconnect and expect an immediate cancelled error.
socket->Disconnect();
ASSERT_TRUE(receive_result_.has_value());
ASSERT_FALSE(receive_result_.value().ok());
EXPECT_TRUE(absl::IsCancelled(receive_result_.value().status()));
}
// Receive from `connected_socket` until connection is closed, writing received
// data to `out_received`.
void ReceiveDataFromSocket(std::string* out_received,
SocketFd connected_socket) {
out_received->clear();
std::string buffer(100, 0);
absl::StatusOr<absl::Span<char>> received;
do {
received = socket_api::Receive(connected_socket, absl::MakeSpan(buffer));
QUICHE_CHECK(received.ok());
out_received->insert(out_received->end(), received.value().begin(),
received.value().end());
} while (!received.value().empty());
}
TEST_P(EventLoopTcpClientSocketTest, Send) {
std::string sent;
TestTcpServerSocketRunner runner(
server_socket_descriptor_,
absl::bind_front(&ReceiveDataFromSocket, &sent));
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/0,
/*async_visitor=*/nullptr);
ASSERT_TRUE(socket->ConnectBlocking().ok());
std::string expected = {1, 2, 3, 4, 5, 6, 7, 8};
EXPECT_TRUE(socket->SendBlocking(expected).ok());
socket->Disconnect();
runner.WaitForCompletion();
EXPECT_EQ(sent, expected);
}
TEST_P(EventLoopTcpClientSocketTest, SendAsync) {
// Use a small send buffer to improve chances of a send needing to be
// asynchronous.
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/4,
/*async_visitor=*/this);
ASSERT_TRUE(socket->ConnectBlocking().ok());
std::string data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
std::string expected;
// Repeatedly write to socket until it does not complete synchronously.
do {
expected.insert(expected.end(), data.begin(), data.end());
send_result_.reset();
socket->SendAsync(data);
ASSERT_TRUE(!send_result_.has_value() || send_result_.value().ok());
} while (send_result_.has_value());
// Begin receiving from server and expect more data to send.
std::string sent;
TestTcpServerSocketRunner runner(
server_socket_descriptor_,
absl::bind_front(&ReceiveDataFromSocket, &sent));
EXPECT_FALSE(send_result_.has_value());
for (int i = 0; i < 5 && !send_result_.has_value(); ++i) {
event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
}
ASSERT_TRUE(send_result_.has_value());
EXPECT_TRUE(send_result_.value().ok());
send_result_.reset();
socket->Disconnect();
EXPECT_FALSE(send_result_.has_value());
runner.WaitForCompletion();
EXPECT_EQ(sent, expected);
}
TEST_P(EventLoopTcpClientSocketTest, DisconnectCancelsSendAsync) {
// Use a small send buffer to improve chances of a send needing to be
// asynchronous.
std::unique_ptr<ConnectingClientSocket> socket =
socket_factory_.CreateTcpClientSocket(server_socket_address_,
/*receive_buffer_size=*/0,
/*send_buffer_size=*/4,
/*async_visitor=*/this);
ASSERT_TRUE(socket->ConnectBlocking().ok());
std::string data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
// Repeatedly write to socket until it does not complete synchronously.
do {
send_result_.reset();
socket->SendAsync(data);
ASSERT_TRUE(!send_result_.has_value() || send_result_.value().ok());
} while (send_result_.has_value());
// Disconnect and expect immediate cancelled error.
socket->Disconnect();
ASSERT_TRUE(send_result_.has_value());
EXPECT_TRUE(absl::IsCancelled(send_result_.value()));
}
} // namespace
} // namespace quic::test