Add higher-level TCP client socket/factory support to QUICHE

Added some basic interfaces that could be used to abstract away implementation details of socket operations, including a SocketFactory that allows initiation details like QuicEventLoop to be hidden away from code interacting with it and creating/running sockets.  Especially helpful to pass factory objects to code that doesn't need to know details around the event/async libraries in use or need to stay agnostic between them to build on multiple plaforms/environments.

Interfaces support my current needs (TCP client sockets), but left them deliberately open to future expansion to server sockets or UDP, etc.  Created a QuicEventLoop-based implementation of the interfaces.

PiperOrigin-RevId: 466366913
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 30d068a..a928fe6 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -947,10 +947,14 @@
     "quic/core/batch_writer/quic_batch_writer_test.h",
     "quic/core/batch_writer/quic_gso_batch_writer.h",
     "quic/core/batch_writer/quic_sendmmsg_batch_writer.h",
+    "quic/core/io/event_loop_socket_factory.h",
+    "quic/core/io/event_loop_tcp_client_socket.h",
     "quic/core/io/quic_default_event_loop.h",
     "quic/core/io/quic_event_loop.h",
     "quic/core/io/quic_poll_event_loop.h",
     "quic/core/io/socket.h",
+    "quic/core/io/socket_factory.h",
+    "quic/core/io/stream_client_socket.h",
     "quic/core/quic_default_packet_writer.h",
     "quic/core/quic_epoll_alarm_factory.h",
     "quic/core/quic_epoll_clock.h",
@@ -981,6 +985,8 @@
     "quic/core/batch_writer/quic_batch_writer_buffer.cc",
     "quic/core/batch_writer/quic_gso_batch_writer.cc",
     "quic/core/batch_writer/quic_sendmmsg_batch_writer.cc",
+    "quic/core/io/event_loop_socket_factory.cc",
+    "quic/core/io/event_loop_tcp_client_socket.cc",
     "quic/core/io/quic_default_event_loop.cc",
     "quic/core/io/quic_poll_event_loop.cc",
     "quic/core/io/socket_posix.cc",
@@ -1302,6 +1308,7 @@
     "quic/core/http/quic_spdy_client_session_test.cc",
     "quic/core/http/quic_spdy_client_stream_test.cc",
     "quic/core/http/quic_spdy_server_stream_base_test.cc",
+    "quic/core/io/event_loop_tcp_client_socket_test.cc",
     "quic/core/io/quic_all_event_loops_test.cc",
     "quic/core/io/quic_poll_event_loop_test.cc",
     "quic/core/io/socket_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index daadaf6..2fd7580 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -947,10 +947,14 @@
     "src/quiche/quic/core/batch_writer/quic_batch_writer_test.h",
     "src/quiche/quic/core/batch_writer/quic_gso_batch_writer.h",
     "src/quiche/quic/core/batch_writer/quic_sendmmsg_batch_writer.h",
+    "src/quiche/quic/core/io/event_loop_socket_factory.h",
+    "src/quiche/quic/core/io/event_loop_tcp_client_socket.h",
     "src/quiche/quic/core/io/quic_default_event_loop.h",
     "src/quiche/quic/core/io/quic_event_loop.h",
     "src/quiche/quic/core/io/quic_poll_event_loop.h",
     "src/quiche/quic/core/io/socket.h",
+    "src/quiche/quic/core/io/socket_factory.h",
+    "src/quiche/quic/core/io/stream_client_socket.h",
     "src/quiche/quic/core/quic_default_packet_writer.h",
     "src/quiche/quic/core/quic_epoll_alarm_factory.h",
     "src/quiche/quic/core/quic_epoll_clock.h",
@@ -981,6 +985,8 @@
     "src/quiche/quic/core/batch_writer/quic_batch_writer_buffer.cc",
     "src/quiche/quic/core/batch_writer/quic_gso_batch_writer.cc",
     "src/quiche/quic/core/batch_writer/quic_sendmmsg_batch_writer.cc",
+    "src/quiche/quic/core/io/event_loop_socket_factory.cc",
+    "src/quiche/quic/core/io/event_loop_tcp_client_socket.cc",
     "src/quiche/quic/core/io/quic_default_event_loop.cc",
     "src/quiche/quic/core/io/quic_poll_event_loop.cc",
     "src/quiche/quic/core/io/socket_posix.cc",
@@ -1302,6 +1308,7 @@
     "src/quiche/quic/core/http/quic_spdy_client_session_test.cc",
     "src/quiche/quic/core/http/quic_spdy_client_stream_test.cc",
     "src/quiche/quic/core/http/quic_spdy_server_stream_base_test.cc",
+    "src/quiche/quic/core/io/event_loop_tcp_client_socket_test.cc",
     "src/quiche/quic/core/io/quic_all_event_loops_test.cc",
     "src/quiche/quic/core/io/quic_poll_event_loop_test.cc",
     "src/quiche/quic/core/io/socket_test.cc",
diff --git a/build/source_list.json b/build/source_list.json
index ca31469..02f608b 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -946,10 +946,14 @@
     "quiche/quic/core/batch_writer/quic_batch_writer_test.h",
     "quiche/quic/core/batch_writer/quic_gso_batch_writer.h",
     "quiche/quic/core/batch_writer/quic_sendmmsg_batch_writer.h",
+    "quiche/quic/core/io/event_loop_socket_factory.h",
+    "quiche/quic/core/io/event_loop_tcp_client_socket.h",
     "quiche/quic/core/io/quic_default_event_loop.h",
     "quiche/quic/core/io/quic_event_loop.h",
     "quiche/quic/core/io/quic_poll_event_loop.h",
     "quiche/quic/core/io/socket.h",
+    "quiche/quic/core/io/socket_factory.h",
+    "quiche/quic/core/io/stream_client_socket.h",
     "quiche/quic/core/quic_default_packet_writer.h",
     "quiche/quic/core/quic_epoll_alarm_factory.h",
     "quiche/quic/core/quic_epoll_clock.h",
@@ -980,6 +984,8 @@
     "quiche/quic/core/batch_writer/quic_batch_writer_buffer.cc",
     "quiche/quic/core/batch_writer/quic_gso_batch_writer.cc",
     "quiche/quic/core/batch_writer/quic_sendmmsg_batch_writer.cc",
+    "quiche/quic/core/io/event_loop_socket_factory.cc",
+    "quiche/quic/core/io/event_loop_tcp_client_socket.cc",
     "quiche/quic/core/io/quic_default_event_loop.cc",
     "quiche/quic/core/io/quic_poll_event_loop.cc",
     "quiche/quic/core/io/socket_posix.cc",
@@ -1301,6 +1307,7 @@
     "quiche/quic/core/http/quic_spdy_client_session_test.cc",
     "quiche/quic/core/http/quic_spdy_client_stream_test.cc",
     "quiche/quic/core/http/quic_spdy_server_stream_base_test.cc",
+    "quiche/quic/core/io/event_loop_tcp_client_socket_test.cc",
     "quiche/quic/core/io/quic_all_event_loops_test.cc",
     "quiche/quic/core/io/quic_poll_event_loop_test.cc",
     "quiche/quic/core/io/socket_test.cc",
diff --git a/quiche/quic/core/io/event_loop_socket_factory.cc b/quiche/quic/core/io/event_loop_socket_factory.cc
new file mode 100644
index 0000000..4d2508e
--- /dev/null
+++ b/quiche/quic/core/io/event_loop_socket_factory.cc
@@ -0,0 +1,36 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/core/io/event_loop_socket_factory.h"
+
+#include <memory>
+
+#include "quiche/quic/core/io/event_loop_tcp_client_socket.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/io/stream_client_socket.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+
+namespace quic {
+
+EventLoopSocketFactory::EventLoopSocketFactory(
+    QuicEventLoop* event_loop, quiche::QuicheBufferAllocator* buffer_allocator)
+    : event_loop_(event_loop), buffer_allocator_(buffer_allocator) {
+  QUICHE_DCHECK(event_loop_);
+  QUICHE_DCHECK(buffer_allocator_);
+}
+
+std::unique_ptr<StreamClientSocket>
+EventLoopSocketFactory::CreateTcpClientSocket(
+    const quic::QuicSocketAddress& peer_address,
+    QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
+    StreamClientSocket::AsyncVisitor* async_visitor) {
+  return std::make_unique<EventLoopTcpClientSocket>(
+      peer_address, receive_buffer_size, send_buffer_size, event_loop_,
+      buffer_allocator_, async_visitor);
+}
+
+}  // namespace quic
diff --git a/quiche/quic/core/io/event_loop_socket_factory.h b/quiche/quic/core/io/event_loop_socket_factory.h
new file mode 100644
index 0000000..6882e80
--- /dev/null
+++ b/quiche/quic/core/io/event_loop_socket_factory.h
@@ -0,0 +1,41 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_IO_EVENT_LOOP_SOCKET_FACTORY_H_
+#define QUICHE_QUIC_CORE_IO_EVENT_LOOP_SOCKET_FACTORY_H_
+
+#include <memory>
+
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/io/socket_factory.h"
+#include "quiche/quic/core/io/stream_client_socket.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+
+namespace quic {
+
+// A socket factory that creates sockets implemented using an underlying
+// QuicEventLoop.
+class QUICHE_EXPORT_PRIVATE EventLoopSocketFactory : public SocketFactory {
+ public:
+  // `event_loop` and `buffer_allocator` must outlive the created factory.
+  EventLoopSocketFactory(QuicEventLoop* event_loop,
+                         quiche::QuicheBufferAllocator* buffer_allocator);
+
+  // SocketFactory:
+  std::unique_ptr<StreamClientSocket> CreateTcpClientSocket(
+      const quic::QuicSocketAddress& peer_address,
+      QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
+      StreamClientSocket::AsyncVisitor* async_visitor) override;
+
+ private:
+  QuicEventLoop* const event_loop_;                  // unowned
+  quiche::QuicheBufferAllocator* buffer_allocator_;  // unowned
+};
+
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_CORE_IO_EVENT_LOOP_SOCKET_FACTORY_H_
diff --git a/quiche/quic/core/io/event_loop_tcp_client_socket.cc b/quiche/quic/core/io/event_loop_tcp_client_socket.cc
new file mode 100644
index 0000000..84263bc
--- /dev/null
+++ b/quiche/quic/core/io/event_loop_tcp_client_socket.cc
@@ -0,0 +1,610 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/core/io/event_loop_tcp_client_socket.h"
+
+#include <limits>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "absl/types/variant.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/io/socket.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace quic {
+
+EventLoopTcpClientSocket::EventLoopTcpClientSocket(
+    const quic::QuicSocketAddress& peer_address,
+    QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
+    QuicEventLoop* event_loop, quiche::QuicheBufferAllocator* buffer_allocator,
+    AsyncVisitor* async_visitor)
+    : peer_address_(peer_address),
+      receive_buffer_size_(receive_buffer_size),
+      send_buffer_size_(send_buffer_size),
+      event_loop_(event_loop),
+      buffer_allocator_(buffer_allocator),
+      async_visitor_(async_visitor) {
+  QUICHE_DCHECK(event_loop_);
+  QUICHE_DCHECK(buffer_allocator_);
+}
+
+EventLoopTcpClientSocket::~EventLoopTcpClientSocket() {
+  // Connected socket must be closed via Disconnect() before destruction. Cannot
+  // safely recover if state indicates caller may be expecting async callbacks.
+  QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
+  QUICHE_DCHECK(!receive_max_size_.has_value());
+  QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+  if (descriptor_ != kInvalidSocketFd) {
+    QUICHE_BUG(quic_event_loop_tcp_socket_invalid_destruction)
+        << "Must call Disconnect() on connected TCP socket before destruction.";
+    Close();
+  }
+
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+  QUICHE_DCHECK(send_remaining_.empty());
+}
+
+absl::Status EventLoopTcpClientSocket::ConnectBlocking() {
+  QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+  QUICHE_DCHECK(!receive_max_size_.has_value());
+  QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+  absl::Status status = Open();
+  if (!status.ok()) {
+    return status;
+  }
+
+  status = socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
+  if (!status.ok()) {
+    QUICHE_LOG_FIRST_N(WARNING, 100)
+        << "Failed to set socket to address: " << peer_address_.ToString()
+        << " as blocking for connect with error: " << status;
+    Close();
+    return status;
+  }
+
+  status = DoInitialConnect();
+
+  if (absl::IsUnavailable(status)) {
+    QUICHE_LOG_FIRST_N(ERROR, 100)
+        << "Non-blocking connect to should-be blocking socket to address:"
+        << peer_address_.ToString() << ".";
+    Close();
+    connect_status_ = ConnectStatus::kNotConnected;
+    return status;
+  } else if (!status.ok()) {
+    // DoInitialConnect() closes the socket on failures.
+    QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
+    QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+    return status;
+  }
+
+  status = socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
+  if (!status.ok()) {
+    QUICHE_LOG_FIRST_N(WARNING, 100)
+        << "Failed to return socket to address: " << peer_address_.ToString()
+        << " to non-blocking after connect with error: " << status;
+    Close();
+    connect_status_ = ConnectStatus::kNotConnected;
+  }
+
+  QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
+  return status;
+}
+
+void EventLoopTcpClientSocket::ConnectAsync() {
+  QUICHE_DCHECK(async_visitor_);
+  QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+  QUICHE_DCHECK(!receive_max_size_.has_value());
+  QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+  absl::Status status = Open();
+  if (!status.ok()) {
+    async_visitor_->ConnectComplete(status);
+    return;
+  }
+
+  FinishOrRearmAsyncConnect(DoInitialConnect());
+}
+
+void EventLoopTcpClientSocket::Disconnect() {
+  QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ != ConnectStatus::kNotConnected);
+
+  Close();
+  QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
+
+  // Reset all state before invoking any callbacks.
+  bool require_connect_callback = connect_status_ == ConnectStatus::kConnecting;
+  connect_status_ = ConnectStatus::kNotConnected;
+  bool require_receive_callback = receive_max_size_.has_value();
+  receive_max_size_.reset();
+  bool require_send_callback =
+      !absl::holds_alternative<absl::monostate>(send_data_);
+  send_data_ = absl::monostate();
+  send_remaining_ = "";
+
+  if (require_connect_callback) {
+    QUICHE_DCHECK(async_visitor_);
+    async_visitor_->ConnectComplete(absl::CancelledError());
+  }
+  if (require_receive_callback) {
+    QUICHE_DCHECK(async_visitor_);
+    async_visitor_->ReceiveComplete(absl::CancelledError());
+  }
+  if (require_send_callback) {
+    QUICHE_DCHECK(async_visitor_);
+    async_visitor_->SendComplete(absl::CancelledError());
+  }
+}
+
+absl::StatusOr<quiche::QuicheMemSlice>
+EventLoopTcpClientSocket::ReceiveBlocking(QuicByteCount max_size) {
+  QUICHE_DCHECK_GT(max_size, 0u);
+  QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+  QUICHE_DCHECK(!receive_max_size_.has_value());
+
+  absl::Status status =
+      socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
+  if (!status.ok()) {
+    QUICHE_LOG_FIRST_N(WARNING, 100)
+        << "Failed to set socket to address: " << peer_address_.ToString()
+        << " as blocking for receive with error: " << status;
+    return status;
+  }
+
+  receive_max_size_ = max_size;
+  absl::StatusOr<quiche::QuicheMemSlice> buffer = ReceiveInternal();
+
+  if (!buffer.ok() && absl::IsUnavailable(buffer.status())) {
+    QUICHE_LOG_FIRST_N(ERROR, 100)
+        << "Non-blocking receive from should-be blocking socket to address:"
+        << peer_address_.ToString() << ".";
+    receive_max_size_.reset();
+  } else {
+    QUICHE_DCHECK(!receive_max_size_.has_value());
+  }
+
+  absl::Status set_non_blocking_status =
+      socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
+  if (!set_non_blocking_status.ok()) {
+    QUICHE_LOG_FIRST_N(WARNING, 100)
+        << "Failed to return socket to address: " << peer_address_.ToString()
+        << " to non-blocking after receive with error: "
+        << set_non_blocking_status;
+    return set_non_blocking_status;
+  }
+
+  return buffer;
+}
+
+void EventLoopTcpClientSocket::ReceiveAsync(QuicByteCount max_size) {
+  QUICHE_DCHECK(async_visitor_);
+  QUICHE_DCHECK_GT(max_size, 0u);
+  QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+  QUICHE_DCHECK(!receive_max_size_.has_value());
+
+  receive_max_size_ = max_size;
+
+  FinishOrRearmAsyncReceive(ReceiveInternal());
+}
+
+absl::Status EventLoopTcpClientSocket::SendBlocking(std::string data) {
+  QUICHE_DCHECK(!data.empty());
+  QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+  send_data_ = std::move(data);
+  return SendBlockingInternal();
+}
+
+absl::Status EventLoopTcpClientSocket::SendBlocking(
+    quiche::QuicheMemSlice data) {
+  QUICHE_DCHECK(!data.empty());
+  QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+  send_data_ = std::move(data);
+  return SendBlockingInternal();
+}
+
+void EventLoopTcpClientSocket::SendAsync(std::string data) {
+  QUICHE_DCHECK(!data.empty());
+  QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+  send_data_ = std::move(data);
+  send_remaining_ = absl::get<std::string>(send_data_);
+
+  FinishOrRearmAsyncSend(SendInternal());
+}
+
+void EventLoopTcpClientSocket::SendAsync(quiche::QuicheMemSlice data) {
+  QUICHE_DCHECK(!data.empty());
+  QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+  send_data_ = std::move(data);
+  send_remaining_ =
+      absl::get<quiche::QuicheMemSlice>(send_data_).AsStringView();
+
+  FinishOrRearmAsyncSend(SendInternal());
+}
+
+void EventLoopTcpClientSocket::OnSocketEvent(QuicEventLoop* event_loop,
+                                             SocketFd fd,
+                                             QuicSocketEventMask events) {
+  QUICHE_DCHECK_EQ(event_loop, event_loop_);
+  QUICHE_DCHECK_EQ(fd, descriptor_);
+
+  if (connect_status_ == ConnectStatus::kConnecting &&
+      (events & (kSocketEventWritable | kSocketEventError))) {
+    FinishOrRearmAsyncConnect(GetConnectResult());
+    return;
+  }
+
+  if (receive_max_size_.has_value() &&
+      (events & (kSocketEventReadable | kSocketEventError))) {
+    FinishOrRearmAsyncReceive(ReceiveInternal());
+  }
+  if (!send_remaining_.empty() &&
+      (events & (kSocketEventWritable | kSocketEventError))) {
+    FinishOrRearmAsyncSend(SendInternal());
+  }
+}
+
+absl::Status EventLoopTcpClientSocket::Open() {
+  QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+  QUICHE_DCHECK(!receive_max_size_.has_value());
+  QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+  QUICHE_DCHECK(send_remaining_.empty());
+
+  absl::StatusOr<SocketFd> descriptor = socket_api::CreateSocket(
+      peer_address_.host().address_family(), socket_api::SocketProtocol::kTcp,
+      /*blocking=*/false);
+  if (!descriptor.ok()) {
+    QUICHE_DVLOG(1) << "Failed to open socket for connection to address: "
+                    << peer_address_.ToString()
+                    << " with error: " << descriptor.status();
+    return descriptor.status();
+  }
+  QUICHE_DCHECK_NE(descriptor.value(), kInvalidSocketFd);
+
+  descriptor_ = descriptor.value();
+
+  if (async_visitor_) {
+    bool registered;
+    if (event_loop_->SupportsEdgeTriggered()) {
+      registered = event_loop_->RegisterSocket(
+          descriptor_,
+          kSocketEventReadable | kSocketEventWritable | kSocketEventError,
+          this);
+    } else {
+      // Just register the socket without any armed events for now.  Will rearm
+      // with specific events as needed.  Registering now before events are
+      // needed makes it easier to ensure the socket is registered only once
+      // and can always be unregistered on socket close.
+      registered = event_loop_->RegisterSocket(descriptor_, /*events=*/0, this);
+    }
+    QUICHE_DCHECK(registered);
+  }
+
+  if (receive_buffer_size_ != 0) {
+    absl::Status status =
+        socket_api::SetReceiveBufferSize(descriptor_, receive_buffer_size_);
+    if (!status.ok()) {
+      QUICHE_LOG_FIRST_N(WARNING, 100)
+          << "Failed to set receive buffer size to: " << receive_buffer_size_
+          << " for socket to address: " << peer_address_.ToString()
+          << " with error: " << status;
+      Close();
+      return status;
+    }
+  }
+
+  if (send_buffer_size_ != 0) {
+    absl::Status status =
+        socket_api::SetSendBufferSize(descriptor_, send_buffer_size_);
+    if (!status.ok()) {
+      QUICHE_LOG_FIRST_N(WARNING, 100)
+          << "Failed to set send buffer size to: " << send_buffer_size_
+          << " for socket to address: " << peer_address_.ToString()
+          << " with error: " << status;
+      Close();
+      return status;
+    }
+  }
+
+  return absl::OkStatus();
+}
+
+void EventLoopTcpClientSocket::Close() {
+  QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+
+  bool unregistered = event_loop_->UnregisterSocket(descriptor_);
+  QUICHE_DCHECK_EQ(unregistered, !!async_visitor_);
+
+  absl::Status status = socket_api::Close(descriptor_);
+  if (!status.ok()) {
+    QUICHE_LOG_FIRST_N(WARNING, 100)
+        << "Could not close socket to address: " << peer_address_.ToString()
+        << " with error: " << status;
+  }
+
+  descriptor_ = kInvalidSocketFd;
+}
+
+absl::Status EventLoopTcpClientSocket::DoInitialConnect() {
+  QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
+  QUICHE_DCHECK(!receive_max_size_.has_value());
+  QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+  absl::Status connect_result = socket_api::Connect(descriptor_, peer_address_);
+
+  if (connect_result.ok()) {
+    connect_status_ = ConnectStatus::kConnected;
+  } else if (absl::IsUnavailable(connect_result)) {
+    connect_status_ = ConnectStatus::kConnecting;
+  } else {
+    QUICHE_DVLOG(1) << "Synchronously failed to connect socket to address: "
+                    << peer_address_.ToString()
+                    << " with error: " << connect_result;
+    Close();
+    connect_status_ = ConnectStatus::kNotConnected;
+  }
+
+  return connect_result;
+}
+
+absl::Status EventLoopTcpClientSocket::GetConnectResult() {
+  QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnecting);
+  QUICHE_DCHECK(!receive_max_size_.has_value());
+  QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+
+  absl::Status error = socket_api::GetSocketError(descriptor_);
+
+  if (!error.ok()) {
+    QUICHE_DVLOG(1) << "Asynchronously failed to connect socket to address: "
+                    << peer_address_.ToString() << " with error: " << error;
+    Close();
+    connect_status_ = ConnectStatus::kNotConnected;
+    return error;
+  }
+
+  // Peek at one byte to confirm the connection is actually alive. Motivation:
+  // 1) Plausibly could have a lot of cases where the connection operation
+  //    itself technically succeeds but the socket then quickly fails.  Don't
+  //    want to claim connection success here if, by the time this code is
+  //    running after event triggers and such, the socket has already failed.
+  //    Lot of undefined room around whether or not such errors would be saved
+  //    into SO_ERROR and returned by socket_api::GetSocketError().
+  // 2) With the various platforms and event systems involved, less than 100%
+  //    trust that it's impossible to end up in this method before the async
+  //    connect has completed/errored. Given that Connect() and GetSocketError()
+  //    does not difinitevely differentiate between success and
+  //    still-in-progress, and given that there's a very simple and performant
+  //    way to positively confirm the socket is connected (peek), do that here.
+  //    (Could consider making the not-connected case a QUIC_BUG if a way is
+  //    found to differentiate it from (1).)
+  absl::StatusOr<bool> peek_data = OneBytePeek();
+  if (peek_data.ok() || absl::IsUnavailable(peek_data.status())) {
+    connect_status_ = ConnectStatus::kConnected;
+  } else {
+    error = peek_data.status();
+    QUICHE_LOG_FIRST_N(WARNING, 100)
+        << "Socket to address: " << peer_address_.ToString()
+        << " signalled writable after connect and no connect error found, "
+           "but socket does not appear connected with error: "
+        << error;
+    Close();
+    connect_status_ = ConnectStatus::kNotConnected;
+  }
+
+  return error;
+}
+
+void EventLoopTcpClientSocket::FinishOrRearmAsyncConnect(absl::Status status) {
+  if (absl::IsUnavailable(status)) {
+    if (!event_loop_->SupportsEdgeTriggered()) {
+      bool result = event_loop_->RearmSocket(
+          descriptor_, kSocketEventWritable | kSocketEventError);
+      QUICHE_DCHECK(result);
+    }
+    QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnecting);
+  } else {
+    QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
+    async_visitor_->ConnectComplete(status);
+  }
+}
+
+absl::StatusOr<quiche::QuicheMemSlice>
+EventLoopTcpClientSocket::ReceiveInternal() {
+  QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+  QUICHE_CHECK(receive_max_size_.has_value());
+  QUICHE_DCHECK_GE(receive_max_size_.value(), 1u);
+  QUICHE_DCHECK_LE(receive_max_size_.value(),
+                   std::numeric_limits<size_t>::max());
+
+  // Before allocating a buffer, do a 1-byte peek to determine if needed.
+  if (receive_max_size_.value() > 1) {
+    absl::StatusOr<bool> peek_data = OneBytePeek();
+    if (!peek_data.ok()) {
+      if (!absl::IsUnavailable(peek_data.status())) {
+        receive_max_size_.reset();
+      }
+      return peek_data.status();
+    } else if (!peek_data.value()) {
+      receive_max_size_.reset();
+      return quiche::QuicheMemSlice();
+    }
+  }
+
+  quiche::QuicheBuffer buffer(buffer_allocator_, receive_max_size_.value());
+  absl::StatusOr<absl::Span<char>> received = socket_api::Receive(
+      descriptor_, absl::MakeSpan(buffer.data(), buffer.size()));
+
+  if (received.ok()) {
+    QUICHE_DCHECK_LE(received.value().size(), buffer.size());
+    QUICHE_DCHECK_EQ(received.value().data(), buffer.data());
+
+    receive_max_size_.reset();
+    return quiche::QuicheMemSlice(
+        quiche::QuicheBuffer(buffer.Release(), received.value().size()));
+  } else {
+    if (!absl::IsUnavailable(received.status())) {
+      QUICHE_DVLOG(1) << "Failed to receive from socket to address: "
+                      << peer_address_.ToString()
+                      << " with error: " << received.status();
+      receive_max_size_.reset();
+    }
+    return received.status();
+  }
+}
+
+void EventLoopTcpClientSocket::FinishOrRearmAsyncReceive(
+    absl::StatusOr<quiche::QuicheMemSlice> buffer) {
+  QUICHE_DCHECK(async_visitor_);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+
+  if (!buffer.ok() && absl::IsUnavailable(buffer.status())) {
+    if (!event_loop_->SupportsEdgeTriggered()) {
+      bool result = event_loop_->RearmSocket(
+          descriptor_, kSocketEventReadable | kSocketEventError);
+      QUICHE_DCHECK(result);
+    }
+    QUICHE_DCHECK(receive_max_size_.has_value());
+  } else {
+    QUICHE_DCHECK(!receive_max_size_.has_value());
+    async_visitor_->ReceiveComplete(std::move(buffer));
+  }
+}
+
+absl::StatusOr<bool> EventLoopTcpClientSocket::OneBytePeek() {
+  QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+
+  char peek_buffer;
+  absl::StatusOr<absl::Span<char>> peek_received = socket_api::Receive(
+      descriptor_, absl::MakeSpan(&peek_buffer, /*size=*/1), /*peek=*/true);
+  if (!peek_received.ok()) {
+    return peek_received.status();
+  } else {
+    return !peek_received.value().empty();
+  }
+}
+
+absl::Status EventLoopTcpClientSocket::SendBlockingInternal() {
+  QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+  QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
+  QUICHE_DCHECK(send_remaining_.empty());
+
+  absl::Status status =
+      socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
+  if (!status.ok()) {
+    QUICHE_LOG_FIRST_N(WARNING, 100)
+        << "Failed to set socket to address: " << peer_address_.ToString()
+        << " as blocking for send with error: " << status;
+    send_data_ = absl::monostate();
+    return status;
+  }
+
+  if (absl::holds_alternative<std::string>(send_data_)) {
+    send_remaining_ = absl::get<std::string>(send_data_);
+  } else {
+    send_remaining_ =
+        absl::get<quiche::QuicheMemSlice>(send_data_).AsStringView();
+  }
+
+  status = SendInternal();
+  if (absl::IsUnavailable(status)) {
+    QUICHE_LOG_FIRST_N(ERROR, 100)
+        << "Non-blocking send for should-be blocking socket to address:"
+        << peer_address_.ToString();
+    send_data_ = absl::monostate();
+    send_remaining_ = "";
+  } else {
+    QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+    QUICHE_DCHECK(send_remaining_.empty());
+  }
+
+  absl::Status set_non_blocking_status =
+      socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
+  if (!set_non_blocking_status.ok()) {
+    QUICHE_LOG_FIRST_N(WARNING, 100)
+        << "Failed to return socket to address: " << peer_address_.ToString()
+        << " to non-blocking after send with error: "
+        << set_non_blocking_status;
+    return set_non_blocking_status;
+  }
+
+  return status;
+}
+
+absl::Status EventLoopTcpClientSocket::SendInternal() {
+  QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+  QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
+  QUICHE_DCHECK(!send_remaining_.empty());
+
+  // Repeat send until all data sent, unavailable, or error.
+  while (!send_remaining_.empty()) {
+    absl::StatusOr<absl::string_view> remainder =
+        socket_api::Send(descriptor_, send_remaining_);
+
+    if (remainder.ok()) {
+      QUICHE_DCHECK(remainder.value().empty() ||
+                    (remainder.value().data() >= send_remaining_.data() &&
+                     remainder.value().data() <
+                         send_remaining_.data() + send_remaining_.size()));
+      QUICHE_DCHECK(remainder.value().empty() ||
+                    (remainder.value().data() + remainder.value().size() ==
+                     send_remaining_.data() + send_remaining_.size()));
+      send_remaining_ = remainder.value();
+    } else {
+      if (!absl::IsUnavailable(remainder.status())) {
+        QUICHE_DVLOG(1) << "Failed to send to socket to address: "
+                        << peer_address_.ToString()
+                        << " with error: " << remainder.status();
+        send_data_ = absl::monostate();
+        send_remaining_ = "";
+      }
+      return remainder.status();
+    }
+  }
+
+  send_data_ = absl::monostate();
+  return absl::OkStatus();
+}
+
+void EventLoopTcpClientSocket::FinishOrRearmAsyncSend(absl::Status status) {
+  QUICHE_DCHECK(async_visitor_);
+  QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
+
+  if (absl::IsUnavailable(status)) {
+    if (!event_loop_->SupportsEdgeTriggered()) {
+      bool result = event_loop_->RearmSocket(
+          descriptor_, kSocketEventWritable | kSocketEventError);
+      QUICHE_DCHECK(result);
+    }
+    QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
+    QUICHE_DCHECK(!send_remaining_.empty());
+  } else {
+    QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
+    QUICHE_DCHECK(send_remaining_.empty());
+    async_visitor_->SendComplete(status);
+  }
+}
+
+}  // namespace quic
diff --git a/quiche/quic/core/io/event_loop_tcp_client_socket.h b/quiche/quic/core/io/event_loop_tcp_client_socket.h
new file mode 100644
index 0000000..2d50087
--- /dev/null
+++ b/quiche/quic/core/io/event_loop_tcp_client_socket.h
@@ -0,0 +1,103 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_IO_EVENT_LOOP_TCP_CLIENT_SOCKET_H_
+#define QUICHE_QUIC_CORE_IO_EVENT_LOOP_TCP_CLIENT_SOCKET_H_
+
+#include <string>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "absl/types/variant.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/io/stream_client_socket.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+
+namespace quic {
+
+// A TCP client socket implemented using an underlying QuicEventLoop.
+class QUICHE_EXPORT_PRIVATE EventLoopTcpClientSocket
+    : public StreamClientSocket,
+      public QuicSocketEventListener {
+ public:
+  // Will use platform default buffer size if `receive_buffer_size` or
+  // `send_buffer_size` is zero. `async_visitor` may be null if no async
+  // operations will be  requested. `event_loop`, `buffer_allocator`, and
+  // `async_visitor` (if non-null) must outlive the created socket.
+  EventLoopTcpClientSocket(const quic::QuicSocketAddress& peer_address,
+                           QuicByteCount receive_buffer_size,
+                           QuicByteCount send_buffer_size,
+                           QuicEventLoop* event_loop,
+                           quiche::QuicheBufferAllocator* buffer_allocator,
+                           AsyncVisitor* async_visitor);
+
+  ~EventLoopTcpClientSocket() override;
+
+  // StreamClientSocket:
+  absl::Status ConnectBlocking() override;
+  void ConnectAsync() override;
+  void Disconnect() override;
+
+  // Socket:
+  absl::StatusOr<quiche::QuicheMemSlice> ReceiveBlocking(
+      QuicByteCount max_size) override;
+  void ReceiveAsync(QuicByteCount max_size) override;
+  absl::Status SendBlocking(std::string data) override;
+  absl::Status SendBlocking(quiche::QuicheMemSlice data) override;
+  void SendAsync(std::string data) override;
+  void SendAsync(quiche::QuicheMemSlice data) override;
+
+  // QuicSocketEventListener:
+  void OnSocketEvent(QuicEventLoop* event_loop, SocketFd fd,
+                     QuicSocketEventMask events) override;
+
+ private:
+  enum class ConnectStatus {
+    kNotConnected,
+    kConnecting,
+    kConnected,
+  };
+
+  absl::Status Open();
+  void Close();
+  absl::Status DoInitialConnect();
+  absl::Status GetConnectResult();
+  void FinishOrRearmAsyncConnect(absl::Status status);
+  absl::StatusOr<quiche::QuicheMemSlice> ReceiveInternal();
+  void FinishOrRearmAsyncReceive(absl::StatusOr<quiche::QuicheMemSlice> buffer);
+  // Returns `true` if a byte received, or `false` if successfully received
+  // empty data.
+  absl::StatusOr<bool> OneBytePeek();
+  absl::Status SendBlockingInternal();
+  absl::Status SendInternal();
+  void FinishOrRearmAsyncSend(absl::Status status);
+
+  const QuicSocketAddress peer_address_;
+  const QuicByteCount receive_buffer_size_;
+  const QuicByteCount send_buffer_size_;
+  QuicEventLoop* const event_loop_;                  // unowned
+  quiche::QuicheBufferAllocator* buffer_allocator_;  // unowned
+  AsyncVisitor* const async_visitor_;  // unowned, potentially null
+
+  SocketFd descriptor_ = kInvalidSocketFd;
+  ConnectStatus connect_status_ = ConnectStatus::kNotConnected;
+
+  // Only set while receive in progress or pending, otherwise nullopt.
+  absl::optional<QuicByteCount> receive_max_size_;
+
+  // Only contains data while send in progress or pending, otherwise monostate.
+  absl::variant<absl::monostate, std::string, quiche::QuicheMemSlice>
+      send_data_;
+  // Points to the unsent portion of `send_data_` while send in progress or
+  // pending, otherwise empty.
+  absl::string_view send_remaining_;
+};
+
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_CORE_IO_EVENT_LOOP_TCP_CLIENT_SOCKET_H_
diff --git a/quiche/quic/core/io/event_loop_tcp_client_socket_test.cc b/quiche/quic/core/io/event_loop_tcp_client_socket_test.cc
new file mode 100644
index 0000000..f3c0800
--- /dev/null
+++ b/quiche/quic/core/io/event_loop_tcp_client_socket_test.cc
@@ -0,0 +1,523 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/core/io/event_loop_tcp_client_socket.h"
+
+#include <functional>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "absl/functional/bind_front.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "absl/types/span.h"
+#include "quiche/quic/core/io/event_loop_socket_factory.h"
+#include "quiche/quic/core/io/quic_default_event_loop.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/io/socket.h"
+#include "quiche/quic/core/io/stream_client_socket.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/platform/api/quic_ip_address_family.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/quic/test_tools/mock_clock.h"
+#include "quiche/quic/test_tools/quic_test_utils.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+#include "quiche/common/platform/api/quiche_mutex.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/platform/api/quiche_test_loopback.h"
+#include "quiche/common/platform/api/quiche_thread.h"
+#include "quiche/common/simple_buffer_allocator.h"
+
+namespace quic::test {
+namespace {
+
+bool CreateListeningServerSocket(SocketFd* out_socket_descriptor,
+                                 QuicSocketAddress* out_socket_address) {
+  QUICHE_CHECK(out_socket_descriptor);
+  QUICHE_CHECK(out_socket_address);
+
+  absl::StatusOr<SocketFd> socket = socket_api::CreateSocket(
+      quiche::TestLoopback().address_family(), socket_api::SocketProtocol::kTcp,
+      /*blocking=*/true);
+  QUICHE_CHECK(socket.ok());
+
+  // Set an extremely small receive buffer size to increase the odds of buffers
+  // filling up when testing asynchronous writes.
+  static const QuicByteCount kReceiveBufferSize = 2;
+  absl::Status result =
+      socket_api::SetReceiveBufferSize(socket.value(), kReceiveBufferSize);
+  QUICHE_CHECK(result.ok());
+
+  QuicSocketAddress bind_address(quiche::TestLoopback(), /*port=*/0);
+  result = socket_api::Bind(socket.value(), bind_address);
+  QUICHE_CHECK(result.ok());
+
+  absl::StatusOr<QuicSocketAddress> socket_address =
+      socket_api::GetSocketAddress(socket.value());
+  QUICHE_CHECK(socket_address.ok());
+
+  result = socket_api::Listen(socket.value(), /*backlog=*/1);
+  QUICHE_CHECK(result.ok());
+
+  *out_socket_descriptor = socket.value();
+  *out_socket_address = std::move(socket_address).value();
+  return true;
+}
+
+class TestTcpServerSocketRunner : public quiche::QuicheThread {
+ public:
+  using SocketBehavior = std::function<void(SocketFd connected_socket)>;
+
+  // On construction, spins a separate thread to accept a connection from
+  // `server_socket_descriptor`, runs `behavior` with that connection, and then
+  // closes the accepted connection socket. If `allow_accept_failure` is true,
+  // will silently stop if an error is encountered accepting the connection.
+  TestTcpServerSocketRunner(SocketFd server_socket_descriptor,
+                            SocketBehavior behavior,
+                            bool allow_accept_failure = false)
+      : QuicheThread("TestTcpServerSocketRunner"),
+        server_socket_descriptor_(server_socket_descriptor),
+        behavior_(std::move(behavior)),
+        allow_accept_failure_(allow_accept_failure) {
+    Start();
+  }
+
+  ~TestTcpServerSocketRunner() override { WaitForCompletion(); }
+
+  void WaitForCompletion() { completion_notification_.WaitForNotification(); }
+
+ protected:
+  void Run() override {
+    if (AcceptSocket()) {
+      behavior_(connection_socket_descriptor_);
+      CloseSocket();
+    } else {
+      QUICHE_CHECK(allow_accept_failure_);
+    }
+
+    completion_notification_.Notify();
+  }
+
+ private:
+  bool AcceptSocket() {
+    absl::StatusOr<socket_api::AcceptResult> connection_socket =
+        socket_api::Accept(server_socket_descriptor_, /*blocking=*/true);
+    if (connection_socket.ok()) {
+      connection_socket_descriptor_ = connection_socket.value().fd;
+    }
+    return connection_socket.ok();
+  }
+
+  void CloseSocket() {
+    QUICHE_CHECK(socket_api::Close(connection_socket_descriptor_).ok());
+  }
+
+  const SocketFd server_socket_descriptor_;
+  const SocketBehavior behavior_;
+  const bool allow_accept_failure_;
+
+  SocketFd connection_socket_descriptor_;
+
+  quiche::QuicheNotification completion_notification_;
+};
+
+class EventLoopTcpClientSocketTest
+    : public quiche::test::QuicheTestWithParam<QuicEventLoopFactory*>,
+      public StreamClientSocket::AsyncVisitor {
+ public:
+  void SetUp() override {
+    QUICHE_CHECK(CreateListeningServerSocket(&server_socket_descriptor_,
+                                             &server_socket_address_));
+  }
+
+  void TearDown() override {
+    if (server_socket_descriptor_ != kInvalidSocketFd) {
+      QUICHE_CHECK(socket_api::Close(server_socket_descriptor_).ok());
+    }
+  }
+
+  void ConnectComplete(absl::Status status) override {
+    QUICHE_CHECK(!connect_result_.has_value());
+    connect_result_ = std::move(status);
+  }
+
+  void ReceiveComplete(absl::StatusOr<quiche::QuicheMemSlice> data) override {
+    QUICHE_CHECK(!receive_result_.has_value());
+    receive_result_ = std::move(data);
+  }
+
+  void SendComplete(absl::Status status) override {
+    QUICHE_CHECK(!send_result_.has_value());
+    send_result_ = std::move(status);
+  }
+
+ protected:
+  SocketFd server_socket_descriptor_ = kInvalidSocketFd;
+  QuicSocketAddress server_socket_address_;
+
+  MockClock clock_;
+  std::unique_ptr<QuicEventLoop> event_loop_ = GetParam()->Create(&clock_);
+  EventLoopSocketFactory socket_factory_{event_loop_.get(),
+                                         quiche::SimpleBufferAllocator::Get()};
+
+  absl::optional<absl::Status> connect_result_;
+  absl::optional<absl::StatusOr<quiche::QuicheMemSlice>> receive_result_;
+  absl::optional<absl::Status> send_result_;
+};
+
+std::string GetTestParamName(
+    ::testing::TestParamInfo<QuicEventLoopFactory*> info) {
+  return EscapeTestParamName(info.param->GetName());
+}
+
+INSTANTIATE_TEST_SUITE_P(EventLoopTcpClientSocketTests,
+                         EventLoopTcpClientSocketTest,
+                         ::testing::ValuesIn(GetAllSupportedEventLoops()),
+                         &GetTestParamName);
+
+TEST_P(EventLoopTcpClientSocketTest, Connect) {
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/nullptr);
+
+  // No socket runner to accept the connection for the server, but that is not
+  // expected to be necessary for the connection to complete from the client.
+  EXPECT_TRUE(socket->ConnectBlocking().ok());
+
+  socket->Disconnect();
+}
+
+TEST_P(EventLoopTcpClientSocketTest, ConnectAsync) {
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/this);
+
+  socket->ConnectAsync();
+
+  // Synchronous completion not normally expected, but since there is no known
+  // way to delay the server side of the connection (the OS does not wait for
+  // an accept() call), cannot be gauranteed that the connection will always
+  // complete asynchronously. If connecting asynchronously (normal behavior),
+  // expect completion once signalled by the event loop.
+  if (!connect_result_.has_value()) {
+    event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
+    ASSERT_TRUE(connect_result_.has_value());
+  }
+  EXPECT_TRUE(connect_result_.value().ok());
+
+  connect_result_.reset();
+  socket->Disconnect();
+  EXPECT_FALSE(connect_result_.has_value());
+}
+
+TEST_P(EventLoopTcpClientSocketTest, ErrorBeforeConnectAsync) {
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/this);
+
+  // Close the server socket.
+  EXPECT_TRUE(socket_api::Close(server_socket_descriptor_).ok());
+  server_socket_descriptor_ = kInvalidSocketFd;
+
+  socket->ConnectAsync();
+  if (!connect_result_.has_value()) {
+    event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
+    ASSERT_TRUE(connect_result_.has_value());
+  }
+
+  // Expect an error because server socket was closed before connection.
+  EXPECT_FALSE(connect_result_.value().ok());
+}
+
+TEST_P(EventLoopTcpClientSocketTest, ErrorDuringConnectAsync) {
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/this);
+
+  socket->ConnectAsync();
+
+  if (connect_result_.has_value()) {
+    // Not typical, but theoretically nothing to stop the connection from
+    // completing before the server socket is closed to trigger the error.
+    EXPECT_TRUE(connect_result_.value().ok());
+    return;
+  }
+
+  // Close the server socket.
+  EXPECT_TRUE(socket_api::Close(server_socket_descriptor_).ok());
+  server_socket_descriptor_ = kInvalidSocketFd;
+
+  // Expect an error once signalled.
+  EXPECT_FALSE(connect_result_.has_value());
+  event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
+  ASSERT_TRUE(connect_result_.has_value());
+  EXPECT_FALSE(connect_result_.value().ok());
+}
+
+TEST_P(EventLoopTcpClientSocketTest, Disconnect) {
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/nullptr);
+
+  ASSERT_TRUE(socket->ConnectBlocking().ok());
+  socket->Disconnect();
+}
+
+TEST_P(EventLoopTcpClientSocketTest, DisconnectCancelsConnectAsync) {
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/this);
+
+  socket->ConnectAsync();
+
+  if (connect_result_.has_value()) {
+    // Not typical, but theoretically nothing to stop the connection from
+    // completing before the server socket is closed to trigger the error.
+    EXPECT_TRUE(connect_result_.value().ok());
+    return;
+  }
+
+  socket->Disconnect();
+
+  // Expect immediate cancelled error.
+  ASSERT_TRUE(connect_result_.has_value());
+  EXPECT_TRUE(absl::IsCancelled(connect_result_.value()));
+}
+
+TEST_P(EventLoopTcpClientSocketTest, ConnectAndReconnect) {
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/nullptr);
+
+  ASSERT_TRUE(socket->ConnectBlocking().ok());
+  socket->Disconnect();
+
+  // Expect `socket` can reconnect now that it has been disconnected.
+  EXPECT_TRUE(socket->ConnectBlocking().ok());
+  socket->Disconnect();
+}
+
+void SendDataOnSocket(absl::string_view data, SocketFd connected_socket) {
+  while (!data.empty()) {
+    absl::StatusOr<absl::string_view> remainder =
+        socket_api::Send(connected_socket, data);
+    if (!remainder.ok()) {
+      return;
+    }
+    data = remainder.value();
+  }
+}
+
+TEST_P(EventLoopTcpClientSocketTest, Receive) {
+  std::string expected = {1, 2, 3, 4, 5, 6, 7, 8};
+  TestTcpServerSocketRunner runner(
+      server_socket_descriptor_, absl::bind_front(&SendDataOnSocket, expected));
+
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/nullptr);
+  ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+  std::string received;
+  absl::StatusOr<quiche::QuicheMemSlice> data;
+  do {
+    data = socket->ReceiveBlocking(100);
+    ASSERT_TRUE(data.ok());
+    received.append(data.value().data(), data.value().length());
+  } while (!data.value().empty());
+  EXPECT_EQ(received, expected);
+
+  socket->Disconnect();
+}
+
+TEST_P(EventLoopTcpClientSocketTest, ReceiveAsync) {
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/this);
+  ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+  // Start an async receive.  Expect no immediate results because runner not yet
+  // setup to accept and send.
+  socket->ReceiveAsync(100);
+  EXPECT_FALSE(receive_result_.has_value());
+
+  // Send data from server.
+  std::string expected = {1, 2, 3, 4, 5, 6, 7, 8};
+  TestTcpServerSocketRunner runner(
+      server_socket_descriptor_, absl::bind_front(&SendDataOnSocket, expected));
+  EXPECT_FALSE(receive_result_.has_value());
+  for (int i = 0; i < 5 && !receive_result_.has_value(); ++i) {
+    event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
+  }
+
+  // Expect to receive at least some of the sent data.
+  ASSERT_TRUE(receive_result_.has_value());
+  ASSERT_TRUE(receive_result_.value().ok());
+  EXPECT_FALSE(receive_result_.value().value().empty());
+  std::string received(receive_result_.value().value().data(),
+                       receive_result_.value().value().length());
+
+  // Get any remaining data via blocking calls.
+  absl::StatusOr<quiche::QuicheMemSlice> data;
+  do {
+    data = socket->ReceiveBlocking(100);
+    ASSERT_TRUE(data.ok());
+    received.append(data.value().data(), data.value().length());
+  } while (!data.value().empty());
+
+  EXPECT_EQ(received, expected);
+
+  receive_result_.reset();
+  socket->Disconnect();
+  EXPECT_FALSE(receive_result_.has_value());
+}
+
+TEST_P(EventLoopTcpClientSocketTest, DisconnectCancelsReceiveAsync) {
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/this);
+
+  ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+  // Start an asynchronous read, expecting no completion because server never
+  // sends any data.
+  socket->ReceiveAsync(100);
+  EXPECT_FALSE(receive_result_.has_value());
+
+  // Disconnect and expect an immediate cancelled error.
+  socket->Disconnect();
+  ASSERT_TRUE(receive_result_.has_value());
+  ASSERT_FALSE(receive_result_.value().ok());
+  EXPECT_TRUE(absl::IsCancelled(receive_result_.value().status()));
+}
+
+// Receive from `connected_socket` until connection is closed, writing received
+// data to `out_received`.
+void ReceiveDataFromSocket(std::string* out_received,
+                           SocketFd connected_socket) {
+  out_received->clear();
+
+  std::string buffer(100, 0);
+  absl::StatusOr<absl::Span<char>> received;
+  do {
+    received = socket_api::Receive(connected_socket, absl::MakeSpan(buffer));
+    QUICHE_CHECK(received.ok());
+    out_received->insert(out_received->end(), received.value().begin(),
+                         received.value().end());
+  } while (!received.value().empty());
+}
+
+TEST_P(EventLoopTcpClientSocketTest, Send) {
+  std::string sent;
+  TestTcpServerSocketRunner runner(
+      server_socket_descriptor_,
+      absl::bind_front(&ReceiveDataFromSocket, &sent));
+
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/0,
+                                            /*async_visitor=*/nullptr);
+  ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+  std::string expected = {1, 2, 3, 4, 5, 6, 7, 8};
+  EXPECT_TRUE(socket->SendBlocking(expected).ok());
+  socket->Disconnect();
+
+  runner.WaitForCompletion();
+  EXPECT_EQ(sent, expected);
+}
+
+TEST_P(EventLoopTcpClientSocketTest, SendAsync) {
+  // Use a small send buffer to improve chances of a send needing to be
+  // asynchronous.
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/4,
+                                            /*async_visitor=*/this);
+  ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+  std::string data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+  std::string expected;
+
+  // Repeatedly write to socket until it does not complete synchronously.
+  do {
+    expected.insert(expected.end(), data.begin(), data.end());
+    send_result_.reset();
+    socket->SendAsync(data);
+    ASSERT_TRUE(!send_result_.has_value() || send_result_.value().ok());
+  } while (send_result_.has_value());
+
+  // Begin receiving from server and expect more data to send.
+  std::string sent;
+  TestTcpServerSocketRunner runner(
+      server_socket_descriptor_,
+      absl::bind_front(&ReceiveDataFromSocket, &sent));
+  EXPECT_FALSE(send_result_.has_value());
+  for (int i = 0; i < 5 && !send_result_.has_value(); ++i) {
+    event_loop_->RunEventLoopOnce(QuicTime::Delta::FromSeconds(1));
+  }
+  ASSERT_TRUE(send_result_.has_value());
+  EXPECT_TRUE(send_result_.value().ok());
+
+  send_result_.reset();
+  socket->Disconnect();
+  EXPECT_FALSE(send_result_.has_value());
+
+  runner.WaitForCompletion();
+  EXPECT_EQ(sent, expected);
+}
+
+TEST_P(EventLoopTcpClientSocketTest, DisconnectCancelsSendAsync) {
+  // Use a small send buffer to improve chances of a send needing to be
+  // asynchronous.
+  std::unique_ptr<StreamClientSocket> socket =
+      socket_factory_.CreateTcpClientSocket(server_socket_address_,
+                                            /*receive_buffer_size=*/0,
+                                            /*send_buffer_size=*/4,
+                                            /*async_visitor=*/this);
+  ASSERT_TRUE(socket->ConnectBlocking().ok());
+
+  std::string data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+
+  // Repeatedly write to socket until it does not complete synchronously.
+  do {
+    send_result_.reset();
+    socket->SendAsync(data);
+    ASSERT_TRUE(!send_result_.has_value() || send_result_.value().ok());
+  } while (send_result_.has_value());
+
+  // Disconnect and expect immediate cancelled error.
+  socket->Disconnect();
+  ASSERT_TRUE(send_result_.has_value());
+  EXPECT_TRUE(absl::IsCancelled(send_result_.value()));
+}
+
+}  // namespace
+}  // namespace quic::test
diff --git a/quiche/quic/core/io/socket.h b/quiche/quic/core/io/socket.h
index 748de4d..fcc6418 100644
--- a/quiche/quic/core/io/socket.h
+++ b/quiche/quic/core/io/socket.h
@@ -5,6 +5,9 @@
 #ifndef QUICHE_QUIC_CORE_IO_SOCKET_H_
 #define QUICHE_QUIC_CORE_IO_SOCKET_H_
 
+#include <functional>
+#include <string>
+
 #include "absl/status/status.h"
 #include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
@@ -13,6 +16,7 @@
 #include "quiche/quic/platform/api/quic_ip_address_family.h"
 #include "quiche/quic/platform/api/quic_socket_address.h"
 #include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
 
 #if defined(_WIN32)
 #include <winsock2.h>
@@ -28,6 +32,57 @@
 inline constexpr SocketFd kInvalidSocketFd = -1;
 #endif
 
+// A read/write socket.
+//
+// Warning regarding blocking calls: Code in the QUICHE library typically
+// handles IO on a single thread, so if making calls from that typical
+// environment, it would be problematic to make a blocking call and block that
+// single thread.
+class QUICHE_EXPORT_PRIVATE Socket {
+ public:
+  class AsyncVisitor {
+   public:
+    virtual ~AsyncVisitor() = default;
+
+    // If the operation completed without error, `data` is set to the received
+    // data.
+    virtual void ReceiveComplete(
+        absl::StatusOr<quiche::QuicheMemSlice> data) = 0;
+
+    virtual void SendComplete(absl::Status status) = 0;
+  };
+
+  virtual ~Socket() = default;
+
+  // Blocking read. Receives and returns a buffer of up to `max_size` bytes from
+  // socket. Returns status on error.
+  virtual absl::StatusOr<quiche::QuicheMemSlice> ReceiveBlocking(
+      QuicByteCount max_size) = 0;
+
+  // Asynchronous read. Receives up to `max_size` bytes from socket. If
+  // no data is synchronously available to be read, waits until some data is
+  // available or the socket is closed. On completion, calls ReceiveComplete()
+  // on the visitor, potentially before return from ReceiveAsync().
+  //
+  // After calling, the socket must not be destroyed until ReceiveComplete() is
+  // called.
+  virtual void ReceiveAsync(QuicByteCount max_size) = 0;
+
+  // Blocking write. Sends all of `data` (potentially via multiple underlying
+  // socket sends).
+  virtual absl::Status SendBlocking(std::string data) = 0;
+  virtual absl::Status SendBlocking(quiche::QuicheMemSlice data) = 0;
+
+  // Asynchronous write. Sends all of `data` (potentially via multiple
+  // underlying socket sends). On completion, calls SendComplete() on the
+  // visitor, potentially before return from SendAsync().
+  //
+  // After calling, the socket must not be destroyed until SendComplete() is
+  // called.
+  virtual void SendAsync(std::string data) = 0;
+  virtual void SendAsync(quiche::QuicheMemSlice data) = 0;
+};
+
 // Low-level platform-agnostic socket operations. Closely follows the behavior
 // of basic POSIX socket APIs, diverging mostly only to convert to/from cleaner
 // and platform-agnostic types.
diff --git a/quiche/quic/core/io/socket_factory.h b/quiche/quic/core/io/socket_factory.h
new file mode 100644
index 0000000..39102f1
--- /dev/null
+++ b/quiche/quic/core/io/socket_factory.h
@@ -0,0 +1,34 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_IO_SOCKET_FACTORY_H_
+#define QUICHE_QUIC_CORE_IO_SOCKET_FACTORY_H_
+
+#include <memory>
+
+#include "quiche/quic/core/io/stream_client_socket.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_export.h"
+
+namespace quic {
+
+// A factory to create objects of type Socket and derived interfaces.
+class QUICHE_EXPORT_PRIVATE SocketFactory {
+ public:
+  virtual ~SocketFactory() = default;
+
+  // Will use platform default buffer size if `receive_buffer_size` or
+  // `send_buffer_size` is zero. If `async_visitor` is null, async operations
+  // must not be called on the created socket. If `async_visitor` is non-null,
+  // it must outlive the created socket.
+  virtual std::unique_ptr<StreamClientSocket> CreateTcpClientSocket(
+      const quic::QuicSocketAddress& peer_address,
+      QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
+      StreamClientSocket::AsyncVisitor* async_visitor) = 0;
+};
+
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_CORE_IO_SOCKET_FACTORY_H_
diff --git a/quiche/quic/core/io/stream_client_socket.h b/quiche/quic/core/io/stream_client_socket.h
new file mode 100644
index 0000000..00f9a42
--- /dev/null
+++ b/quiche/quic/core/io/stream_client_socket.h
@@ -0,0 +1,62 @@
+// Copyright 2022 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_CORE_IO_STREAM_CLIENT_SOCKET_H_
+#define QUICHE_QUIC_CORE_IO_STREAM_CLIENT_SOCKET_H_
+
+#include "absl/status/status.h"
+#include "quiche/quic/core/io/socket.h"
+#include "quiche/common/platform/api/quiche_export.h"
+
+namespace quic {
+
+// A client socket using a protocol (typically TCP) that provides
+// connection-based streams.
+//
+// Must not destroy a connected/connecting socket. If connected or connecting,
+// must call Disconnect() to disconnect or cancel the connection before
+// destruction.
+//
+// Warning regarding blocking calls: Code in the QUICHE library typically
+// handles IO on a single thread, so if making calls from that typical
+// environment, it would be problematic to make a blocking call and block that
+// single thread.
+class QUICHE_EXPORT_PRIVATE StreamClientSocket : public Socket {
+ public:
+  class AsyncVisitor : public Socket::AsyncVisitor {
+   public:
+    virtual void ConnectComplete(absl::Status status) = 0;
+  };
+
+  ~StreamClientSocket() override = default;
+
+  // Establishes a connection synchronously. Should not be called if socket has
+  // already been successfully connected without first calling Disconnect().
+  virtual absl::Status ConnectBlocking() = 0;
+
+  // Establishes a connection asynchronously. On completion, calls
+  // ConnectComplete() on the visitor, potentially before return from
+  // ConnectAsync(). Should not be called if socket has already been
+  // successfully connected without first calling Disconnect().
+  //
+  // After calling, the socket must not be destroyed until Disconnect() is
+  // called.
+  virtual void ConnectAsync() = 0;
+
+  // Disconnects a connected socket or cancels an in-progress ConnectAsync(),
+  // invoking the `ConnectComplete(absl::CancelledError())` on the visitor.
+  // After success, it is possible to call ConnectBlocking() or ConnectAsync()
+  // again to establish a new connection. Cancels any pending read or write
+  // operations, calling visitor completion methods with
+  // `absl::CancelledError()`.
+  //
+  // Typically implemented via a call to ::close(), which for TCP can result in
+  // either FIN or RST, depending on socket/platform state and undefined
+  // platform behavior.
+  virtual void Disconnect() = 0;
+};
+
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_CORE_IO_STREAM_CLIENT_SOCKET_H_