Implement MoqtClient and MoqtServer

Based on cl/578655843

PiperOrigin-RevId: 582112382
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 1038ec4..0ba5220 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -696,6 +696,7 @@
     "quic/tools/quic_backend_response.h",
     "quic/tools/quic_client_base.h",
     "quic/tools/quic_client_factory.h",
+    "quic/tools/quic_event_loop_tools.h",
     "quic/tools/quic_memory_cache_backend.h",
     "quic/tools/quic_name_lookup.h",
     "quic/tools/quic_simple_client_session.h",
@@ -710,6 +711,7 @@
     "quic/tools/quic_tcp_like_trace_converter.h",
     "quic/tools/quic_url.h",
     "quic/tools/simple_ticket_crypter.h",
+    "quic/tools/web_transport_only_backend.h",
     "quic/tools/web_transport_test_visitors.h",
 ]
 quiche_tool_support_srcs = [
@@ -733,6 +735,7 @@
     "quic/tools/quic_tcp_like_trace_converter.cc",
     "quic/tools/quic_url.cc",
     "quic/tools/simple_ticket_crypter.cc",
+    "quic/tools/web_transport_only_backend.cc",
 ]
 quiche_test_support_hdrs = [
     "common/platform/api/quiche_expect_bug.h",
@@ -1470,6 +1473,8 @@
     "quic/moqt/moqt_parser.h",
     "quic/moqt/moqt_session.h",
     "quic/moqt/test_tools/moqt_test_message.h",
+    "quic/moqt/tools/moqt_client.h",
+    "quic/moqt/tools/moqt_server.h",
 ]
 moqt_srcs = [
     "quic/moqt/moqt_framer.cc",
@@ -1479,6 +1484,9 @@
     "quic/moqt/moqt_parser.cc",
     "quic/moqt/moqt_parser_test.cc",
     "quic/moqt/moqt_session.cc",
+    "quic/moqt/tools/moqt_client.cc",
+    "quic/moqt/tools/moqt_end_to_end_test.cc",
+    "quic/moqt/tools/moqt_server.cc",
 ]
 binary_http_hdrs = [
     "binary_http/binary_http_message.h",
diff --git a/build/source_list.gni b/build/source_list.gni
index ad82d4d..69e5f27 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -696,6 +696,7 @@
     "src/quiche/quic/tools/quic_backend_response.h",
     "src/quiche/quic/tools/quic_client_base.h",
     "src/quiche/quic/tools/quic_client_factory.h",
+    "src/quiche/quic/tools/quic_event_loop_tools.h",
     "src/quiche/quic/tools/quic_memory_cache_backend.h",
     "src/quiche/quic/tools/quic_name_lookup.h",
     "src/quiche/quic/tools/quic_simple_client_session.h",
@@ -710,6 +711,7 @@
     "src/quiche/quic/tools/quic_tcp_like_trace_converter.h",
     "src/quiche/quic/tools/quic_url.h",
     "src/quiche/quic/tools/simple_ticket_crypter.h",
+    "src/quiche/quic/tools/web_transport_only_backend.h",
     "src/quiche/quic/tools/web_transport_test_visitors.h",
 ]
 quiche_tool_support_srcs = [
@@ -733,6 +735,7 @@
     "src/quiche/quic/tools/quic_tcp_like_trace_converter.cc",
     "src/quiche/quic/tools/quic_url.cc",
     "src/quiche/quic/tools/simple_ticket_crypter.cc",
+    "src/quiche/quic/tools/web_transport_only_backend.cc",
 ]
 quiche_test_support_hdrs = [
     "src/quiche/common/platform/api/quiche_expect_bug.h",
@@ -1474,6 +1477,8 @@
     "src/quiche/quic/moqt/moqt_parser.h",
     "src/quiche/quic/moqt/moqt_session.h",
     "src/quiche/quic/moqt/test_tools/moqt_test_message.h",
+    "src/quiche/quic/moqt/tools/moqt_client.h",
+    "src/quiche/quic/moqt/tools/moqt_server.h",
 ]
 moqt_srcs = [
     "src/quiche/quic/moqt/moqt_framer.cc",
@@ -1483,6 +1488,9 @@
     "src/quiche/quic/moqt/moqt_parser.cc",
     "src/quiche/quic/moqt/moqt_parser_test.cc",
     "src/quiche/quic/moqt/moqt_session.cc",
+    "src/quiche/quic/moqt/tools/moqt_client.cc",
+    "src/quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
+    "src/quiche/quic/moqt/tools/moqt_server.cc",
 ]
 binary_http_hdrs = [
     "src/quiche/binary_http/binary_http_message.h",
diff --git a/build/source_list.json b/build/source_list.json
index 3e2d6f3..165f594 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -695,6 +695,7 @@
     "quiche/quic/tools/quic_backend_response.h",
     "quiche/quic/tools/quic_client_base.h",
     "quiche/quic/tools/quic_client_factory.h",
+    "quiche/quic/tools/quic_event_loop_tools.h",
     "quiche/quic/tools/quic_memory_cache_backend.h",
     "quiche/quic/tools/quic_name_lookup.h",
     "quiche/quic/tools/quic_simple_client_session.h",
@@ -709,6 +710,7 @@
     "quiche/quic/tools/quic_tcp_like_trace_converter.h",
     "quiche/quic/tools/quic_url.h",
     "quiche/quic/tools/simple_ticket_crypter.h",
+    "quiche/quic/tools/web_transport_only_backend.h",
     "quiche/quic/tools/web_transport_test_visitors.h"
   ],
   "quiche_tool_support_srcs": [
@@ -731,7 +733,8 @@
     "quiche/quic/tools/quic_spdy_client_base.cc",
     "quiche/quic/tools/quic_tcp_like_trace_converter.cc",
     "quiche/quic/tools/quic_url.cc",
-    "quiche/quic/tools/simple_ticket_crypter.cc"
+    "quiche/quic/tools/simple_ticket_crypter.cc",
+    "quiche/quic/tools/web_transport_only_backend.cc"
   ],
   "quiche_test_support_hdrs": [
     "quiche/common/platform/api/quiche_expect_bug.h",
@@ -1472,7 +1475,9 @@
     "quiche/quic/moqt/moqt_messages.h",
     "quiche/quic/moqt/moqt_parser.h",
     "quiche/quic/moqt/moqt_session.h",
-    "quiche/quic/moqt/test_tools/moqt_test_message.h"
+    "quiche/quic/moqt/test_tools/moqt_test_message.h",
+    "quiche/quic/moqt/tools/moqt_client.h",
+    "quiche/quic/moqt/tools/moqt_server.h"
   ],
   "moqt_srcs": [
     "quiche/quic/moqt/moqt_framer.cc",
@@ -1481,7 +1486,10 @@
     "quiche/quic/moqt/moqt_messages.cc",
     "quiche/quic/moqt/moqt_parser.cc",
     "quiche/quic/moqt/moqt_parser_test.cc",
-    "quiche/quic/moqt/moqt_session.cc"
+    "quiche/quic/moqt/moqt_session.cc",
+    "quiche/quic/moqt/tools/moqt_client.cc",
+    "quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
+    "quiche/quic/moqt/tools/moqt_server.cc"
   ],
   "binary_http_hdrs": [
     "quiche/binary_http/binary_http_message.h"
diff --git a/quiche/quic/core/http/quic_spdy_session.h b/quiche/quic/core/http/quic_spdy_session.h
index 290b640..bf6d82b 100644
--- a/quiche/quic/core/http/quic_spdy_session.h
+++ b/quiche/quic/core/http/quic_spdy_session.h
@@ -469,6 +469,9 @@
 
   void OnConfigNegotiated() override;
 
+  // Returns true if the SETTINGS frame has been received from the peer.
+  bool settings_received() const { return settings_received_; }
+
  protected:
   // Override CreateIncomingStream(), CreateOutgoingBidirectionalStream() and
   // CreateOutgoingUnidirectionalStream() with QuicSpdyStream return type to
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 0fa4245..8efefec 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -27,6 +27,18 @@
 using ::testing::_;
 using ::testing::Assign;
 
+struct MockSessionCallbacks {
+  testing::MockFunction<void()> session_established_callback;
+  testing::MockFunction<void(absl::string_view)> session_terminated_callback;
+  testing::MockFunction<void()> session_deleted_callback;
+
+  MoqtSessionCallbacks AsSessionCallbacks() {
+    return MoqtSessionCallbacks{session_established_callback.AsStdFunction(),
+                                session_terminated_callback.AsStdFunction(),
+                                session_deleted_callback.AsStdFunction()};
+  }
+};
+
 class ClientEndpoint : public quic::simulator::QuicEndpointWithConnection {
  public:
   ClientEndpoint(Simulator* simulator, const std::string& name,
@@ -44,23 +56,21 @@
             MoqtSessionParameters{.version = version,
                                   .perspective = quic::Perspective::IS_CLIENT,
                                   .using_webtrans = false},
-            established_callback_.AsStdFunction(),
-            terminated_callback_.AsStdFunction()) {
+            callbacks_.AsSessionCallbacks()) {
     quic_session_.Initialize();
   }
 
   MoqtSession* session() { return &session_; }
   quic::QuicGenericClientSession* quic_session() { return &quic_session_; }
   testing::MockFunction<void()>& established_callback() {
-    return established_callback_;
+    return callbacks_.session_established_callback;
   }
   testing::MockFunction<void(absl::string_view)>& terminated_callback() {
-    return terminated_callback_;
+    return callbacks_.session_terminated_callback;
   }
 
  private:
-  testing::MockFunction<void()> established_callback_;
-  testing::MockFunction<void(absl::string_view)> terminated_callback_;
+  MockSessionCallbacks callbacks_;
   quic::QuicCryptoClientConfig crypto_config_;
   quic::QuicGenericClientSession quic_session_;
   MoqtSession session_;
@@ -88,22 +98,20 @@
             MoqtSessionParameters{.version = version,
                                   .perspective = quic::Perspective::IS_SERVER,
                                   .using_webtrans = false},
-            established_callback_.AsStdFunction(),
-            terminated_callback_.AsStdFunction()) {
+            callbacks_.AsSessionCallbacks()) {
     quic_session_.Initialize();
   }
 
   MoqtSession* session() { return &session_; }
   testing::MockFunction<void()>& established_callback() {
-    return established_callback_;
+    return callbacks_.session_established_callback;
   }
   testing::MockFunction<void(absl::string_view)>& terminated_callback() {
-    return terminated_callback_;
+    return callbacks_.session_terminated_callback;
   }
 
  private:
-  testing::MockFunction<void()> established_callback_;
-  testing::MockFunction<void(absl::string_view)> terminated_callback_;
+  MockSessionCallbacks callbacks_;
   quic::QuicCompressedCertsCache compressed_certs_cache_;
   quic::QuicCryptoServerConfig crypto_config_;
   quic::QuicGenericServerSession quic_session_;
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 15948a1..3586a22 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -16,10 +16,15 @@
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/core/quic_versions.h"
 #include "quiche/common/platform/api/quiche_export.h"
 
 namespace moqt {
 
+inline constexpr quic::ParsedQuicVersionVector GetMoqtSupportedQuicVersions() {
+  return quic::ParsedQuicVersionVector{quic::ParsedQuicVersion::RFCv1()};
+}
+
 enum class MoqtVersion : uint64_t {
   kDraft01 = 0xff000001,
   kUnrecognizedVersionForTests = 0xfe0000ff,
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index f33c753..a13c14d 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -24,18 +24,31 @@
 using MoqtSessionEstablishedCallback = quiche::SingleUseCallback<void()>;
 using MoqtSessionTerminatedCallback =
     quiche::SingleUseCallback<void(absl::string_view error_message)>;
+using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>;
+
+// Callbacks for session-level events.
+struct MoqtSessionCallbacks {
+  MoqtSessionEstablishedCallback session_established_callback = +[] {};
+  MoqtSessionTerminatedCallback session_terminated_callback =
+      +[](absl::string_view) {};
+  MoqtSessionDeletedCallback session_deleted_callback = +[] {};
+};
 
 class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
  public:
   MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
-              MoqtSessionEstablishedCallback session_established_callback,
-              MoqtSessionTerminatedCallback session_terminated_callback)
+              MoqtSessionCallbacks callbacks)
       : session_(session),
         parameters_(parameters),
-        session_established_callback_(std::move(session_established_callback)),
-        session_terminated_callback_(std::move(session_terminated_callback)),
+        session_established_callback_(
+            std::move(callbacks.session_established_callback)),
+        session_terminated_callback_(
+            std::move(callbacks.session_terminated_callback)),
+        session_deleted_callback_(
+            std::move(callbacks.session_deleted_callback)),
         framer_(quiche::SimpleBufferAllocator::Get(),
                 parameters.using_webtrans) {}
+  ~MoqtSession() { std::move(session_deleted_callback_)(); }
 
   // webtransport::SessionVisitor implementation.
   void OnSessionReady() override;
@@ -109,6 +122,7 @@
   MoqtSessionParameters parameters_;
   MoqtSessionEstablishedCallback session_established_callback_;
   MoqtSessionTerminatedCallback session_terminated_callback_;
+  MoqtSessionDeletedCallback session_deleted_callback_;
   MoqtFramer framer_;
 
   std::optional<webtransport::StreamId> control_stream_;
diff --git a/quiche/quic/moqt/tools/moqt_client.cc b/quiche/quic/moqt/tools/moqt_client.cc
new file mode 100644
index 0000000..2ed51ec
--- /dev/null
+++ b/quiche/quic/moqt/tools/moqt_client.cc
@@ -0,0 +1,111 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/tools/moqt_client.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/crypto/proof_verifier.h"
+#include "quiche/quic/core/http/quic_spdy_client_stream.h"
+#include "quiche/quic/core/http/web_transport_http3.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/quic_server_id.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/quic/tools/quic_default_client.h"
+#include "quiche/quic/tools/quic_event_loop_tools.h"
+#include "quiche/quic/tools/quic_name_lookup.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/spdy/core/http2_header_block.h"
+
+namespace moqt {
+
+MoqtClient::MoqtClient(quic::QuicSocketAddress peer_address,
+                       const quic::QuicServerId& server_id,
+                       std::unique_ptr<quic::ProofVerifier> proof_verifier,
+                       quic::QuicEventLoop* event_loop)
+    : spdy_client_(peer_address, server_id, GetMoqtSupportedQuicVersions(),
+                   event_loop, std::move(proof_verifier)) {
+  spdy_client_.set_enable_web_transport(true);
+}
+
+void MoqtClient::Connect(std::string path, MoqtSessionCallbacks callbacks) {
+  absl::Status status = ConnectInner(std::move(path), callbacks);
+  if (!status.ok()) {
+    std::move(callbacks.session_terminated_callback)(status.message());
+  }
+}
+
+absl::Status MoqtClient::ConnectInner(std::string path,
+                                      MoqtSessionCallbacks& callbacks) {
+  if (!spdy_client_.Initialize()) {
+    return absl::InternalError("Initialization failed");
+  }
+  if (!spdy_client_.Connect()) {
+    return absl::UnavailableError("Failed to establish a QUIC connection");
+  }
+  bool settings_received = quic::ProcessEventsUntil(
+      spdy_client_.default_network_helper()->event_loop(),
+      [&] { return spdy_client_.client_session()->settings_received(); });
+  if (!settings_received) {
+    return absl::UnavailableError(
+        "Timed out while waiting for server SETTINGS");
+  }
+  if (!spdy_client_.client_session()->SupportsWebTransport()) {
+    QUICHE_DLOG(INFO) << "session: SupportsWebTransport = "
+                      << spdy_client_.client_session()->SupportsWebTransport()
+                      << ", SupportsH3Datagram = "
+                      << spdy_client_.client_session()->SupportsH3Datagram()
+                      << ", OneRttKeysAvailable = "
+                      << spdy_client_.client_session()->OneRttKeysAvailable();
+    return absl::FailedPreconditionError(
+        "Server does not support WebTransport");
+  }
+  auto* stream = static_cast<quic::QuicSpdyClientStream*>(
+      spdy_client_.client_session()->CreateOutgoingBidirectionalStream());
+  if (!stream) {
+    return absl::InternalError("Could not open a CONNECT stream");
+  }
+  spdy_client_.set_store_response(true);
+
+  spdy::Http2HeaderBlock headers;
+  headers[":scheme"] = "https";
+  headers[":authority"] = spdy_client_.server_id().host();
+  headers[":path"] = path;
+  headers[":method"] = "CONNECT";
+  headers[":protocol"] = "webtransport";
+  stream->SendRequest(std::move(headers), "", false);
+
+  quic::WebTransportHttp3* web_transport = stream->web_transport();
+  if (web_transport == nullptr) {
+    return absl::InternalError("Failed to initialize WebTransport session");
+  }
+
+  MoqtSessionParameters parameters;
+  parameters.version = MoqtVersion::kDraft01;
+  parameters.perspective = quic::Perspective::IS_CLIENT,
+  parameters.using_webtrans = true;
+  parameters.path = "";
+
+  // Ensure that we never have a dangling pointer to the session.
+  MoqtSessionDeletedCallback deleted_callback =
+      std::move(callbacks.session_deleted_callback);
+  callbacks.session_deleted_callback =
+      [this, old = std::move(deleted_callback)]() mutable {
+        session_ = nullptr;
+        std::move(old)();
+      };
+
+  web_transport->SetVisitor(std::make_unique<MoqtSession>(
+      web_transport, parameters, std::move(callbacks)));
+  return absl::OkStatus();
+}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/tools/moqt_client.h b/quiche/quic/moqt/tools/moqt_client.h
new file mode 100644
index 0000000..7959bf3
--- /dev/null
+++ b/quiche/quic/moqt/tools/moqt_client.h
@@ -0,0 +1,45 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_TOOLS_MOQT_CLIENT_H_
+#define QUICHE_QUIC_MOQT_TOOLS_MOQT_CLIENT_H_
+
+#include <memory>
+#include <string>
+
+#include "absl/status/status.h"
+#include "quiche/quic/core/crypto/proof_verifier.h"
+#include "quiche/quic/core/http/quic_spdy_client_session.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/quic/tools/quic_default_client.h"
+#include "quiche/common/platform/api/quiche_export.h"
+
+namespace moqt {
+
+// A synchronous MoQT client based on QuicDefaultClient.
+class MoqtClient {
+ public:
+  MoqtClient(quic::QuicSocketAddress peer_address,
+             const quic::QuicServerId& server_id,
+             std::unique_ptr<quic::ProofVerifier> proof_verifier,
+             quic::QuicEventLoop* event_loop);
+
+  // Establishes the connection to the specified endpoint. The errors are
+  // returned via the session termination callback.
+  void Connect(std::string path, MoqtSessionCallbacks callbacks);
+
+  MoqtSession* session() { return session_; }
+
+ private:
+  absl::Status ConnectInner(std::string path, MoqtSessionCallbacks& callbacks);
+
+  quic::QuicDefaultClient spdy_client_;
+  MoqtSession* session_ = nullptr;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_TOOLS_MOQT_CLIENT_H_
diff --git a/quiche/quic/moqt/tools/moqt_end_to_end_test.cc b/quiche/quic/moqt/tools/moqt_end_to_end_test.cc
new file mode 100644
index 0000000..0763085
--- /dev/null
+++ b/quiche/quic/moqt/tools/moqt_end_to_end_test.cc
@@ -0,0 +1,121 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// End-to-end test for MoqtClient/MoqtServer.
+//
+// IMPORTANT NOTE:
+// This test mostly exists to test the two classes mentioned above. When
+// possible, moqt_integration_test should be used instead, as it does not use
+// real clocks or I/O and thus has less overhead.
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/functional/bind_front.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/quic_server_id.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/tools/moqt_client.h"
+#include "quiche/quic/moqt/tools/moqt_server.h"
+#include "quiche/quic/platform/api/quic_ip_address.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/quic/platform/api/quic_test_loopback.h"
+#include "quiche/quic/test_tools/crypto_test_utils.h"
+#include "quiche/quic/tools/quic_event_loop_tools.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_callbacks.h"
+
+namespace moqt::test {
+namespace {
+
+constexpr absl::string_view kNotFoundPath = "/not-found";
+
+void UnexpectedClose(absl::string_view reason) {
+  ADD_FAILURE() << "Unexpected close of MoQT session with reason: " << reason;
+}
+
+class MoqtEndToEndTest : public quiche::test::QuicheTest {
+ public:
+  MoqtEndToEndTest()
+      : server_(quic::test::crypto_test_utils::ProofSourceForTesting(),
+                absl::bind_front(&MoqtEndToEndTest::ServerBackend, this)) {
+    quic::QuicIpAddress host = quic::TestLoopback();
+    bool success = server_.quic_server().CreateUDPSocketAndListen(
+        quic::QuicSocketAddress(host, /*port=*/0));
+    QUICHE_CHECK(success);
+    server_address_ =
+        quic::QuicSocketAddress(host, server_.quic_server().port());
+    event_loop_ = server_.quic_server().event_loop();
+  }
+
+  absl::StatusOr<MoqtSessionCallbacks> ServerBackend(absl::string_view path) {
+    QUICHE_LOG(INFO) << "Server: Received a request for path " << path;
+    if (path == kNotFoundPath) {
+      return absl::NotFoundError("404 test endpoint");
+    }
+    MoqtSessionCallbacks callbacks;
+    callbacks.session_established_callback = []() {
+      QUICHE_LOG(INFO) << "Server: session established";
+    };
+    callbacks.session_terminated_callback = [](absl::string_view reason) {
+      QUICHE_LOG(INFO) << "Server: session terminated with reason: " << reason;
+    };
+    return std::move(callbacks);
+  }
+
+  std::unique_ptr<MoqtClient> CreateClient() {
+    return std::make_unique<MoqtClient>(
+        server_address_, quic::QuicServerId("test.example.com", 443),
+        quic::test::crypto_test_utils::ProofVerifierForTesting(), event_loop_);
+  }
+
+  bool RunEventsUntil(quiche::UnretainedCallback<bool()> callback) {
+    return quic::ProcessEventsUntil(event_loop_, callback);
+  }
+
+ private:
+  MoqtServer server_;
+  quic::QuicEventLoop* event_loop_;
+  quic::QuicSocketAddress server_address_;
+};
+
+TEST_F(MoqtEndToEndTest, SuccessfulHandshake) {
+  MoqtSessionCallbacks callbacks;
+  bool established = false;
+  bool deleted = false;
+  callbacks.session_established_callback = [&] { established = true; };
+  callbacks.session_terminated_callback = UnexpectedClose;
+  callbacks.session_deleted_callback = [&] { deleted = true; };
+  std::unique_ptr<MoqtClient> client = CreateClient();
+  client->Connect("/test", std::move(callbacks));
+  bool success = RunEventsUntil([&] { return established; });
+  EXPECT_TRUE(success);
+  EXPECT_FALSE(deleted);
+  client.reset();
+  EXPECT_TRUE(deleted);
+}
+
+TEST_F(MoqtEndToEndTest, HandshakeFailed404) {
+  MoqtSessionCallbacks callbacks;
+  bool resolved = false;
+  callbacks.session_established_callback = [&] {
+    ADD_FAILURE() << "Established session when 404 expected";
+    resolved = true;
+  };
+  callbacks.session_terminated_callback = [&](absl::string_view error) {
+    resolved = true;
+  };
+  std::unique_ptr<MoqtClient> client = CreateClient();
+  client->Connect(std::string(kNotFoundPath), std::move(callbacks));
+  bool success = RunEventsUntil([&] { return resolved; });
+  EXPECT_TRUE(success);
+}
+
+}  // namespace
+}  // namespace moqt::test
diff --git a/quiche/quic/moqt/tools/moqt_server.cc b/quiche/quic/moqt/tools/moqt_server.cc
new file mode 100644
index 0000000..c2ec5fb
--- /dev/null
+++ b/quiche/quic/moqt/tools/moqt_server.cc
@@ -0,0 +1,48 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/tools/moqt_server.h"
+
+#include <memory>
+#include <utility>
+
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/crypto/proof_source.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/tools/quic_server.h"
+#include "quiche/quic/tools/web_transport_only_backend.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+namespace {
+quic::WebTransportRequestCallback CreateWebTransportCallback(
+    MoqtIncomingSessionCallback callback) {
+  return [callback = std::move(callback)](absl::string_view path,
+                                          webtransport::Session* session)
+             -> absl::StatusOr<std::unique_ptr<webtransport::SessionVisitor>> {
+    absl::StatusOr<MoqtSessionCallbacks> callbacks = callback(path);
+    if (!callbacks.ok()) {
+      return callbacks.status();
+    }
+    MoqtSessionParameters parameters;
+    parameters.perspective = quic::Perspective::IS_SERVER;
+    parameters.path = path;
+    parameters.using_webtrans = true;
+    parameters.version = MoqtVersion::kDraft01;
+    return std::make_unique<MoqtSession>(session, parameters,
+                                         *std::move(callbacks));
+  };
+}
+}  // namespace
+
+MoqtServer::MoqtServer(std::unique_ptr<quic::ProofSource> proof_source,
+                       MoqtIncomingSessionCallback callback)
+    : backend_(CreateWebTransportCallback(std::move(callback))),
+      server_(std::move(proof_source), &backend_) {}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/tools/moqt_server.h b/quiche/quic/moqt/tools/moqt_server.h
new file mode 100644
index 0000000..44c8fb8
--- /dev/null
+++ b/quiche/quic/moqt/tools/moqt_server.h
@@ -0,0 +1,42 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_TOOLS_MOQT_SERVER_H_
+#define QUICHE_QUIC_MOQT_TOOLS_MOQT_SERVER_H_
+
+#include <memory>
+
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/crypto/proof_source.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/tools/quic_server.h"
+#include "quiche/quic/tools/web_transport_only_backend.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_callbacks.h"
+
+namespace moqt {
+
+// A callback to provide MoQT handler based on the path in the request.
+using MoqtIncomingSessionCallback =
+    quiche::MultiUseCallback<absl::StatusOr<MoqtSessionCallbacks>(
+        absl::string_view path)>;
+
+// A simple MoQT server.
+class MoqtServer {
+ public:
+  explicit MoqtServer(std::unique_ptr<quic::ProofSource> proof_source,
+                      MoqtIncomingSessionCallback callback);
+
+  quic::QuicServer& quic_server() { return server_; }
+
+ private:
+  quic::WebTransportOnlyBackend backend_;
+  quic::QuicServer server_;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_TOOLS_MOQT_SERVER_H_
diff --git a/quiche/quic/tools/quic_event_loop_tools.h b/quiche/quic/tools/quic_event_loop_tools.h
new file mode 100644
index 0000000..1bf1d28
--- /dev/null
+++ b/quiche/quic/tools/quic_event_loop_tools.h
@@ -0,0 +1,40 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_TOOLS_QUIC_EVENT_LOOP_TOOLS_H_
+#define QUICHE_QUIC_TOOLS_QUIC_EVENT_LOOP_TOOLS_H_
+
+#include "absl/base/attributes.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/quic_clock.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/common/quiche_callbacks.h"
+
+namespace quic {
+
+inline constexpr QuicTimeDelta kDefaultTimeoutForTools =
+    QuicTimeDelta::FromSeconds(3);
+inline constexpr QuicTimeDelta kDefaultEventLoopTimeoutForTools =
+    QuicTimeDelta::FromMilliseconds(50);
+
+// Runs the event loop until the specified callback returns true, or until the
+// timeout occurs. Returns true if callback returned true at least once.
+ABSL_MUST_USE_RESULT inline bool ProcessEventsUntil(
+    QuicEventLoop* event_loop, quiche::UnretainedCallback<bool()> callback,
+    QuicTimeDelta timeout = kDefaultTimeoutForTools) {
+  const QuicClock* clock = event_loop->GetClock();
+  QuicTime start = clock->Now();
+  while (!callback()) {
+    event_loop->RunEventLoopOnce(kDefaultEventLoopTimeoutForTools);
+    QuicTimeDelta elapsed = clock->Now() - start;
+    if (elapsed >= timeout) {
+      return false;
+    }
+  }
+  return true;
+}
+
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_TOOLS_QUIC_EVENT_LOOP_TOOLS_H_
diff --git a/quiche/quic/tools/web_transport_only_backend.cc b/quiche/quic/tools/web_transport_only_backend.cc
new file mode 100644
index 0000000..d908de3
--- /dev/null
+++ b/quiche/quic/tools/web_transport_only_backend.cc
@@ -0,0 +1,68 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/tools/web_transport_only_backend.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "quiche/quic/tools/quic_backend_response.h"
+#include "quiche/common/http/http_header_block.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace quic {
+
+void WebTransportOnlyBackend::FetchResponseFromBackend(
+    const quiche::HttpHeaderBlock&, const std::string&,
+    RequestHandler* request_handler) {
+  static QuicBackendResponse* response = []() {
+    quiche::HttpHeaderBlock headers;
+    headers[":status"] = "405";  // 405 Method Not Allowed
+    headers["content-type"] = "text/plain";
+    auto response = std::make_unique<QuicBackendResponse>();
+    response->set_headers(std::move(headers));
+    response->set_body("This endpoint only accepts WebTransport requests");
+    return response.release();
+  }();
+  request_handler->OnResponseBackendComplete(response);
+}
+
+WebTransportOnlyBackend::WebTransportResponse
+WebTransportOnlyBackend::ProcessWebTransportRequest(
+    const quiche::HttpHeaderBlock& request_headers,
+    webtransport::Session* session) {
+  WebTransportResponse response;
+
+  auto path = request_headers.find(":path");
+  if (path == request_headers.end()) {
+    response.response_headers[":status"] = "400";
+    return response;
+  }
+
+  absl::StatusOr<std::unique_ptr<webtransport::SessionVisitor>> processed =
+      callback_(path->second, session);
+  switch (processed.status().code()) {
+    case absl::StatusCode::kOk:
+      response.response_headers[":status"] = "200";
+      response.visitor = *std::move(processed);
+      return response;
+    case absl::StatusCode::kNotFound:
+      response.response_headers[":status"] = "404";
+      return response;
+    case absl::StatusCode::kInvalidArgument:
+      response.response_headers[":status"] = "400";
+      return response;
+    case absl::StatusCode::kResourceExhausted:
+      response.response_headers[":status"] = "429";
+      return response;
+    default:
+      response.response_headers[":status"] = "500";
+      return response;
+  }
+}
+
+}  // namespace quic
diff --git a/quiche/quic/tools/web_transport_only_backend.h b/quiche/quic/tools/web_transport_only_backend.h
new file mode 100644
index 0000000..cd37672
--- /dev/null
+++ b/quiche/quic/tools/web_transport_only_backend.h
@@ -0,0 +1,51 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_TOOLS_WEB_TRANSPORT_ONLY_BACKEND_H_
+#define QUICHE_QUIC_TOOLS_WEB_TRANSPORT_ONLY_BACKEND_H_
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/web_transport_interface.h"
+#include "quiche/quic/tools/quic_simple_server_backend.h"
+#include "quiche/common/quiche_callbacks.h"
+#include "quiche/web_transport/web_transport.h"
+#include "quiche/spdy/core/http2_header_block.h"
+
+namespace quic {
+
+// A callback to create a WebTransport session visitor for a given path and the
+// session object. The path includes both the path and the query.
+using WebTransportRequestCallback = quiche::MultiUseCallback<
+    absl::StatusOr<std::unique_ptr<webtransport::SessionVisitor>>(
+        absl::string_view path, WebTransportSession* session)>;
+
+class WebTransportOnlyBackend : public QuicSimpleServerBackend {
+ public:
+  explicit WebTransportOnlyBackend(WebTransportRequestCallback callback)
+      : callback_(std::move(callback)) {}
+
+  // QuicSimpleServerBackend implementation.
+  bool InitializeBackend(const std::string&) override { return true; }
+  bool IsBackendInitialized() const override { return true; }
+  void FetchResponseFromBackend(const spdy::Http2HeaderBlock&,
+                                const std::string&,
+                                RequestHandler* request_handler) override;
+  void CloseBackendResponseStream(RequestHandler*) override {}
+  bool SupportsWebTransport() override { return true; }
+  WebTransportResponse ProcessWebTransportRequest(
+      const spdy::Http2HeaderBlock& request_headers,
+      WebTransportSession* session) override;
+
+ private:
+  WebTransportRequestCallback callback_;
+};
+
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_TOOLS_WEB_TRANSPORT_ONLY_BACKEND_H_
diff --git a/quiche/quic/tools/web_transport_test_server.cc b/quiche/quic/tools/web_transport_test_server.cc
index 3ba29a3..a7c1794 100644
--- a/quiche/quic/tools/web_transport_test_server.cc
+++ b/quiche/quic/tools/web_transport_test_server.cc
@@ -8,12 +8,11 @@
 #include "absl/status/statusor.h"
 #include "absl/strings/numbers.h"
 #include "absl/strings/string_view.h"
-#include "quiche/quic/core/quic_error_codes.h"
 #include "quiche/quic/core/web_transport_interface.h"
 #include "quiche/quic/platform/api/quic_socket_address.h"
 #include "quiche/quic/tools/devious_baton.h"
 #include "quiche/quic/tools/quic_server.h"
-#include "quiche/quic/tools/quic_simple_server_backend.h"
+#include "quiche/quic/tools/web_transport_only_backend.h"
 #include "quiche/quic/tools/web_transport_test_visitors.h"
 #include "quiche/common/platform/api/quiche_command_line_flags.h"
 #include "quiche/common/platform/api/quiche_default_proof_providers.h"
@@ -30,7 +29,12 @@
 namespace {
 
 absl::StatusOr<std::unique_ptr<webtransport::SessionVisitor>> ProcessRequest(
-    const GURL& url, WebTransportSession* session) {
+    absl::string_view path, WebTransportSession* session) {
+  GURL url(absl::StrCat("https://localhost", path));
+  if (!url.is_valid()) {
+    return absl::InvalidArgumentError("Unable to parse the :path");
+  }
+
   if (url.path() == "/webtransport/echo") {
     return std::make_unique<EchoWebTransportSessionVisitor>(session);
   }
@@ -70,59 +74,13 @@
   return absl::NotFoundError("Path not found");
 }
 
-class WebTransportTestBackend : public QuicSimpleServerBackend {
- public:
-  bool InitializeBackend(const std::string&) override { return true; }
-  bool IsBackendInitialized() const override { return true; }
-  void FetchResponseFromBackend(const spdy::Http2HeaderBlock&,
-                                const std::string&,
-                                RequestHandler* request_handler) override {
-    request_handler->TerminateStreamWithError(
-        QuicResetStreamError::FromInternal(QUIC_STREAM_INTERNAL_ERROR));
-  }
-  void CloseBackendResponseStream(RequestHandler*) override {}
-  bool SupportsWebTransport() override { return true; }
-  WebTransportResponse ProcessWebTransportRequest(
-      const spdy::Http2HeaderBlock& request_headers,
-      WebTransportSession* session) override {
-    WebTransportResponse response;
-    response.response_headers[":status"] = "400";
-
-    auto path = request_headers.find(":path");
-    if (path == request_headers.end()) {
-      return response;
-    }
-    GURL url(absl::StrCat("https://localhost", path->second));
-    if (!url.is_valid()) {
-      return response;
-    }
-    absl::StatusOr<std::unique_ptr<webtransport::SessionVisitor>> processed =
-        ProcessRequest(url, session);
-    switch (processed.status().code()) {
-      case absl::StatusCode::kOk:
-        response.response_headers[":status"] = "200";
-        response.visitor = *std::move(processed);
-        return response;
-      case absl::StatusCode::kNotFound:
-        response.response_headers[":status"] = "404";
-        return response;
-      case absl::StatusCode::kInvalidArgument:
-        response.response_headers[":status"] = "400";
-        return response;
-      default:
-        response.response_headers[":status"] = "500";
-        return response;
-    }
-  }
-};
-
 int Main(int argc, char** argv) {
   quiche::QuicheSystemEventLoop event_loop("web_transport_test_server");
   const char* usage = "Usage: web_transport_test_server [options]";
   std::vector<std::string> non_option_args =
       quiche::QuicheParseCommandLineFlags(usage, argc, argv);
 
-  WebTransportTestBackend backend;
+  WebTransportOnlyBackend backend(ProcessRequest);
   QuicServer server(quiche::CreateDefaultProofSource(), &backend);
   quic::QuicSocketAddress addr(quic::QuicIpAddress::Any6(),
                                quiche::GetQuicheCommandLineFlag(FLAGS_port));