Add higher-level TCP client socket/factory support to QUICHE
Added some basic interfaces that could be used to abstract away implementation details of socket operations, including a SocketFactory that allows initiation details like QuicEventLoop to be hidden away from code interacting with it and creating/running sockets. Especially helpful to pass factory objects to code that doesn't need to know details around the event/async libraries in use or need to stay agnostic between them to build on multiple plaforms/environments.
Interfaces support my current needs (TCP client sockets), but left them deliberately open to future expansion to server sockets or UDP, etc. Created a QuicEventLoop-based implementation of the interfaces.
PiperOrigin-RevId: 466366913
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 30d068a..a928fe6 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -947,10 +947,14 @@
"quic/core/batch_writer/quic_batch_writer_test.h",
"quic/core/batch_writer/quic_gso_batch_writer.h",
"quic/core/batch_writer/quic_sendmmsg_batch_writer.h",
+ "quic/core/io/event_loop_socket_factory.h",
+ "quic/core/io/event_loop_tcp_client_socket.h",
"quic/core/io/quic_default_event_loop.h",
"quic/core/io/quic_event_loop.h",
"quic/core/io/quic_poll_event_loop.h",
"quic/core/io/socket.h",
+ "quic/core/io/socket_factory.h",
+ "quic/core/io/stream_client_socket.h",
"quic/core/quic_default_packet_writer.h",
"quic/core/quic_epoll_alarm_factory.h",
"quic/core/quic_epoll_clock.h",
@@ -981,6 +985,8 @@
"quic/core/batch_writer/quic_batch_writer_buffer.cc",
"quic/core/batch_writer/quic_gso_batch_writer.cc",
"quic/core/batch_writer/quic_sendmmsg_batch_writer.cc",
+ "quic/core/io/event_loop_socket_factory.cc",
+ "quic/core/io/event_loop_tcp_client_socket.cc",
"quic/core/io/quic_default_event_loop.cc",
"quic/core/io/quic_poll_event_loop.cc",
"quic/core/io/socket_posix.cc",
@@ -1302,6 +1308,7 @@
"quic/core/http/quic_spdy_client_session_test.cc",
"quic/core/http/quic_spdy_client_stream_test.cc",
"quic/core/http/quic_spdy_server_stream_base_test.cc",
+ "quic/core/io/event_loop_tcp_client_socket_test.cc",
"quic/core/io/quic_all_event_loops_test.cc",
"quic/core/io/quic_poll_event_loop_test.cc",
"quic/core/io/socket_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index daadaf6..2fd7580 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -947,10 +947,14 @@
"src/quiche/quic/core/batch_writer/quic_batch_writer_test.h",
"src/quiche/quic/core/batch_writer/quic_gso_batch_writer.h",
"src/quiche/quic/core/batch_writer/quic_sendmmsg_batch_writer.h",
+ "src/quiche/quic/core/io/event_loop_socket_factory.h",
+ "src/quiche/quic/core/io/event_loop_tcp_client_socket.h",
"src/quiche/quic/core/io/quic_default_event_loop.h",
"src/quiche/quic/core/io/quic_event_loop.h",
"src/quiche/quic/core/io/quic_poll_event_loop.h",
"src/quiche/quic/core/io/socket.h",
+ "src/quiche/quic/core/io/socket_factory.h",
+ "src/quiche/quic/core/io/stream_client_socket.h",
"src/quiche/quic/core/quic_default_packet_writer.h",
"src/quiche/quic/core/quic_epoll_alarm_factory.h",
"src/quiche/quic/core/quic_epoll_clock.h",
@@ -981,6 +985,8 @@
"src/quiche/quic/core/batch_writer/quic_batch_writer_buffer.cc",
"src/quiche/quic/core/batch_writer/quic_gso_batch_writer.cc",
"src/quiche/quic/core/batch_writer/quic_sendmmsg_batch_writer.cc",
+ "src/quiche/quic/core/io/event_loop_socket_factory.cc",
+ "src/quiche/quic/core/io/event_loop_tcp_client_socket.cc",
"src/quiche/quic/core/io/quic_default_event_loop.cc",
"src/quiche/quic/core/io/quic_poll_event_loop.cc",
"src/quiche/quic/core/io/socket_posix.cc",
@@ -1302,6 +1308,7 @@
"src/quiche/quic/core/http/quic_spdy_client_session_test.cc",
"src/quiche/quic/core/http/quic_spdy_client_stream_test.cc",
"src/quiche/quic/core/http/quic_spdy_server_stream_base_test.cc",
+ "src/quiche/quic/core/io/event_loop_tcp_client_socket_test.cc",
"src/quiche/quic/core/io/quic_all_event_loops_test.cc",
"src/quiche/quic/core/io/quic_poll_event_loop_test.cc",
"src/quiche/quic/core/io/socket_test.cc",
diff --git a/build/source_list.json b/build/source_list.json
index ca31469..02f608b 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -946,10 +946,14 @@
"quiche/quic/core/batch_writer/quic_batch_writer_test.h",
"quiche/quic/core/batch_writer/quic_gso_batch_writer.h",
"quiche/quic/core/batch_writer/quic_sendmmsg_batch_writer.h",
+ "quiche/quic/core/io/event_loop_socket_factory.h",
+ "quiche/quic/core/io/event_loop_tcp_client_socket.h",
"quiche/quic/core/io/quic_default_event_loop.h",
"quiche/quic/core/io/quic_event_loop.h",
"quiche/quic/core/io/quic_poll_event_loop.h",
"quiche/quic/core/io/socket.h",
+ "quiche/quic/core/io/socket_factory.h",
+ "quiche/quic/core/io/stream_client_socket.h",
"quiche/quic/core/quic_default_packet_writer.h",
"quiche/quic/core/quic_epoll_alarm_factory.h",
"quiche/quic/core/quic_epoll_clock.h",
@@ -980,6 +984,8 @@
"quiche/quic/core/batch_writer/quic_batch_writer_buffer.cc",
"quiche/quic/core/batch_writer/quic_gso_batch_writer.cc",
"quiche/quic/core/batch_writer/quic_sendmmsg_batch_writer.cc",
+ "quiche/quic/core/io/event_loop_socket_factory.cc",
+ "quiche/quic/core/io/event_loop_tcp_client_socket.cc",
"quiche/quic/core/io/quic_default_event_loop.cc",
"quiche/quic/core/io/quic_poll_event_loop.cc",
"quiche/quic/core/io/socket_posix.cc",
@@ -1301,6 +1307,7 @@
"quiche/quic/core/http/quic_spdy_client_session_test.cc",
"quiche/quic/core/http/quic_spdy_client_stream_test.cc",
"quiche/quic/core/http/quic_spdy_server_stream_base_test.cc",
+ "quiche/quic/core/io/event_loop_tcp_client_socket_test.cc",
"quiche/quic/core/io/quic_all_event_loops_test.cc",
"quiche/quic/core/io/quic_poll_event_loop_test.cc",
"quiche/quic/core/io/socket_test.cc",
diff --git a/quiche/quic/core/io/event_loop_socket_factory.cc b/quiche/quic/core/io/event_loop_socket_factory.cc
new file mode 100644
index 0000000..4d2508e
--- /dev/null
+++ b/quiche/quic/core/io/event_loop_socket_factory.cc
@@ -0,0 +1,36 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/core/io/event_loop_socket_factory.h"
+
+#include <memory>
+
+#include "quiche/quic/core/io/event_loop_tcp_client_socket.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/io/stream_client_socket.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+
+namespace quic {
+
+EventLoopSocketFactory::EventLoopSocketFactory(
+ QuicEventLoop* event_loop, quiche::QuicheBufferAllocator* buffer_allocator)
+ : event_loop_(event_loop), buffer_allocator_(buffer_allocator) {
+ QUICHE_DCHECK(event_loop_);
+ QUICHE_DCHECK(buffer_allocator_);
+}
+
+std::unique_ptr<StreamClientSocket>
+EventLoopSocketFactory::CreateTcpClientSocket(
+ const quic::QuicSocketAddress& peer_address,
+ QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
+ StreamClientSocket::AsyncVisitor* async_visitor) {
+ return std::make_unique<EventLoopTcpClientSocket>(
+ peer_address, receive_buffer_size, send_buffer_size, event_loop_,
+ buffer_allocator_, async_visitor);
+}
+
+} // namespace quic
diff --git a/quiche/quic/core/io/event_loop_socket_factory.h b/quiche/quic/core/io/event_loop_socket_factory.h
new file mode 100644
index 0000000..6882e80
--- /dev/null
+++ b/quiche/quic/core/io/event_loop_socket_factory.h
@@ -0,0 +1,41 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_IO_EVENT_LOOP_SOCKET_FACTORY_H_
+#define QUICHE_QUIC_CORE_IO_EVENT_LOOP_SOCKET_FACTORY_H_
+
+#include <memory>
+
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/io/socket_factory.h"
+#include "quiche/quic/core/io/stream_client_socket.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+
+namespace quic {
+
+// A socket factory that creates sockets implemented using an underlying
+// QuicEventLoop.
+class QUICHE_EXPORT_PRIVATE EventLoopSocketFactory : public SocketFactory {
+ public:
+ // `event_loop` and `buffer_allocator` must outlive the created factory.
+ EventLoopSocketFactory(QuicEventLoop* event_loop,
+ quiche::QuicheBufferAllocator* buffer_allocator);
+
+ // SocketFactory:
+ std::unique_ptr<StreamClientSocket> CreateTcpClientSocket(
+ const quic::QuicSocketAddress& peer_address,
+ QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
+ StreamClientSocket::AsyncVisitor* async_visitor) override;
+
+ private:
+ QuicEventLoop* const event_loop_; // unowned
+ quiche::QuicheBufferAllocator* buffer_allocator_; // unowned
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_CORE_IO_EVENT_LOOP_SOCKET_FACTORY_H_
diff --git a/quiche/quic/core/io/event_loop_tcp_client_socket.cc b/quiche/quic/core/io/event_loop_tcp_client_socket.cc
new file mode 100644
index 0000000..84263bc
--- /dev/null
+++ b/quiche/quic/core/io/event_loop_tcp_client_socket.cc
@@ -0,0 +1,610 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/core/io/event_loop_tcp_client_socket.h"
+
+#include <limits>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "absl/types/variant.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/io/socket.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace quic {
+
+EventLoopTcpClientSocket::EventLoopTcpClientSocket(
+ const quic::QuicSocketAddress& peer_address,
+ QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
+ QuicEventLoop* event_loop, quiche::QuicheBufferAllocator* buffer_allocator,
+ AsyncVisitor* async_visitor)
+ : peer_address_(peer_address),
+ receive_buffer_size_(receive_buffer_size),
+ send_buffer_size_(send_buffer_size),
+ event_loop_(event_loop),
+ buffer_allocator_(buffer_allocator),
+ async_visitor_(async_visitor) {
+ QUICHE_DCHECK(event_loop_);
+ QUICHE_DCHECK(buffer_allocator_);
+}
+
+EventLoopTcpClientSocket::~EventLoopTcpClientSocket() {
+ // Connected socket must be closed via Disconnect() before destruction. Cannot
+ // safely recover if state indicates caller may be expecting async callbacks.
+ QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
+ QUICHE_DCHECK(!receive_max_size_.has_value());
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+ if (descriptor_ != kInvalidSocketFd) {
+ QUICHE_BUG(quic_event_loop_tcp_socket_invalid_destruction)
+ << "Must call Disconnect() on connected TCP socket before destruction.";
+ Close();
+ }
+
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+ QUICHE_DCHECK(send_remaining_.empty());
+}
+
+absl::Status EventLoopTcpClientSocket::ConnectBlocking() {
+ QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+ QUICHE_DCHECK(!receive_max_size_.has_value());
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+ absl::Status status = Open();
+ if (!status.ok()) {
+ return status;
+ }
+
+ status = socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
+ if (!status.ok()) {
+ QUICHE_LOG_FIRST_N(WARNING, 100)
+ << "Failed to set socket to address: " << peer_address_.ToString()
+ << " as blocking for connect with error: " << status;
+ Close();
+ return status;
+ }
+
+ status = DoInitialConnect();
+
+ if (absl::IsUnavailable(status)) {
+ QUICHE_LOG_FIRST_N(ERROR, 100)
+ << "Non-blocking connect to should-be blocking socket to address:"
+ << peer_address_.ToString() << ".";
+ Close();
+ connect_status_ = ConnectStatus::kNotConnected;
+ return status;
+ } else if (!status.ok()) {
+ // DoInitialConnect() closes the socket on failures.
+ QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+ return status;
+ }
+
+ status = socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
+ if (!status.ok()) {
+ QUICHE_LOG_FIRST_N(WARNING, 100)
+ << "Failed to return socket to address: " << peer_address_.ToString()
+ << " to non-blocking after connect with error: " << status;
+ Close();
+ connect_status_ = ConnectStatus::kNotConnected;
+ }
+
+ QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
+ return status;
+}
+
+void EventLoopTcpClientSocket::ConnectAsync() {
+ QUICHE_DCHECK(async_visitor_);
+ QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+ QUICHE_DCHECK(!receive_max_size_.has_value());
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+ absl::Status status = Open();
+ if (!status.ok()) {
+ async_visitor_->ConnectComplete(status);
+ return;
+ }
+
+ FinishOrRearmAsyncConnect(DoInitialConnect());
+}
+
+void EventLoopTcpClientSocket::Disconnect() {
+ QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ != ConnectStatus::kNotConnected);
+
+ Close();
+ QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
+
+ // Reset all state before invoking any callbacks.
+ bool require_connect_callback = connect_status_ == ConnectStatus::kConnecting;
+ connect_status_ = ConnectStatus::kNotConnected;
+ bool require_receive_callback = receive_max_size_.has_value();
+ receive_max_size_.reset();
+ bool require_send_callback =
+ !absl::holds_alternative<absl::monostate>(send_data_);
+ send_data_ = absl::monostate();
+ send_remaining_ = "";
+
+ if (require_connect_callback) {
+ QUICHE_DCHECK(async_visitor_);
+ async_visitor_->ConnectComplete(absl::CancelledError());
+ }
+ if (require_receive_callback) {
+ QUICHE_DCHECK(async_visitor_);
+ async_visitor_->ReceiveComplete(absl::CancelledError());
+ }
+ if (require_send_callback) {
+ QUICHE_DCHECK(async_visitor_);
+ async_visitor_->SendComplete(absl::CancelledError());
+ }
+}
+
+absl::StatusOr<quiche::QuicheMemSlice>
+EventLoopTcpClientSocket::ReceiveBlocking(QuicByteCount max_size) {
+ QUICHE_DCHECK_GT(max_size, 0u);
+ QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+ QUICHE_DCHECK(!receive_max_size_.has_value());
+
+ absl::Status status =
+ socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
+ if (!status.ok()) {
+ QUICHE_LOG_FIRST_N(WARNING, 100)
+ << "Failed to set socket to address: " << peer_address_.ToString()
+ << " as blocking for receive with error: " << status;
+ return status;
+ }
+
+ receive_max_size_ = max_size;
+ absl::StatusOr<quiche::QuicheMemSlice> buffer = ReceiveInternal();
+
+ if (!buffer.ok() && absl::IsUnavailable(buffer.status())) {
+ QUICHE_LOG_FIRST_N(ERROR, 100)
+ << "Non-blocking receive from should-be blocking socket to address:"
+ << peer_address_.ToString() << ".";
+ receive_max_size_.reset();
+ } else {
+ QUICHE_DCHECK(!receive_max_size_.has_value());
+ }
+
+ absl::Status set_non_blocking_status =
+ socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
+ if (!set_non_blocking_status.ok()) {
+ QUICHE_LOG_FIRST_N(WARNING, 100)
+ << "Failed to return socket to address: " << peer_address_.ToString()
+ << " to non-blocking after receive with error: "
+ << set_non_blocking_status;
+ return set_non_blocking_status;
+ }
+
+ return buffer;
+}
+
+void EventLoopTcpClientSocket::ReceiveAsync(QuicByteCount max_size) {
+ QUICHE_DCHECK(async_visitor_);
+ QUICHE_DCHECK_GT(max_size, 0u);
+ QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+ QUICHE_DCHECK(!receive_max_size_.has_value());
+
+ receive_max_size_ = max_size;
+
+ FinishOrRearmAsyncReceive(ReceiveInternal());
+}
+
+absl::Status EventLoopTcpClientSocket::SendBlocking(std::string data) {
+ QUICHE_DCHECK(!data.empty());
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+ send_data_ = std::move(data);
+ return SendBlockingInternal();
+}
+
+absl::Status EventLoopTcpClientSocket::SendBlocking(
+ quiche::QuicheMemSlice data) {
+ QUICHE_DCHECK(!data.empty());
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+ send_data_ = std::move(data);
+ return SendBlockingInternal();
+}
+
+void EventLoopTcpClientSocket::SendAsync(std::string data) {
+ QUICHE_DCHECK(!data.empty());
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+ send_data_ = std::move(data);
+ send_remaining_ = absl::get<std::string>(send_data_);
+
+ FinishOrRearmAsyncSend(SendInternal());
+}
+
+void EventLoopTcpClientSocket::SendAsync(quiche::QuicheMemSlice data) {
+ QUICHE_DCHECK(!data.empty());
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+ send_data_ = std::move(data);
+ send_remaining_ =
+ absl::get<quiche::QuicheMemSlice>(send_data_).AsStringView();
+
+ FinishOrRearmAsyncSend(SendInternal());
+}
+
+void EventLoopTcpClientSocket::OnSocketEvent(QuicEventLoop* event_loop,
+ SocketFd fd,
+ QuicSocketEventMask events) {
+ QUICHE_DCHECK_EQ(event_loop, event_loop_);
+ QUICHE_DCHECK_EQ(fd, descriptor_);
+
+ if (connect_status_ == ConnectStatus::kConnecting &&
+ (events & (kSocketEventWritable | kSocketEventError))) {
+ FinishOrRearmAsyncConnect(GetConnectResult());
+ return;
+ }
+
+ if (receive_max_size_.has_value() &&
+ (events & (kSocketEventReadable | kSocketEventError))) {
+ FinishOrRearmAsyncReceive(ReceiveInternal());
+ }
+ if (!send_remaining_.empty() &&
+ (events & (kSocketEventWritable | kSocketEventError))) {
+ FinishOrRearmAsyncSend(SendInternal());
+ }
+}
+
+absl::Status EventLoopTcpClientSocket::Open() {
+ QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+ QUICHE_DCHECK(!receive_max_size_.has_value());
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+ QUICHE_DCHECK(send_remaining_.empty());
+
+ absl::StatusOr<SocketFd> descriptor = socket_api::CreateSocket(
+ peer_address_.host().address_family(), socket_api::SocketProtocol::kTcp,
+ /*blocking=*/false);
+ if (!descriptor.ok()) {
+ QUICHE_DVLOG(1) << "Failed to open socket for connection to address: "
+ << peer_address_.ToString()
+ << " with error: " << descriptor.status();
+ return descriptor.status();
+ }
+ QUICHE_DCHECK_NE(descriptor.value(), kInvalidSocketFd);
+
+ descriptor_ = descriptor.value();
+
+ if (async_visitor_) {
+ bool registered;
+ if (event_loop_->SupportsEdgeTriggered()) {
+ registered = event_loop_->RegisterSocket(
+ descriptor_,
+ kSocketEventReadable | kSocketEventWritable | kSocketEventError,
+ this);
+ } else {
+ // Just register the socket without any armed events for now. Will rearm
+ // with specific events as needed. Registering now before events are
+ // needed makes it easier to ensure the socket is registered only once
+ // and can always be unregistered on socket close.
+ registered = event_loop_->RegisterSocket(descriptor_, /*events=*/0, this);
+ }
+ QUICHE_DCHECK(registered);
+ }
+
+ if (receive_buffer_size_ != 0) {
+ absl::Status status =
+ socket_api::SetReceiveBufferSize(descriptor_, receive_buffer_size_);
+ if (!status.ok()) {
+ QUICHE_LOG_FIRST_N(WARNING, 100)
+ << "Failed to set receive buffer size to: " << receive_buffer_size_
+ << " for socket to address: " << peer_address_.ToString()
+ << " with error: " << status;
+ Close();
+ return status;
+ }
+ }
+
+ if (send_buffer_size_ != 0) {
+ absl::Status status =
+ socket_api::SetSendBufferSize(descriptor_, send_buffer_size_);
+ if (!status.ok()) {
+ QUICHE_LOG_FIRST_N(WARNING, 100)
+ << "Failed to set send buffer size to: " << send_buffer_size_
+ << " for socket to address: " << peer_address_.ToString()
+ << " with error: " << status;
+ Close();
+ return status;
+ }
+ }
+
+ return absl::OkStatus();
+}
+
+void EventLoopTcpClientSocket::Close() {
+ QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+
+ bool unregistered = event_loop_->UnregisterSocket(descriptor_);
+ QUICHE_DCHECK_EQ(unregistered, !!async_visitor_);
+
+ absl::Status status = socket_api::Close(descriptor_);
+ if (!status.ok()) {
+ QUICHE_LOG_FIRST_N(WARNING, 100)
+ << "Could not close socket to address: " << peer_address_.ToString()
+ << " with error: " << status;
+ }
+
+ descriptor_ = kInvalidSocketFd;
+}
+
+absl::Status EventLoopTcpClientSocket::DoInitialConnect() {
+ QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+ QUICHE_DCHECK(!receive_max_size_.has_value());
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+ absl::Status connect_result = socket_api::Connect(descriptor_, peer_address_);
+
+ if (connect_result.ok()) {
+ connect_status_ = ConnectStatus::kConnected;
+ } else if (absl::IsUnavailable(connect_result)) {
+ connect_status_ = ConnectStatus::kConnecting;
+ } else {
+ QUICHE_DVLOG(1) << "Synchronously failed to connect socket to address: "
+ << peer_address_.ToString()
+ << " with error: " << connect_result;
+ Close();
+ connect_status_ = ConnectStatus::kNotConnected;
+ }
+
+ return connect_result;
+}
+
+absl::Status EventLoopTcpClientSocket::GetConnectResult() {
+ QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnecting);
+ QUICHE_DCHECK(!receive_max_size_.has_value());
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+ absl::Status error = socket_api::GetSocketError(descriptor_);
+
+ if (!error.ok()) {
+ QUICHE_DVLOG(1) << "Asynchronously failed to connect socket to address: "
+ << peer_address_.ToString() << " with error: " << error;
+ Close();
+ connect_status_ = ConnectStatus::kNotConnected;
+ return error;
+ }
+
+ // Peek at one byte to confirm the connection is actually alive. Motivation:
+ // 1) Plausibly could have a lot of cases where the connection operation
+ // itself technically succeeds but the socket then quickly fails. Don't
+ // want to claim connection success here if, by the time this code is
+ // running after event triggers and such, the socket has already failed.
+ // Lot of undefined room around whether or not such errors would be saved
+ // into SO_ERROR and returned by socket_api::GetSocketError().
+ // 2) With the various platforms and event systems involved, less than 100%
+ // trust that it's impossible to end up in this method before the async
+ // connect has completed/errored. Given that Connect() and GetSocketError()
+ // does not difinitevely differentiate between success and
+ // still-in-progress, and given that there's a very simple and performant
+ // way to positively confirm the socket is connected (peek), do that here.
+ // (Could consider making the not-connected case a QUIC_BUG if a way is
+ // found to differentiate it from (1).)
+ absl::StatusOr<bool> peek_data = OneBytePeek();
+ if (peek_data.ok() || absl::IsUnavailable(peek_data.status())) {
+ connect_status_ = ConnectStatus::kConnected;
+ } else {
+ error = peek_data.status();
+ QUICHE_LOG_FIRST_N(WARNING, 100)
+ << "Socket to address: " << peer_address_.ToString()
+ << " signalled writable after connect and no connect error found, "
+ "but socket does not appear connected with error: "
+ << error;
+ Close();
+ connect_status_ = ConnectStatus::kNotConnected;
+ }
+
+ return error;
+}
+
+void EventLoopTcpClientSocket::FinishOrRearmAsyncConnect(absl::Status status) {
+ if (absl::IsUnavailable(status)) {
+ if (!event_loop_->SupportsEdgeTriggered()) {
+ bool result = event_loop_->RearmSocket(
+ descriptor_, kSocketEventWritable | kSocketEventError);
+ QUICHE_DCHECK(result);
+ }
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnecting);
+ } else {
+ QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
+ async_visitor_->ConnectComplete(status);
+ }
+}
+
+absl::StatusOr<quiche::QuicheMemSlice>
+EventLoopTcpClientSocket::ReceiveInternal() {
+ QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+ QUICHE_CHECK(receive_max_size_.has_value());
+ QUICHE_DCHECK_GE(receive_max_size_.value(), 1u);
+ QUICHE_DCHECK_LE(receive_max_size_.value(),
+ std::numeric_limits<size_t>::max());
+
+ // Before allocating a buffer, do a 1-byte peek to determine if needed.
+ if (receive_max_size_.value() > 1) {
+ absl::StatusOr<bool> peek_data = OneBytePeek();
+ if (!peek_data.ok()) {
+ if (!absl::IsUnavailable(peek_data.status())) {
+ receive_max_size_.reset();
+ }
+ return peek_data.status();
+ } else if (!peek_data.value()) {
+ receive_max_size_.reset();
+ return quiche::QuicheMemSlice();
+ }
+ }
+
+ quiche::QuicheBuffer buffer(buffer_allocator_, receive_max_size_.value());
+ absl::StatusOr<absl::Span<char>> received = socket_api::Receive(
+ descriptor_, absl::MakeSpan(buffer.data(), buffer.size()));
+
+ if (received.ok()) {
+ QUICHE_DCHECK_LE(received.value().size(), buffer.size());
+ QUICHE_DCHECK_EQ(received.value().data(), buffer.data());
+
+ receive_max_size_.reset();
+ return quiche::QuicheMemSlice(
+ quiche::QuicheBuffer(buffer.Release(), received.value().size()));
+ } else {
+ if (!absl::IsUnavailable(received.status())) {
+ QUICHE_DVLOG(1) << "Failed to receive from socket to address: "
+ << peer_address_.ToString()
+ << " with error: " << received.status();
+ receive_max_size_.reset();
+ }
+ return received.status();
+ }
+}
+
+void EventLoopTcpClientSocket::FinishOrRearmAsyncReceive(
+ absl::StatusOr<quiche::QuicheMemSlice> buffer) {
+ QUICHE_DCHECK(async_visitor_);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+
+ if (!buffer.ok() && absl::IsUnavailable(buffer.status())) {
+ if (!event_loop_->SupportsEdgeTriggered()) {
+ bool result = event_loop_->RearmSocket(
+ descriptor_, kSocketEventReadable | kSocketEventError);
+ QUICHE_DCHECK(result);
+ }
+ QUICHE_DCHECK(receive_max_size_.has_value());
+ } else {
+ QUICHE_DCHECK(!receive_max_size_.has_value());
+ async_visitor_->ReceiveComplete(std::move(buffer));
+ }
+}
+
+absl::StatusOr<bool> EventLoopTcpClientSocket::OneBytePeek() {
+ QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+
+ char peek_buffer;
+ absl::StatusOr<absl::Span<char>> peek_received = socket_api::Receive(
+ descriptor_, absl::MakeSpan(&peek_buffer, /*size=*/1), /*peek=*/true);
+ if (!peek_received.ok()) {
+ return peek_received.status();
+ } else {
+ return !peek_received.value().empty();
+ }
+}
+
+absl::Status EventLoopTcpClientSocket::SendBlockingInternal() {
+ QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+ QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
+ QUICHE_DCHECK(send_remaining_.empty());
+
+ absl::Status status =
+ socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
+ if (!status.ok()) {
+ QUICHE_LOG_FIRST_N(WARNING, 100)
+ << "Failed to set socket to address: " << peer_address_.ToString()
+ << " as blocking for send with error: " << status;
+ send_data_ = absl::monostate();
+ return status;
+ }
+
+ if (absl::holds_alternative<std::string>(send_data_)) {
+ send_remaining_ = absl::get<std::string>(send_data_);
+ } else {
+ send_remaining_ =
+ absl::get<quiche::QuicheMemSlice>(send_data_).AsStringView();
+ }
+
+ status = SendInternal();
+ if (absl::IsUnavailable(status)) {
+ QUICHE_LOG_FIRST_N(ERROR, 100)
+ << "Non-blocking send for should-be blocking socket to address:"
+ << peer_address_.ToString();
+ send_data_ = absl::monostate();
+ send_remaining_ = "";
+ } else {
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+ QUICHE_DCHECK(send_remaining_.empty());
+ }
+
+ absl::Status set_non_blocking_status =
+ socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
+ if (!set_non_blocking_status.ok()) {
+ QUICHE_LOG_FIRST_N(WARNING, 100)
+ << "Failed to return socket to address: " << peer_address_.ToString()
+ << " to non-blocking after send with error: "
+ << set_non_blocking_status;
+ return set_non_blocking_status;
+ }
+
+ return status;
+}
+
+absl::Status EventLoopTcpClientSocket::SendInternal() {
+ QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+ QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
+ QUICHE_DCHECK(!send_remaining_.empty());
+
+ // Repeat send until all data sent, unavailable, or error.
+ while (!send_remaining_.empty()) {
+ absl::StatusOr<absl::string_view> remainder =
+ socket_api::Send(descriptor_, send_remaining_);
+
+ if (remainder.ok()) {
+ QUICHE_DCHECK(remainder.value().empty() ||
+ (remainder.value().data() >= send_remaining_.data() &&
+ remainder.value().data() <
+ send_remaining_.data() + send_remaining_.size()));
+ QUICHE_DCHECK(remainder.value().empty() ||
+ (remainder.value().data() + remainder.value().size() ==
+ send_remaining_.data() + send_remaining_.size()));
+ send_remaining_ = remainder.value();
+ } else {
+ if (!absl::IsUnavailable(remainder.status())) {
+ QUICHE_DVLOG(1) << "Failed to send to socket to address: "
+ << peer_address_.ToString()
+ << " with error: " << remainder.status();
+ send_data_ = absl::monostate();
+ send_remaining_ = "";
+ }
+ return remainder.status();
+ }
+ }
+
+ send_data_ = absl::monostate();
+ return absl::OkStatus();
+}
+
+void EventLoopTcpClientSocket::FinishOrRearmAsyncSend(absl::Status status) {
+ QUICHE_DCHECK(async_visitor_);
+ QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+
+ if (absl::IsUnavailable(status)) {
+ if (!event_loop_->SupportsEdgeTriggered()) {
+ bool result = event_loop_->RearmSocket(
+ descriptor_, kSocketEventWritable | kSocketEventError);
+ QUICHE_DCHECK(result);
+ }
+ QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
+ QUICHE_DCHECK(!send_remaining_.empty());
+ } else {
+ QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+ QUICHE_DCHECK(send_remaining_.empty());
+ async_visitor_->SendComplete(status);
+ }
+}
+
+} // namespace quic
diff --git a/quiche/quic/core/io/event_loop_tcp_client_socket.h b/quiche/quic/core/io/event_loop_tcp_client_socket.h
new file mode 100644
index 0000000..2d50087
--- /dev/null
+++ b/quiche/quic/core/io/event_loop_tcp_client_socket.h
@@ -0,0 +1,103 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_IO_EVENT_LOOP_TCP_CLIENT_SOCKET_H_
+#define QUICHE_QUIC_CORE_IO_EVENT_LOOP_TCP_CLIENT_SOCKET_H_
+
+#include <string>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "absl/types/variant.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/io/stream_client_socket.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+
+namespace quic {
+
+// A TCP client socket implemented using an underlying QuicEventLoop.
+class QUICHE_EXPORT_PRIVATE EventLoopTcpClientSocket
+ : public StreamClientSocket,
+ public QuicSocketEventListener {
+ public:
+ // Will use platform default buffer size if `receive_buffer_size` or
+ // `send_buffer_size` is zero. `async_visitor` may be null if no async
+ // operations will be requested. `event_loop`, `buffer_allocator`, and
+ // `async_visitor` (if non-null) must outlive the created socket.
+ EventLoopTcpClientSocket(const quic::QuicSocketAddress& peer_address,
+ QuicByteCount receive_buffer_size,
+ QuicByteCount send_buffer_size,
+ QuicEventLoop* event_loop,
+ quiche::QuicheBufferAllocator* buffer_allocator,
+ AsyncVisitor* async_visitor);
+
+ ~EventLoopTcpClientSocket() override;
+
+ // StreamClientSocket:
+ absl::Status ConnectBlocking() override;
+ void ConnectAsync() override;
+ void Disconnect() override;
+
+ // Socket:
+ absl::StatusOr<quiche::QuicheMemSlice> ReceiveBlocking(
+ QuicByteCount max_size) override;
+ void ReceiveAsync(QuicByteCount max_size) override;
+ absl::Status SendBlocking(std::string data) override;
+ absl::Status SendBlocking(quiche::QuicheMemSlice data) override;
+ void SendAsync(std::string data) override;
+ void SendAsync(quiche::QuicheMemSlice data) override;
+
+ // QuicSocketEventListener:
+ void OnSocketEvent(QuicEventLoop* event_loop, SocketFd fd,
+ QuicSocketEventMask events) override;
+
+ private:
+ enum class ConnectStatus {
+ kNotConnected,
+ kConnecting,
+ kConnected,
+ };
+
+ absl::Status Open();
+ void Close();
+ absl::Status DoInitialConnect();
+ absl::Status GetConnectResult();
+ void FinishOrRearmAsyncConnect(absl::Status status);
+ absl::StatusOr<quiche::QuicheMemSlice> ReceiveInternal();
+ void FinishOrRearmAsyncReceive(absl::StatusOr<quiche::QuicheMemSlice> buffer);
+ // Returns `true` if a byte received, or `false` if successfully received
+ // empty data.
+ absl::StatusOr<bool> OneBytePeek();
+ absl::Status SendBlockingInternal();
+ absl::Status SendInternal();
+ void FinishOrRearmAsyncSend(absl::Status status);
+
+ const QuicSocketAddress peer_address_;
+ const QuicByteCount receive_buffer_size_;
+ const QuicByteCount send_buffer_size_;
+ QuicEventLoop* const event_loop_; // unowned
+ quiche::QuicheBufferAllocator* buffer_allocator_; // unowned
+ AsyncVisitor* const async_visitor_; // unowned, potentially null
+
+ SocketFd descriptor_ = kInvalidSocketFd;
+ ConnectStatus connect_status_ = ConnectStatus::kNotConnected;
+
+ // Only set while receive in progress or pending, otherwise nullopt.
+ absl::optional<QuicByteCount> receive_max_size_;
+
+ // Only contains data while send in progress or pending, otherwise monostate.
+ absl::variant<absl::monostate, std::string, quiche::QuicheMemSlice>
+ send_data_;
+ // Points to the unsent portion of `send_data_` while send in progress or
+ // pending, otherwise empty.
+ absl::string_view send_remaining_;
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_CORE_IO_EVENT_LOOP_TCP_CLIENT_SOCKET_H_
diff --git a/quiche/quic/core/io/event_loop_tcp_client_socket_test.cc b/quiche/quic/core/io/event_loop_tcp_client_socket_test.cc
new file mode 100644
index 0000000..f3c0800
--- /dev/null
+++ b/quiche/quic/core/io/event_loop_tcp_client_socket_test.cc
@@ -0,0 +1,523 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/core/io/event_loop_tcp_client_socket.h"
+
+#include <functional>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "absl/functional/bind_front.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "absl/types/span.h"
+#include "quiche/quic/core/io/event_loop_socket_factory.h"
+#include "quiche/quic/core/io/quic_default_event_loop.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/io/socket.h"
+#include "quiche/quic/core/io/stream_client_socket.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/platform/api/quic_ip_address_family.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/quic/test_tools/mock_clock.h"
+#include "quiche/quic/test_tools/quic_test_utils.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+#include "quiche/common/platform/api/quiche_mutex.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/platform/api/quiche_test_loopback.h"
+#include "quiche/common/platform/api/quiche_thread.h"
+#include "quiche/common/simple_buffer_allocator.h"
+
+namespace quic::test {
+namespace {
+
+bool CreateListeningServerSocket(SocketFd* out_socket_descriptor,
+ QuicSocketAddress* out_socket_address) {
+ QUICHE_CHECK(out_socket_descriptor);
+ QUICHE_CHECK(out_socket_address);
+
+ absl::StatusOr<SocketFd> socket = socket_api::CreateSocket(
+ quiche::TestLoopback().address_family(), socket_api::SocketProtocol::kTcp,
+ /*blocking=*/true);
+ QUICHE_CHECK(socket.ok());
+
+ // Set an extremely small receive buffer size to increase the odds of buffers
+ // filling up when testing asynchronous writes.
+ static const QuicByteCount kReceiveBufferSize = 2;
+ absl::Status result =
+ socket_api::SetReceiveBufferSize(socket.value(), kReceiveBufferSize);
+ QUICHE_CHECK(result.ok());
+
+ QuicSocketAddress bind_address(quiche::TestLoopback(), /*port=*/0);
+ result = socket_api::Bind(socket.value(), bind_address);
+ QUICHE_CHECK(result.ok());
+
+ absl::StatusOr<QuicSocketAddress> socket_address =
+ socket_api::GetSocketAddress(socket.value());
+ QUICHE_CHECK(socket_address.ok());
+
+ result = socket_api::Listen(socket.value(), /*backlog=*/1);
+ QUICHE_CHECK(result.ok());
+
+ *out_socket_descriptor = socket.value();
+ *out_socket_address = std::move(socket_address).value();
+ return true;
+}
+
+class TestTcpServerSocketRunner : public quiche::QuicheThread {
+ public:
+ using SocketBehavior = std::function<void(SocketFd connected_socket)>;
+
+ // On construction, spins a separate thread to accept a connection from
+ // `server_socket_descriptor`, runs `behavior` with that connection, and then
+ // closes the accepted connection socket. If `allow_accept_failure` is true,
+ // will silently stop if an error is encountered accepting the connection.
+ TestTcpServerSocketRunner(SocketFd server_socket_descriptor,
+ SocketBehavior behavior,
+ bool allow_accept_failure = false)
+ : QuicheThread("TestTcpServerSocketRunner"),
+ server_socket_descriptor_(server_socket_descriptor),
+ behavior_(std::move(behavior)),
+ allow_accept_failure_(allow_accept_failure) {
+ Start();
+ }
+
+ ~TestTcpServerSocketRunner() override { WaitForCompletion(); }
+
+ void WaitForCompletion() { completion_notification_.WaitForNotification(); }
+
+ protected:
+ void Run() override {
+ if (AcceptSocket()) {
+ behavior_(connection_socket_descriptor_);
+ CloseSocket();
+ } else {
+ QUICHE_CHECK(allow_accept_failure_);
+ }
+
+ completion_notification_.Notify();
+ }
+
+ private:
+ bool AcceptSocket() {
+ absl::StatusOr<socket_api::AcceptResult> connection_socket =
+ socket_api::Accept(server_socket_descriptor_, /*blocking=*/true);
+ if (connection_socket.ok()) {
+ connection_socket_descriptor_ = connection_socket.value().fd;
+ }
+ return connection_socket.ok();
+ }
+
+ void CloseSocket() {
+ QUICHE_CHECK(socket_api::Close(connection_socket_descriptor_).ok());
+ }
+
+ const SocketFd server_socket_descriptor_;
+ const SocketBehavior behavior_;
+ const bool allow_accept_failure_;
+
+ SocketFd connection_socket_descriptor_;
+
+ quiche::QuicheNotification completion_notification_;
+};
+
+class EventLoopTcpClientSocketTest
+ : public quiche::test::QuicheTestWithParam<QuicEventLoopFactory*>,
+ public StreamClientSocket::AsyncVisitor {
+ public:
+ void SetUp() override {
+ QUICHE_CHECK(CreateListeningServerSocket(&server_socket_descriptor_,
+ &server_socket_address_));
+ }
+
+ void TearDown() override {
+ if (server_socket_descriptor_ != kInvalidSocketFd) {
+ QUICHE_CHECK(socket_api::Close(server_socket_descriptor_).ok());
+ }
+ }
+
+ void ConnectComplete(absl::Status status) override {
+ QUICHE_CHECK(!connect_result_.has_value());
+ connect_result_ = std::move(status);
+ }
+
+ void ReceiveComplete(absl::StatusOr<quiche::QuicheMemSlice> data) override {
+ QUICHE_CHECK(!receive_result_.has_value());
+ receive_result_ = std::move(data);
+ }
+
+ void SendComplete(absl::Status status) override {
+ QUICHE_CHECK(!send_result_.has_value());
+ send_result_ = std::move(status);
+ }
+
+ protected:
+ SocketFd server_socket_descriptor_ = kInvalidSocketFd;
+ QuicSocketAddress server_socket_address_;
+
+ MockClock clock_;
+ std::unique_ptr<QuicEventLoop> event_loop_ = GetParam()->Create(&clock_);
+ EventLoopSocketFactory socket_factory_{event_loop_.get(),
+ quiche::SimpleBufferAllocator::Get()};
+
+ absl::optional<absl::Status> connect_result_;
+ absl::optional<absl::StatusOr<quiche::QuicheMemSlice>> receive_result_;
+ absl::optional<absl::Status> send_result_;
+};
+
+std::string GetTestParamName(
+ ::testing::TestParamInfo<QuicEventLoopFactory*> info) {
+ return EscapeTestParamName(info.param->GetName());
+}
+
+INSTANTIATE_TEST_SUITE_P(EventLoopTcpClientSocketTests,
+ EventLoopTcpClientSocketTest,
+ ::testing::ValuesIn(GetAllSupportedEventLoops()),
+ &GetTestParamName);
+
+TEST_P(EventLoopTcpClientSocketTest, Connect) {
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/nullptr);
+
+ // No socket runner to accept the connection for the server, but that is not
+ // expected to be necessary for the connection to complete from the client.
+ EXPECT_TRUE(socket->ConnectBlocking().ok());
+
+ socket->Disconnect();
+}
+
+TEST_P(EventLoopTcpClientSocketTest, ConnectAsync) {
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/this);
+
+ socket->ConnectAsync();
+
+ // Synchronous completion not normally expected, but since there is no known
+ // way to delay the server side of the connection (the OS does not wait for
+ // an accept() call), cannot be gauranteed that the connection will always
+ // complete asynchronously. If connecting asynchronously (normal behavior),
+ // expect completion once signalled by the event loop.
+ if (!connect_result_.has_value()) {
+ event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
+ ASSERT_TRUE(connect_result_.has_value());
+ }
+ EXPECT_TRUE(connect_result_.value().ok());
+
+ connect_result_.reset();
+ socket->Disconnect();
+ EXPECT_FALSE(connect_result_.has_value());
+}
+
+TEST_P(EventLoopTcpClientSocketTest, ErrorBeforeConnectAsync) {
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/this);
+
+ // Close the server socket.
+ EXPECT_TRUE(socket_api::Close(server_socket_descriptor_).ok());
+ server_socket_descriptor_ = kInvalidSocketFd;
+
+ socket->ConnectAsync();
+ if (!connect_result_.has_value()) {
+ event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
+ ASSERT_TRUE(connect_result_.has_value());
+ }
+
+ // Expect an error because server socket was closed before connection.
+ EXPECT_FALSE(connect_result_.value().ok());
+}
+
+TEST_P(EventLoopTcpClientSocketTest, ErrorDuringConnectAsync) {
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/this);
+
+ socket->ConnectAsync();
+
+ if (connect_result_.has_value()) {
+ // Not typical, but theoretically nothing to stop the connection from
+ // completing before the server socket is closed to trigger the error.
+ EXPECT_TRUE(connect_result_.value().ok());
+ return;
+ }
+
+ // Close the server socket.
+ EXPECT_TRUE(socket_api::Close(server_socket_descriptor_).ok());
+ server_socket_descriptor_ = kInvalidSocketFd;
+
+ // Expect an error once signalled.
+ EXPECT_FALSE(connect_result_.has_value());
+ event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
+ ASSERT_TRUE(connect_result_.has_value());
+ EXPECT_FALSE(connect_result_.value().ok());
+}
+
+TEST_P(EventLoopTcpClientSocketTest, Disconnect) {
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/nullptr);
+
+ ASSERT_TRUE(socket->ConnectBlocking().ok());
+ socket->Disconnect();
+}
+
+TEST_P(EventLoopTcpClientSocketTest, DisconnectCancelsConnectAsync) {
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/this);
+
+ socket->ConnectAsync();
+
+ if (connect_result_.has_value()) {
+ // Not typical, but theoretically nothing to stop the connection from
+ // completing before the server socket is closed to trigger the error.
+ EXPECT_TRUE(connect_result_.value().ok());
+ return;
+ }
+
+ socket->Disconnect();
+
+ // Expect immediate cancelled error.
+ ASSERT_TRUE(connect_result_.has_value());
+ EXPECT_TRUE(absl::IsCancelled(connect_result_.value()));
+}
+
+TEST_P(EventLoopTcpClientSocketTest, ConnectAndReconnect) {
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/nullptr);
+
+ ASSERT_TRUE(socket->ConnectBlocking().ok());
+ socket->Disconnect();
+
+ // Expect `socket` can reconnect now that it has been disconnected.
+ EXPECT_TRUE(socket->ConnectBlocking().ok());
+ socket->Disconnect();
+}
+
+void SendDataOnSocket(absl::string_view data, SocketFd connected_socket) {
+ while (!data.empty()) {
+ absl::StatusOr<absl::string_view> remainder =
+ socket_api::Send(connected_socket, data);
+ if (!remainder.ok()) {
+ return;
+ }
+ data = remainder.value();
+ }
+}
+
+TEST_P(EventLoopTcpClientSocketTest, Receive) {
+ std::string expected = {1, 2, 3, 4, 5, 6, 7, 8};
+ TestTcpServerSocketRunner runner(
+ server_socket_descriptor_, absl::bind_front(&SendDataOnSocket, expected));
+
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/nullptr);
+ ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+ std::string received;
+ absl::StatusOr<quiche::QuicheMemSlice> data;
+ do {
+ data = socket->ReceiveBlocking(100);
+ ASSERT_TRUE(data.ok());
+ received.append(data.value().data(), data.value().length());
+ } while (!data.value().empty());
+ EXPECT_EQ(received, expected);
+
+ socket->Disconnect();
+}
+
+TEST_P(EventLoopTcpClientSocketTest, ReceiveAsync) {
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/this);
+ ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+ // Start an async receive. Expect no immediate results because runner not yet
+ // setup to accept and send.
+ socket->ReceiveAsync(100);
+ EXPECT_FALSE(receive_result_.has_value());
+
+ // Send data from server.
+ std::string expected = {1, 2, 3, 4, 5, 6, 7, 8};
+ TestTcpServerSocketRunner runner(
+ server_socket_descriptor_, absl::bind_front(&SendDataOnSocket, expected));
+ EXPECT_FALSE(receive_result_.has_value());
+ for (int i = 0; i < 5 && !receive_result_.has_value(); ++i) {
+ event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
+ }
+
+ // Expect to receive at least some of the sent data.
+ ASSERT_TRUE(receive_result_.has_value());
+ ASSERT_TRUE(receive_result_.value().ok());
+ EXPECT_FALSE(receive_result_.value().value().empty());
+ std::string received(receive_result_.value().value().data(),
+ receive_result_.value().value().length());
+
+ // Get any remaining data via blocking calls.
+ absl::StatusOr<quiche::QuicheMemSlice> data;
+ do {
+ data = socket->ReceiveBlocking(100);
+ ASSERT_TRUE(data.ok());
+ received.append(data.value().data(), data.value().length());
+ } while (!data.value().empty());
+
+ EXPECT_EQ(received, expected);
+
+ receive_result_.reset();
+ socket->Disconnect();
+ EXPECT_FALSE(receive_result_.has_value());
+}
+
+TEST_P(EventLoopTcpClientSocketTest, DisconnectCancelsReceiveAsync) {
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/this);
+
+ ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+ // Start an asynchronous read, expecting no completion because server never
+ // sends any data.
+ socket->ReceiveAsync(100);
+ EXPECT_FALSE(receive_result_.has_value());
+
+ // Disconnect and expect an immediate cancelled error.
+ socket->Disconnect();
+ ASSERT_TRUE(receive_result_.has_value());
+ ASSERT_FALSE(receive_result_.value().ok());
+ EXPECT_TRUE(absl::IsCancelled(receive_result_.value().status()));
+}
+
+// Receive from `connected_socket` until connection is closed, writing received
+// data to `out_received`.
+void ReceiveDataFromSocket(std::string* out_received,
+ SocketFd connected_socket) {
+ out_received->clear();
+
+ std::string buffer(100, 0);
+ absl::StatusOr<absl::Span<char>> received;
+ do {
+ received = socket_api::Receive(connected_socket, absl::MakeSpan(buffer));
+ QUICHE_CHECK(received.ok());
+ out_received->insert(out_received->end(), received.value().begin(),
+ received.value().end());
+ } while (!received.value().empty());
+}
+
+TEST_P(EventLoopTcpClientSocketTest, Send) {
+ std::string sent;
+ TestTcpServerSocketRunner runner(
+ server_socket_descriptor_,
+ absl::bind_front(&ReceiveDataFromSocket, &sent));
+
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/0,
+ /*async_visitor=*/nullptr);
+ ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+ std::string expected = {1, 2, 3, 4, 5, 6, 7, 8};
+ EXPECT_TRUE(socket->SendBlocking(expected).ok());
+ socket->Disconnect();
+
+ runner.WaitForCompletion();
+ EXPECT_EQ(sent, expected);
+}
+
+TEST_P(EventLoopTcpClientSocketTest, SendAsync) {
+ // Use a small send buffer to improve chances of a send needing to be
+ // asynchronous.
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/4,
+ /*async_visitor=*/this);
+ ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+ std::string data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+ std::string expected;
+
+ // Repeatedly write to socket until it does not complete synchronously.
+ do {
+ expected.insert(expected.end(), data.begin(), data.end());
+ send_result_.reset();
+ socket->SendAsync(data);
+ ASSERT_TRUE(!send_result_.has_value() || send_result_.value().ok());
+ } while (send_result_.has_value());
+
+ // Begin receiving from server and expect more data to send.
+ std::string sent;
+ TestTcpServerSocketRunner runner(
+ server_socket_descriptor_,
+ absl::bind_front(&ReceiveDataFromSocket, &sent));
+ EXPECT_FALSE(send_result_.has_value());
+ for (int i = 0; i < 5 && !send_result_.has_value(); ++i) {
+ event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
+ }
+ ASSERT_TRUE(send_result_.has_value());
+ EXPECT_TRUE(send_result_.value().ok());
+
+ send_result_.reset();
+ socket->Disconnect();
+ EXPECT_FALSE(send_result_.has_value());
+
+ runner.WaitForCompletion();
+ EXPECT_EQ(sent, expected);
+}
+
+TEST_P(EventLoopTcpClientSocketTest, DisconnectCancelsSendAsync) {
+ // Use a small send buffer to improve chances of a send needing to be
+ // asynchronous.
+ std::unique_ptr<StreamClientSocket> socket =
+ socket_factory_.CreateTcpClientSocket(server_socket_address_,
+ /*receive_buffer_size=*/0,
+ /*send_buffer_size=*/4,
+ /*async_visitor=*/this);
+ ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+ std::string data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+
+ // Repeatedly write to socket until it does not complete synchronously.
+ do {
+ send_result_.reset();
+ socket->SendAsync(data);
+ ASSERT_TRUE(!send_result_.has_value() || send_result_.value().ok());
+ } while (send_result_.has_value());
+
+ // Disconnect and expect immediate cancelled error.
+ socket->Disconnect();
+ ASSERT_TRUE(send_result_.has_value());
+ EXPECT_TRUE(absl::IsCancelled(send_result_.value()));
+}
+
+} // namespace
+} // namespace quic::test
diff --git a/quiche/quic/core/io/socket.h b/quiche/quic/core/io/socket.h
index 748de4d..fcc6418 100644
--- a/quiche/quic/core/io/socket.h
+++ b/quiche/quic/core/io/socket.h
@@ -5,6 +5,9 @@
#ifndef QUICHE_QUIC_CORE_IO_SOCKET_H_
#define QUICHE_QUIC_CORE_IO_SOCKET_H_
+#include <functional>
+#include <string>
+
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
@@ -13,6 +16,7 @@
#include "quiche/quic/platform/api/quic_ip_address_family.h"
#include "quiche/quic/platform/api/quic_socket_address.h"
#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
#if defined(_WIN32)
#include <winsock2.h>
@@ -28,6 +32,57 @@
inline constexpr SocketFd kInvalidSocketFd = -1;
#endif
+// A read/write socket.
+//
+// Warning regarding blocking calls: Code in the QUICHE library typically
+// handles IO on a single thread, so if making calls from that typical
+// environment, it would be problematic to make a blocking call and block that
+// single thread.
+class QUICHE_EXPORT_PRIVATE Socket {
+ public:
+ class AsyncVisitor {
+ public:
+ virtual ~AsyncVisitor() = default;
+
+ // If the operation completed without error, `data` is set to the received
+ // data.
+ virtual void ReceiveComplete(
+ absl::StatusOr<quiche::QuicheMemSlice> data) = 0;
+
+ virtual void SendComplete(absl::Status status) = 0;
+ };
+
+ virtual ~Socket() = default;
+
+ // Blocking read. Receives and returns a buffer of up to `max_size` bytes from
+ // socket. Returns status on error.
+ virtual absl::StatusOr<quiche::QuicheMemSlice> ReceiveBlocking(
+ QuicByteCount max_size) = 0;
+
+ // Asynchronous read. Receives up to `max_size` bytes from socket. If
+ // no data is synchronously available to be read, waits until some data is
+ // available or the socket is closed. On completion, calls ReceiveComplete()
+ // on the visitor, potentially before return from ReceiveAsync().
+ //
+ // After calling, the socket must not be destroyed until ReceiveComplete() is
+ // called.
+ virtual void ReceiveAsync(QuicByteCount max_size) = 0;
+
+ // Blocking write. Sends all of `data` (potentially via multiple underlying
+ // socket sends).
+ virtual absl::Status SendBlocking(std::string data) = 0;
+ virtual absl::Status SendBlocking(quiche::QuicheMemSlice data) = 0;
+
+ // Asynchronous write. Sends all of `data` (potentially via multiple
+ // underlying socket sends). On completion, calls SendComplete() on the
+ // visitor, potentially before return from SendAsync().
+ //
+ // After calling, the socket must not be destroyed until SendComplete() is
+ // called.
+ virtual void SendAsync(std::string data) = 0;
+ virtual void SendAsync(quiche::QuicheMemSlice data) = 0;
+};
+
// Low-level platform-agnostic socket operations. Closely follows the behavior
// of basic POSIX socket APIs, diverging mostly only to convert to/from cleaner
// and platform-agnostic types.
diff --git a/quiche/quic/core/io/socket_factory.h b/quiche/quic/core/io/socket_factory.h
new file mode 100644
index 0000000..39102f1
--- /dev/null
+++ b/quiche/quic/core/io/socket_factory.h
@@ -0,0 +1,34 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_IO_SOCKET_FACTORY_H_
+#define QUICHE_QUIC_CORE_IO_SOCKET_FACTORY_H_
+
+#include <memory>
+
+#include "quiche/quic/core/io/stream_client_socket.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_export.h"
+
+namespace quic {
+
+// A factory to create objects of type Socket and derived interfaces.
+class QUICHE_EXPORT_PRIVATE SocketFactory {
+ public:
+ virtual ~SocketFactory() = default;
+
+ // Will use platform default buffer size if `receive_buffer_size` or
+ // `send_buffer_size` is zero. If `async_visitor` is null, async operations
+ // must not be called on the created socket. If `async_visitor` is non-null,
+ // it must outlive the created socket.
+ virtual std::unique_ptr<StreamClientSocket> CreateTcpClientSocket(
+ const quic::QuicSocketAddress& peer_address,
+ QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
+ StreamClientSocket::AsyncVisitor* async_visitor) = 0;
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_CORE_IO_SOCKET_FACTORY_H_
diff --git a/quiche/quic/core/io/stream_client_socket.h b/quiche/quic/core/io/stream_client_socket.h
new file mode 100644
index 0000000..00f9a42
--- /dev/null
+++ b/quiche/quic/core/io/stream_client_socket.h
@@ -0,0 +1,62 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_IO_STREAM_CLIENT_SOCKET_H_
+#define QUICHE_QUIC_CORE_IO_STREAM_CLIENT_SOCKET_H_
+
+#include "absl/status/status.h"
+#include "quiche/quic/core/io/socket.h"
+#include "quiche/common/platform/api/quiche_export.h"
+
+namespace quic {
+
+// A client socket using a protocol (typically TCP) that provides
+// connection-based streams.
+//
+// Must not destroy a connected/connecting socket. If connected or connecting,
+// must call Disconnect() to disconnect or cancel the connection before
+// destruction.
+//
+// Warning regarding blocking calls: Code in the QUICHE library typically
+// handles IO on a single thread, so if making calls from that typical
+// environment, it would be problematic to make a blocking call and block that
+// single thread.
+class QUICHE_EXPORT_PRIVATE StreamClientSocket : public Socket {
+ public:
+ class AsyncVisitor : public Socket::AsyncVisitor {
+ public:
+ virtual void ConnectComplete(absl::Status status) = 0;
+ };
+
+ ~StreamClientSocket() override = default;
+
+ // Establishes a connection synchronously. Should not be called if socket has
+ // already been successfully connected without first calling Disconnect().
+ virtual absl::Status ConnectBlocking() = 0;
+
+ // Establishes a connection asynchronously. On completion, calls
+ // ConnectComplete() on the visitor, potentially before return from
+ // ConnectAsync(). Should not be called if socket has already been
+ // successfully connected without first calling Disconnect().
+ //
+ // After calling, the socket must not be destroyed until Disconnect() is
+ // called.
+ virtual void ConnectAsync() = 0;
+
+ // Disconnects a connected socket or cancels an in-progress ConnectAsync(),
+ // invoking the `ConnectComplete(absl::CancelledError())` on the visitor.
+ // After success, it is possible to call ConnectBlocking() or ConnectAsync()
+ // again to establish a new connection. Cancels any pending read or write
+ // operations, calling visitor completion methods with
+ // `absl::CancelledError()`.
+ //
+ // Typically implemented via a call to ::close(), which for TCP can result in
+ // either FIN or RST, depending on socket/platform state and undefined
+ // platform behavior.
+ virtual void Disconnect() = 0;
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_CORE_IO_STREAM_CLIENT_SOCKET_H_