Implement basic WebTransport over HTTP/2 functionality.

This includes datagrams, closing session and draining session, but does not include streams yet.

PiperOrigin-RevId: 582768912
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 0ba5220..e7806bf 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -392,6 +392,7 @@
     "spdy/core/spdy_protocol.h",
     "spdy/core/zero_copy_output_buffer.h",
     "web_transport/complete_buffer_visitor.h",
+    "web_transport/encapsulated/encapsulated_web_transport.h",
     "web_transport/web_transport.h",
 ]
 quiche_core_srcs = [
@@ -680,6 +681,7 @@
     "spdy/core/spdy_prefixed_buffer_reader.cc",
     "spdy/core/spdy_protocol.cc",
     "web_transport/complete_buffer_visitor.cc",
+    "web_transport/encapsulated/encapsulated_web_transport.cc",
 ]
 quiche_tool_support_hdrs = [
     "common/platform/api/quiche_command_line_flags.h",
@@ -742,6 +744,7 @@
     "common/platform/api/quiche_test.h",
     "common/platform/api/quiche_test_loopback.h",
     "common/platform/api/quiche_test_output.h",
+    "common/test_tools/mock_streams.h",
     "common/test_tools/quiche_test_utils.h",
     "http2/adapter/mock_http2_visitor.h",
     "http2/adapter/recording_http2_visitor.h",
@@ -1072,6 +1075,7 @@
     "common/simple_buffer_allocator_test.cc",
     "common/structured_headers_generated_test.cc",
     "common/structured_headers_test.cc",
+    "common/test_tools/mock_streams_test.cc",
     "common/test_tools/quiche_test_utils_test.cc",
     "common/wire_serialization_test.cc",
     "http2/adapter/event_forwarder_test.cc",
@@ -1302,6 +1306,7 @@
     "spdy/core/spdy_pinnable_buffer_piece_test.cc",
     "spdy/core/spdy_prefixed_buffer_reader_test.cc",
     "spdy/core/spdy_protocol_test.cc",
+    "web_transport/encapsulated/encapsulated_web_transport_test.cc",
 ]
 io_tests_hdrs = [
 ]
diff --git a/build/source_list.gni b/build/source_list.gni
index 69e5f27..da2d729 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -392,6 +392,7 @@
     "src/quiche/spdy/core/spdy_protocol.h",
     "src/quiche/spdy/core/zero_copy_output_buffer.h",
     "src/quiche/web_transport/complete_buffer_visitor.h",
+    "src/quiche/web_transport/encapsulated/encapsulated_web_transport.h",
     "src/quiche/web_transport/web_transport.h",
 ]
 quiche_core_srcs = [
@@ -680,6 +681,7 @@
     "src/quiche/spdy/core/spdy_prefixed_buffer_reader.cc",
     "src/quiche/spdy/core/spdy_protocol.cc",
     "src/quiche/web_transport/complete_buffer_visitor.cc",
+    "src/quiche/web_transport/encapsulated/encapsulated_web_transport.cc",
 ]
 quiche_tool_support_hdrs = [
     "src/quiche/common/platform/api/quiche_command_line_flags.h",
@@ -742,6 +744,7 @@
     "src/quiche/common/platform/api/quiche_test.h",
     "src/quiche/common/platform/api/quiche_test_loopback.h",
     "src/quiche/common/platform/api/quiche_test_output.h",
+    "src/quiche/common/test_tools/mock_streams.h",
     "src/quiche/common/test_tools/quiche_test_utils.h",
     "src/quiche/http2/adapter/mock_http2_visitor.h",
     "src/quiche/http2/adapter/recording_http2_visitor.h",
@@ -1073,6 +1076,7 @@
     "src/quiche/common/simple_buffer_allocator_test.cc",
     "src/quiche/common/structured_headers_generated_test.cc",
     "src/quiche/common/structured_headers_test.cc",
+    "src/quiche/common/test_tools/mock_streams_test.cc",
     "src/quiche/common/test_tools/quiche_test_utils_test.cc",
     "src/quiche/common/wire_serialization_test.cc",
     "src/quiche/http2/adapter/event_forwarder_test.cc",
@@ -1303,6 +1307,7 @@
     "src/quiche/spdy/core/spdy_pinnable_buffer_piece_test.cc",
     "src/quiche/spdy/core/spdy_prefixed_buffer_reader_test.cc",
     "src/quiche/spdy/core/spdy_protocol_test.cc",
+    "src/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc",
 ]
 io_tests_hdrs = [
 
diff --git a/build/source_list.json b/build/source_list.json
index 165f594..42f3fbb 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -391,6 +391,7 @@
     "quiche/spdy/core/spdy_protocol.h",
     "quiche/spdy/core/zero_copy_output_buffer.h",
     "quiche/web_transport/complete_buffer_visitor.h",
+    "quiche/web_transport/encapsulated/encapsulated_web_transport.h",
     "quiche/web_transport/web_transport.h"
   ],
   "quiche_core_srcs": [
@@ -678,7 +679,8 @@
     "quiche/spdy/core/spdy_pinnable_buffer_piece.cc",
     "quiche/spdy/core/spdy_prefixed_buffer_reader.cc",
     "quiche/spdy/core/spdy_protocol.cc",
-    "quiche/web_transport/complete_buffer_visitor.cc"
+    "quiche/web_transport/complete_buffer_visitor.cc",
+    "quiche/web_transport/encapsulated/encapsulated_web_transport.cc"
   ],
   "quiche_tool_support_hdrs": [
     "quiche/common/platform/api/quiche_command_line_flags.h",
@@ -741,6 +743,7 @@
     "quiche/common/platform/api/quiche_test.h",
     "quiche/common/platform/api/quiche_test_loopback.h",
     "quiche/common/platform/api/quiche_test_output.h",
+    "quiche/common/test_tools/mock_streams.h",
     "quiche/common/test_tools/quiche_test_utils.h",
     "quiche/http2/adapter/mock_http2_visitor.h",
     "quiche/http2/adapter/recording_http2_visitor.h",
@@ -1072,6 +1075,7 @@
     "quiche/common/simple_buffer_allocator_test.cc",
     "quiche/common/structured_headers_generated_test.cc",
     "quiche/common/structured_headers_test.cc",
+    "quiche/common/test_tools/mock_streams_test.cc",
     "quiche/common/test_tools/quiche_test_utils_test.cc",
     "quiche/common/wire_serialization_test.cc",
     "quiche/http2/adapter/event_forwarder_test.cc",
@@ -1301,7 +1305,8 @@
     "quiche/spdy/core/spdy_intrusive_list_test.cc",
     "quiche/spdy/core/spdy_pinnable_buffer_piece_test.cc",
     "quiche/spdy/core/spdy_prefixed_buffer_reader_test.cc",
-    "quiche/spdy/core/spdy_protocol_test.cc"
+    "quiche/spdy/core/spdy_protocol_test.cc",
+    "quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc"
   ],
   "io_tests_hdrs": [
 
diff --git a/quiche/common/capsule.cc b/quiche/common/capsule.cc
index 66d4a66..421f11d 100644
--- a/quiche/common/capsule.cc
+++ b/quiche/common/capsule.cc
@@ -4,8 +4,13 @@
 
 #include "quiche/common/capsule.h"
 
+#include <cstddef>
+#include <cstdint>
 #include <limits>
+#include <ostream>
+#include <string>
 #include <type_traits>
+#include <utility>
 
 #include "absl/status/status.h"
 #include "absl/status/statusor.h"
@@ -350,6 +355,17 @@
   }
 }
 
+QuicheBuffer SerializeDatagramCapsuleHeader(uint64_t datagram_size,
+                                            QuicheBufferAllocator* allocator) {
+  absl::StatusOr<QuicheBuffer> buffer =
+      SerializeIntoBuffer(allocator, WireVarInt62(CapsuleType::DATAGRAM),
+                          WireVarInt62(datagram_size));
+  if (!buffer.ok()) {
+    return QuicheBuffer();
+  }
+  return *std::move(buffer);
+}
+
 QuicheBuffer SerializeCapsule(const Capsule& capsule,
                               quiche::QuicheBufferAllocator* allocator) {
   absl::StatusOr<QuicheBuffer> serialized =
diff --git a/quiche/common/capsule.h b/quiche/common/capsule.h
index b5e1831..3cadc4d 100644
--- a/quiche/common/capsule.h
+++ b/quiche/common/capsule.h
@@ -391,6 +391,10 @@
 QUICHE_EXPORT quiche::QuicheBuffer SerializeCapsule(
     const Capsule& capsule, quiche::QuicheBufferAllocator* allocator);
 
+// Serializes the header for a datagram of size |datagram_size|.
+QUICHE_EXPORT QuicheBuffer SerializeDatagramCapsuleHeader(
+    uint64_t datagram_size, QuicheBufferAllocator* allocator);
+
 }  // namespace quiche
 
 #endif  // QUICHE_COMMON_CAPSULE_H_
diff --git a/quiche/common/capsule_test.cc b/quiche/common/capsule_test.cc
index ae55aaf..0aed714 100644
--- a/quiche/common/capsule_test.cc
+++ b/quiche/common/capsule_test.cc
@@ -10,8 +10,10 @@
 #include <vector>
 
 #include "absl/strings/escaping.h"
+#include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_ip_address.h"
 #include "quiche/common/simple_buffer_allocator.h"
 #include "quiche/common/test_tools/quiche_test_utils.h"
@@ -86,6 +88,16 @@
   TestSerialization(expected_capsule, capsule_fragment);
 }
 
+TEST_F(CapsuleTest, DatagramCapsuleViaHeader) {
+  std::string datagram_payload = absl::HexStringToBytes("a1a2a3a4a5a6a7a8");
+  quiche::QuicheBuffer expected_capsule = SerializeCapsule(
+      Capsule::Datagram(datagram_payload), SimpleBufferAllocator::Get());
+  quiche::QuicheBuffer actual_header = SerializeDatagramCapsuleHeader(
+      datagram_payload.size(), SimpleBufferAllocator::Get());
+  EXPECT_EQ(expected_capsule.AsStringView(),
+            absl::StrCat(actual_header.AsStringView(), datagram_payload));
+}
+
 TEST_F(CapsuleTest, LegacyDatagramCapsule) {
   std::string capsule_fragment = absl::HexStringToBytes(
       "80ff37a0"          // LEGACY_DATAGRAM capsule type
diff --git a/quiche/common/test_tools/mock_streams.h b/quiche/common/test_tools/mock_streams.h
new file mode 100644
index 0000000..577a324
--- /dev/null
+++ b/quiche/common/test_tools/mock_streams.h
@@ -0,0 +1,101 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
+#define QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
+
+#include <algorithm>
+#include <cstddef>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_stream.h"
+
+namespace quiche::test {
+
+// Mockable stream that stores all of the data into an std::string by default.
+class MockWriteStream : public quiche::WriteStream {
+ public:
+  MockWriteStream() {
+    ON_CALL(*this, CanWrite()).WillByDefault(testing::Return(true));
+    ON_CALL(*this, Writev(testing::_, testing::_))
+        .WillByDefault([&](absl::Span<const absl::string_view> data,
+                           const StreamWriteOptions& options) {
+          return AppendToData(data, options);
+        });
+  }
+
+  MOCK_METHOD(absl::Status, Writev,
+              (absl::Span<const absl::string_view> data,
+               const StreamWriteOptions& options),
+              (override));
+  MOCK_METHOD(bool, CanWrite, (), (const, override));
+
+  absl::Status AppendToData(absl::Span<const absl::string_view> data,
+                            const StreamWriteOptions& options) {
+    for (absl::string_view fragment : data) {
+      data_.append(fragment.data(), fragment.size());
+    }
+    ProcessOptions(options);
+    return absl::OkStatus();
+  }
+  void ProcessOptions(const StreamWriteOptions& options) {
+    fin_written_ |= options.send_fin();
+  }
+
+  std::string& data() { return data_; }
+  bool fin_written() { return fin_written_; }
+
+ private:
+  std::string data_;
+  bool fin_written_ = false;
+};
+
+// Reads stream data from an std::string buffer.
+class ReadStreamFromString : public ReadStream {
+ public:
+  explicit ReadStreamFromString(std::string* data) : data_(data) {}
+
+  ReadResult Read(absl::Span<char> buffer) override {
+    size_t data_to_copy = std::min(buffer.size(), data_->size());
+    std::copy(data_->begin(), data_->begin() + data_to_copy, buffer.begin());
+    *data_ = data_->substr(data_to_copy);
+    return ReadResult{data_to_copy, data_->empty() && fin_};
+  }
+  ReadResult Read(std::string* output) override {
+    size_t bytes = data_->size();
+    output->append(std::move(*data_));
+    data_->clear();
+    return ReadResult{bytes, fin_};
+  }
+
+  size_t ReadableBytes() const override { return data_->size(); }
+
+  virtual PeekResult PeekNextReadableRegion() const override {
+    PeekResult result;
+    result.peeked_data = *data_;
+    result.fin_next = data_->empty() && fin_;
+    result.all_data_received = fin_;
+    return result;
+  }
+
+  bool SkipBytes(size_t bytes) override {
+    *data_ = data_->substr(bytes);
+    return data_->empty() && fin_;
+  }
+
+  void set_fin() { fin_ = true; }
+
+ private:
+  std::string* data_;
+  bool fin_ = false;
+};
+
+}  // namespace quiche::test
+
+#endif  // QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
diff --git a/quiche/common/test_tools/mock_streams_test.cc b/quiche/common/test_tools/mock_streams_test.cc
new file mode 100644
index 0000000..2e8a2e4
--- /dev/null
+++ b/quiche/common/test_tools/mock_streams_test.cc
@@ -0,0 +1,63 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/common/test_tools/mock_streams.h"
+
+#include <array>
+#include <string>
+
+#include "absl/types/span.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/common/test_tools/quiche_test_utils.h"
+
+namespace quiche::test {
+namespace {
+
+using ::testing::ElementsAre;
+using ::testing::IsEmpty;
+
+TEST(MockWriteStreamTest, DefaultWrite) {
+  MockWriteStream stream;
+  QUICHE_EXPECT_OK(quiche::WriteIntoStream(stream, "test"));
+  EXPECT_EQ(stream.data(), "test");
+  EXPECT_FALSE(stream.fin_written());
+}
+
+TEST(ReadStreamFromStringTest, ReadIntoSpan) {
+  std::string source = "abcdef";
+  std::array<char, 3> buffer;
+  ReadStreamFromString stream(&source);
+  EXPECT_EQ(stream.ReadableBytes(), 6);
+
+  stream.Read(absl::MakeSpan(buffer));
+  EXPECT_THAT(buffer, ElementsAre('a', 'b', 'c'));
+  EXPECT_EQ(stream.ReadableBytes(), 3);
+
+  stream.Read(absl::MakeSpan(buffer));
+  EXPECT_THAT(buffer, ElementsAre('d', 'e', 'f'));
+  EXPECT_EQ(stream.ReadableBytes(), 0);
+  EXPECT_THAT(source, IsEmpty());
+}
+
+TEST(ReadStreamFromStringTest, ReadIntoString) {
+  std::string source = "abcdef";
+  std::string destination;
+  ReadStreamFromString stream(&source);
+  stream.Read(&destination);
+  EXPECT_EQ(destination, "abcdef");
+  EXPECT_THAT(source, IsEmpty());
+}
+
+TEST(ReadStreamFromStringTest, PeekAndSkip) {
+  std::string source = "abcdef";
+  ReadStreamFromString stream(&source);
+  EXPECT_EQ(stream.PeekNextReadableRegion().peeked_data, "abcdef");
+  stream.SkipBytes(2);
+  EXPECT_EQ(stream.PeekNextReadableRegion().peeked_data, "cdef");
+  EXPECT_EQ(source, "cdef");
+}
+
+}  // namespace
+}  // namespace quiche::test
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
new file mode 100644
index 0000000..0aa3763
--- /dev/null
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
@@ -0,0 +1,355 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
+
+#include <array>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+#include "absl/types/span.h"
+#include "quiche/common/capsule.h"
+#include "quiche/common/http/http_header_block.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_callbacks.h"
+#include "quiche/common/quiche_status_utils.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+
+namespace {
+
+using ::quiche::Capsule;
+using ::quiche::CapsuleType;
+using ::quiche::CloseWebTransportSessionCapsule;
+
+// This is arbitrary, since we don't have any real MTU restriction when running
+// over TCP.
+constexpr uint64_t kEncapsulatedMaxDatagramSize = 9000;
+
+}  // namespace
+
+EncapsulatedSession::EncapsulatedSession(
+    Perspective perspective, FatalErrorCallback fatal_error_callback)
+    : perspective_(perspective),
+      fatal_error_callback_(std::move(fatal_error_callback)),
+      capsule_parser_(this) {}
+
+void EncapsulatedSession::InitializeClient(
+    std::unique_ptr<SessionVisitor> visitor,
+    quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
+    quiche::ReadStream* reader) {
+  if (state_ != kUninitialized) {
+    OnFatalError("Called InitializeClient() in an invalid state");
+    return;
+  }
+  if (perspective_ != Perspective::kClient) {
+    OnFatalError("Called InitializeClient() on a server session");
+    return;
+  }
+
+  visitor_ = std::move(visitor);
+  writer_ = writer;
+  reader_ = reader;
+  state_ = kWaitingForHeaders;
+}
+
+void EncapsulatedSession::InitializeServer(
+    std::unique_ptr<SessionVisitor> visitor,
+    const quiche::HttpHeaderBlock& /*incoming_headers*/,
+    quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
+    quiche::ReadStream* reader) {
+  if (state_ != kUninitialized) {
+    OnFatalError("Called InitializeServer() in an invalid state");
+    return;
+  }
+  if (perspective_ != Perspective::kServer) {
+    OnFatalError("Called InitializeServer() on a client session");
+    return;
+  }
+
+  visitor_ = std::move(visitor);
+  writer_ = writer;
+  reader_ = reader;
+  OpenSession();
+}
+void EncapsulatedSession::ProcessIncomingServerHeaders(
+    const quiche::HttpHeaderBlock& /*headers*/) {
+  if (state_ != kWaitingForHeaders) {
+    OnFatalError("Called ProcessIncomingServerHeaders() in an invalid state");
+    return;
+  }
+  OpenSession();
+}
+
+void EncapsulatedSession::CloseSession(SessionErrorCode error_code,
+                                       absl::string_view error_message) {
+  switch (state_) {
+    case kUninitialized:
+    case kWaitingForHeaders:
+      OnFatalError(absl::StrCat(
+          "Attempted to close a session before it opened with error 0x",
+          absl::Hex(error_code), ": ", error_message));
+      return;
+    case kSessionClosing:
+    case kSessionClosed:
+      OnFatalError(absl::StrCat(
+          "Attempted to close a session that is already closed with error 0x",
+          absl::Hex(error_code), ": ", error_message));
+      return;
+    case kSessionOpen:
+      break;
+  }
+  state_ = kSessionClosing;
+  buffered_session_close_ =
+      BufferedClose{error_code, std::string(error_message)};
+  OnCanWrite();
+}
+
+Stream* EncapsulatedSession::AcceptIncomingBidirectionalStream() {
+  return nullptr;
+}
+Stream* EncapsulatedSession::AcceptIncomingUnidirectionalStream() {
+  return nullptr;
+}
+bool EncapsulatedSession::CanOpenNextOutgoingBidirectionalStream() {
+  return false;
+}
+bool EncapsulatedSession::CanOpenNextOutgoingUnidirectionalStream() {
+  return false;
+}
+Stream* EncapsulatedSession::OpenOutgoingBidirectionalStream() {
+  return nullptr;
+}
+Stream* EncapsulatedSession::OpenOutgoingUnidirectionalStream() {
+  return nullptr;
+}
+
+Stream* EncapsulatedSession::GetStreamById(StreamId /*id*/) { return nullptr; }
+DatagramStats EncapsulatedSession::GetDatagramStats() {
+  DatagramStats stats;
+  stats.expired_outgoing = 0;
+  stats.lost_outgoing = 0;
+  return stats;
+}
+
+SessionStats EncapsulatedSession::GetSessionStats() {
+  // We could potentially get stats via tcp_info and similar mechanisms, but
+  // that would require us knowing what the underlying socket is.
+  return SessionStats();
+}
+
+void EncapsulatedSession::NotifySessionDraining() {
+  control_capsule_queue_.push_back(quiche::SerializeCapsule(
+      quiche::Capsule(quiche::DrainWebTransportSessionCapsule()), allocator_));
+  OnCanWrite();
+}
+void EncapsulatedSession::SetOnDraining(
+    quiche::SingleUseCallback<void()> callback) {
+  draining_callback_ = std::move(callback);
+}
+
+DatagramStatus EncapsulatedSession::SendOrQueueDatagram(
+    absl::string_view datagram) {
+  if (datagram.size() > GetMaxDatagramSize()) {
+    return DatagramStatus{
+        DatagramStatusCode::kTooBig,
+        absl::StrCat("Datagram is ", datagram.size(),
+                     " bytes long, while the specified maximum size is ",
+                     GetMaxDatagramSize())};
+  }
+
+  bool write_blocked;
+  switch (state_) {
+    case kUninitialized:
+      write_blocked = true;
+      break;
+    // We can send datagrams before receiving any headers from the peer, since
+    // datagrams are not subject to queueing.
+    case kWaitingForHeaders:
+    case kSessionOpen:
+      write_blocked = !writer_->CanWrite();
+      break;
+    case kSessionClosing:
+    case kSessionClosed:
+      return DatagramStatus{DatagramStatusCode::kInternalError,
+                            "Writing into an already closed session"};
+  }
+
+  if (write_blocked) {
+    // TODO: this *may* be useful to split into a separate queue.
+    control_capsule_queue_.push_back(
+        quiche::SerializeCapsule(Capsule::Datagram(datagram), allocator_));
+    return DatagramStatus{DatagramStatusCode::kSuccess, ""};
+  }
+
+  // We could always write via OnCanWrite() above, but the optimistic path below
+  // allows us to avoid a copy.
+  quiche::QuicheBuffer buffer =
+      quiche::SerializeDatagramCapsuleHeader(datagram.size(), allocator_);
+  std::array spans = {buffer.AsStringView(), datagram};
+  absl::Status write_status =
+      writer_->Writev(absl::MakeConstSpan(spans), quiche::StreamWriteOptions());
+  if (!write_status.ok()) {
+    OnWriteError(write_status);
+    return DatagramStatus{
+        DatagramStatusCode::kInternalError,
+        absl::StrCat("Write error for datagram: ", write_status.ToString())};
+  }
+  return DatagramStatus{DatagramStatusCode::kSuccess, ""};
+}
+
+uint64_t EncapsulatedSession::GetMaxDatagramSize() const {
+  return kEncapsulatedMaxDatagramSize;
+}
+
+void EncapsulatedSession::SetDatagramMaxTimeInQueue(
+    absl::Duration /*max_time_in_queue*/) {
+  // TODO(b/264263113): implement this (requires having a mockable clock).
+}
+
+void EncapsulatedSession::OnCanWrite() {
+  if (state_ == kUninitialized || !writer_) {
+    OnFatalError("Trying to write before the session is initialized");
+    return;
+  }
+  if (state_ == kSessionClosed) {
+    OnFatalError("Trying to write before the session is closed");
+    return;
+  }
+
+  if (state_ == kSessionClosing) {
+    if (writer_->CanWrite()) {
+      CloseWebTransportSessionCapsule capsule{
+          buffered_session_close_.error_code,
+          buffered_session_close_.error_message};
+      quiche::QuicheBuffer buffer =
+          quiche::SerializeCapsule(Capsule(std::move(capsule)), allocator_);
+      absl::Status write_status = SendFin(buffer.AsStringView());
+      if (!write_status.ok()) {
+        OnWriteError(quiche::AppendToStatus(write_status,
+                                            " while writing WT_CLOSE_SESSION"));
+        return;
+      }
+      OnSessionClosed(buffered_session_close_.error_code,
+                      buffered_session_close_.error_message);
+    }
+    return;
+  }
+
+  while (writer_->CanWrite() && !control_capsule_queue_.empty()) {
+    absl::Status write_status = quiche::WriteIntoStream(
+        *writer_, control_capsule_queue_.front().AsStringView());
+    if (!write_status.ok()) {
+      OnWriteError(write_status);
+      return;
+    }
+    control_capsule_queue_.pop_front();
+  }
+
+  // TODO(b/264263113): send stream data.
+}
+
+void EncapsulatedSession::OnCanRead() {
+  if (state_ == kSessionClosed || state_ == kSessionClosing) {
+    return;
+  }
+  bool has_fin = quiche::ProcessAllReadableRegions(
+      *reader_, [&](absl::string_view fragment) {
+        capsule_parser_.IngestCapsuleFragment(fragment);
+      });
+  if (has_fin) {
+    capsule_parser_.ErrorIfThereIsRemainingBufferedData();
+    OnSessionClosed(0, "");
+  }
+}
+
+bool EncapsulatedSession::OnCapsule(const quiche::Capsule& capsule) {
+  switch (capsule.capsule_type()) {
+    case CapsuleType::DATAGRAM:
+      visitor_->OnDatagramReceived(
+          capsule.datagram_capsule().http_datagram_payload);
+      break;
+    case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
+      if (draining_callback_) {
+        std::move(draining_callback_)();
+      }
+      break;
+    case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
+      OnSessionClosed(
+          capsule.close_web_transport_session_capsule().error_code,
+          std::string(
+              capsule.close_web_transport_session_capsule().error_message));
+      break;
+    default:
+      break;
+  }
+  return true;
+}
+
+void EncapsulatedSession::OnCapsuleParseFailure(
+    absl::string_view error_message) {
+  OnFatalError(absl::StrCat("Stream parse error: ", error_message));
+}
+
+void EncapsulatedSession::OpenSession() {
+  state_ = kSessionOpen;
+  visitor_->OnSessionReady();
+  OnCanWrite();
+  OnCanRead();
+}
+
+absl::Status EncapsulatedSession::SendFin(absl::string_view data) {
+  QUICHE_DCHECK(!fin_sent_);
+  fin_sent_ = true;
+  quiche::StreamWriteOptions options;
+  options.set_send_fin(true);
+  return quiche::WriteIntoStream(*writer_, data, options);
+}
+
+void EncapsulatedSession::OnSessionClosed(SessionErrorCode error_code,
+                                          const std::string& error_message) {
+  if (!fin_sent_) {
+    absl::Status status = SendFin("");
+    if (!status.ok()) {
+      OnWriteError(status);
+      return;
+    }
+  }
+
+  if (session_close_notified_) {
+    QUICHE_DCHECK_EQ(state_, kSessionClosed);
+    return;
+  }
+  state_ = kSessionClosed;
+  session_close_notified_ = true;
+
+  if (visitor_ != nullptr) {
+    visitor_->OnSessionClosed(error_code, error_message);
+  }
+}
+
+void EncapsulatedSession::OnFatalError(absl::string_view error_message) {
+  QUICHE_DLOG(ERROR) << "Fatal error in encapsulated WebTransport: "
+                     << error_message;
+  state_ = kSessionClosed;
+  if (fatal_error_callback_) {
+    std::move(fatal_error_callback_)(error_message);
+  }
+}
+
+void EncapsulatedSession::OnWriteError(absl::Status error) {
+  OnFatalError(absl::StrCat(
+      error, " while trying to write encapsulated WebTransport data"));
+}
+
+}  // namespace webtransport
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.h b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
new file mode 100644
index 0000000..85c14c9
--- /dev/null
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
@@ -0,0 +1,133 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
+#define QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+#include "quiche/common/capsule.h"
+#include "quiche/common/http/http_header_block.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_callbacks.h"
+#include "quiche/common/quiche_circular_deque.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/common/simple_buffer_allocator.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+
+using FatalErrorCallback = quiche::SingleUseCallback<void(absl::string_view)>;
+
+// Implementation of the WebTransport over HTTP/2 protocol; works over any
+// arbitrary bidirectional bytestream that can be prefixed with HTTP headers.
+// Specification: https://datatracker.ietf.org/doc/draft-ietf-webtrans-http2/
+class QUICHE_EXPORT EncapsulatedSession
+    : public webtransport::Session,
+      public quiche::WriteStreamVisitor,
+      public quiche::ReadStreamVisitor,
+      public quiche::CapsuleParser::Visitor {
+ public:
+  // The state machine of the transport.
+  enum State {
+    // The transport object has been created, but
+    // InitializeClient/InitializeServer has not been called yet.
+    kUninitialized,
+    // The client has sent its own headers, but haven't received a response yet.
+    kWaitingForHeaders,
+    // Both the client and the server headers have been processed.
+    kSessionOpen,
+    // The session close has been requested, but the CLOSE capsule hasn't been
+    // sent yet.
+    kSessionClosing,
+    // The session has been closed; no further data will be exchanged.
+    kSessionClosed,
+  };
+
+  // The `fatal_error_callback` implies that any state related to the session
+  // should be torn down after it's been called.
+  EncapsulatedSession(Perspective perspective,
+                      FatalErrorCallback fatal_error_callback);
+
+  // WebTransport uses HTTP headers in a similar way to how QUIC uses SETTINGS;
+  // thus, the headers are necessary to initialize the session.
+  void InitializeClient(std::unique_ptr<SessionVisitor> visitor,
+                        quiche::HttpHeaderBlock& outgoing_headers,
+                        quiche::WriteStream* writer,
+                        quiche::ReadStream* reader);
+  void InitializeServer(std::unique_ptr<SessionVisitor> visitor,
+                        const quiche::HttpHeaderBlock& incoming_headers,
+                        quiche::HttpHeaderBlock& outgoing_headers,
+                        quiche::WriteStream* writer,
+                        quiche::ReadStream* reader);
+  void ProcessIncomingServerHeaders(const quiche::HttpHeaderBlock& headers);
+
+  // webtransport::Session implementation.
+  void CloseSession(SessionErrorCode error_code,
+                    absl::string_view error_message) override;
+  Stream* AcceptIncomingBidirectionalStream() override;
+  Stream* AcceptIncomingUnidirectionalStream() override;
+  bool CanOpenNextOutgoingBidirectionalStream() override;
+  bool CanOpenNextOutgoingUnidirectionalStream() override;
+  Stream* OpenOutgoingBidirectionalStream() override;
+  Stream* OpenOutgoingUnidirectionalStream() override;
+  DatagramStatus SendOrQueueDatagram(absl::string_view datagram) override;
+  uint64_t GetMaxDatagramSize() const override;
+  void SetDatagramMaxTimeInQueue(absl::Duration max_time_in_queue) override;
+  Stream* GetStreamById(StreamId id) override;
+  DatagramStats GetDatagramStats() override;
+  SessionStats GetSessionStats() override;
+  void NotifySessionDraining() override;
+  void SetOnDraining(quiche::SingleUseCallback<void()> callback) override;
+
+  // quiche::WriteStreamVisitor implementation.
+  void OnCanWrite() override;
+  // quiche::ReadStreamVisitor implementation.
+  void OnCanRead() override;
+  // quiche::CapsuleParser::Visitor implementation.
+  bool OnCapsule(const quiche::Capsule& capsule) override;
+  void OnCapsuleParseFailure(absl::string_view error_message) override;
+
+  State state() const { return state_; }
+
+ private:
+  struct BufferedClose {
+    SessionErrorCode error_code = 0;
+    std::string error_message;
+  };
+
+  Perspective perspective_;
+  State state_ = kUninitialized;
+  std::unique_ptr<SessionVisitor> visitor_ = nullptr;
+  FatalErrorCallback fatal_error_callback_;
+  quiche::SingleUseCallback<void()> draining_callback_;
+  quiche::WriteStream* writer_ = nullptr;  // Not owned.
+  quiche::ReadStream* reader_ = nullptr;   // Not owned.
+  quiche::QuicheBufferAllocator* allocator_ =
+      quiche::SimpleBufferAllocator::Get();
+  quiche::CapsuleParser capsule_parser_;
+
+  bool session_close_notified_ = false;
+  bool fin_sent_ = false;
+
+  BufferedClose buffered_session_close_;
+  quiche::QuicheCircularDeque<quiche::QuicheBuffer> control_capsule_queue_;
+
+  void OpenSession();
+  absl::Status SendFin(absl::string_view data);
+  void OnSessionClosed(SessionErrorCode error_code,
+                       const std::string& error_message);
+  void OnFatalError(absl::string_view error_message);
+  void OnWriteError(absl::Status error);
+};
+
+}  // namespace webtransport
+
+#endif  // QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
new file mode 100644
index 0000000..15728e4
--- /dev/null
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
@@ -0,0 +1,318 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/common/capsule.h"
+#include "quiche/common/http/http_header_block.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/common/simple_buffer_allocator.h"
+#include "quiche/common/test_tools/mock_streams.h"
+#include "quiche/web_transport/test_tools/mock_web_transport.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport::test {
+namespace {
+
+using ::quiche::Capsule;
+using ::quiche::CapsuleType;
+using ::testing::_;
+using ::testing::HasSubstr;
+using ::testing::IsEmpty;
+using ::testing::Return;
+using ::testing::StrEq;
+
+class EncapsulatedWebTransportTest : public quiche::test::QuicheTest,
+                                     public quiche::CapsuleParser::Visitor {
+ public:
+  EncapsulatedWebTransportTest() : parser_(this), reader_(&read_buffer_) {
+    ON_CALL(fatal_error_callback_, Call(_))
+        .WillByDefault([](absl::string_view error) {
+          ADD_FAILURE() << "Fatal session error: " << error;
+        });
+    ON_CALL(writer_, Writev(_, _))
+        .WillByDefault([&](absl::Span<const absl::string_view> data,
+                           const quiche::StreamWriteOptions& options) {
+          for (absl::string_view fragment : data) {
+            parser_.IngestCapsuleFragment(fragment);
+          }
+          writer_.ProcessOptions(options);
+          return absl::OkStatus();
+        });
+  }
+
+  std::unique_ptr<EncapsulatedSession> CreateTransport(
+      Perspective perspective) {
+    auto transport = std::make_unique<EncapsulatedSession>(
+        perspective, fatal_error_callback_.AsStdFunction());
+    session_ = transport.get();
+    return transport;
+  }
+
+  std::unique_ptr<SessionVisitor> CreateAndStoreVisitor() {
+    auto visitor = std::make_unique<testing::StrictMock<MockSessionVisitor>>();
+    visitor_ = visitor.get();
+    return visitor;
+  }
+
+  MOCK_METHOD(bool, OnCapsule, (const Capsule&), (override));
+
+  void OnCapsuleParseFailure(absl::string_view error_message) override {
+    ADD_FAILURE() << "Written an invalid capsule: " << error_message;
+  }
+
+  void ProcessIncomingCapsule(const Capsule& capsule) {
+    quiche::QuicheBuffer buffer =
+        quiche::SerializeCapsule(capsule, quiche::SimpleBufferAllocator::Get());
+    read_buffer_.append(buffer.data(), buffer.size());
+    session_->OnCanRead();
+  }
+
+  void DefaultHandshakeForClient(EncapsulatedSession& session) {
+    quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
+    session.InitializeClient(CreateAndStoreVisitor(), outgoing_headers,
+                             &writer_, &reader_);
+    EXPECT_CALL(*visitor_, OnSessionReady());
+    session.ProcessIncomingServerHeaders(incoming_headers);
+  }
+
+ protected:
+  quiche::CapsuleParser parser_;
+  quiche::test::MockWriteStream writer_;
+  std::string read_buffer_;
+  quiche::test::ReadStreamFromString reader_;
+  MockSessionVisitor* visitor_ = nullptr;
+  EncapsulatedSession* session_ = nullptr;
+  testing::MockFunction<void(absl::string_view)> fatal_error_callback_;
+};
+
+TEST_F(EncapsulatedWebTransportTest, SetupClientSession) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
+  EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized);
+  session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_,
+                            &reader_);
+  EXPECT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders);
+  EXPECT_CALL(*visitor_, OnSessionReady());
+  session->ProcessIncomingServerHeaders(incoming_headers);
+  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
+}
+
+TEST_F(EncapsulatedWebTransportTest, SetupServerSession) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kServer);
+  quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
+  EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized);
+  std::unique_ptr<SessionVisitor> visitor = CreateAndStoreVisitor();
+  EXPECT_CALL(*visitor_, OnSessionReady());
+  session->InitializeServer(std::move(visitor), outgoing_headers,
+                            incoming_headers, &writer_, &reader_);
+  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
+}
+
+TEST_F(EncapsulatedWebTransportTest, CloseSession) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+  EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+    EXPECT_EQ(capsule.capsule_type(), CapsuleType::CLOSE_WEBTRANSPORT_SESSION);
+    EXPECT_EQ(capsule.close_web_transport_session_capsule().error_code, 0x1234);
+    EXPECT_EQ(capsule.close_web_transport_session_capsule().error_message,
+              "test close");
+    return true;
+  });
+  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
+  EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close")));
+  session->CloseSession(0x1234, "test close");
+  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed);
+  EXPECT_TRUE(writer_.fin_written());
+
+  EXPECT_CALL(fatal_error_callback_, Call(_))
+      .WillOnce([](absl::string_view error) {
+        EXPECT_THAT(error, HasSubstr("close a session that is already closed"));
+      });
+  session->CloseSession(0x1234, "test close");
+}
+
+TEST_F(EncapsulatedWebTransportTest, CloseSessionWriteBlocked) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+  EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
+  EXPECT_CALL(*this, OnCapsule(_)).Times(0);
+  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
+  session->CloseSession(0x1234, "test close");
+  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosing);
+
+  EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+    EXPECT_EQ(capsule.capsule_type(), CapsuleType::CLOSE_WEBTRANSPORT_SESSION);
+    EXPECT_EQ(capsule.close_web_transport_session_capsule().error_code, 0x1234);
+    EXPECT_EQ(capsule.close_web_transport_session_capsule().error_message,
+              "test close");
+    return true;
+  });
+  EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(true));
+  EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close")));
+  session->OnCanWrite();
+  EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed);
+  EXPECT_TRUE(writer_.fin_written());
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveFin) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+
+  EXPECT_CALL(*visitor_, OnSessionClosed(0, IsEmpty()));
+  reader_.set_fin();
+  session->OnCanRead();
+  EXPECT_TRUE(writer_.fin_written());
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveCloseSession) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+
+  EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test")));
+  ProcessIncomingCapsule(Capsule::CloseWebTransportSession(0x1234, "test"));
+  EXPECT_TRUE(writer_.fin_written());
+  reader_.set_fin();
+  session->OnCanRead();
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveMalformedData) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+
+  EXPECT_CALL(fatal_error_callback_, Call(HasSubstr("too much capsule data")))
+      .WillOnce([] {});
+  read_buffer_ = std::string(2 * 1024 * 1024, '\xff');
+  session->OnCanRead();
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendDatagrams) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+  EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+    EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM);
+    EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
+    return true;
+  });
+  DatagramStatus status = session->SendOrQueueDatagram("test");
+  EXPECT_EQ(status.code, DatagramStatusCode::kSuccess);
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendDatagramsEarly) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  quiche::HttpHeaderBlock outgoing_headers;
+  session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_,
+                            &reader_);
+  EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+    EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM);
+    EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
+    return true;
+  });
+  ASSERT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders);
+  session->SendOrQueueDatagram("test");
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendDatagramsBeforeInitialization) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  quiche::HttpHeaderBlock outgoing_headers;
+  EXPECT_CALL(*this, OnCapsule(_)).Times(0);
+  ASSERT_EQ(session->state(), EncapsulatedSession::kUninitialized);
+  session->SendOrQueueDatagram("test");
+
+  EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+    EXPECT_EQ(capsule.capsule_type(), CapsuleType::DATAGRAM);
+    EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
+    return true;
+  });
+  DefaultHandshakeForClient(*session);
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendDatagramsTooBig) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+  EXPECT_CALL(*this, OnCapsule(_)).Times(0);
+  std::string long_string(16 * 1024, 'a');
+  DatagramStatus status = session->SendOrQueueDatagram(long_string);
+  EXPECT_EQ(status.code, DatagramStatusCode::kTooBig);
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveDatagrams) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+  EXPECT_CALL(*visitor_, OnDatagramReceived(_))
+      .WillOnce([](absl::string_view data) { EXPECT_EQ(data, "test"); });
+  ProcessIncomingCapsule(Capsule::Datagram("test"));
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendDraining) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+  EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+    EXPECT_EQ(capsule.capsule_type(), CapsuleType::DRAIN_WEBTRANSPORT_SESSION);
+    return true;
+  });
+  session->NotifySessionDraining();
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveDraining) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+  testing::MockFunction<void()> callback;
+  session->SetOnDraining(callback.AsStdFunction());
+  EXPECT_CALL(callback, Call());
+  ProcessIncomingCapsule(Capsule(quiche::DrainWebTransportSessionCapsule()));
+}
+
+TEST_F(EncapsulatedWebTransportTest, WriteErrorDatagram) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+  EXPECT_CALL(writer_, Writev(_, _))
+      .WillOnce(Return(absl::InternalError("Test write error")));
+  EXPECT_CALL(fatal_error_callback_, Call(_))
+      .WillOnce([](absl::string_view error) {
+        EXPECT_THAT(error, HasSubstr("Test write error"));
+      });
+  DatagramStatus status = session->SendOrQueueDatagram("test");
+  EXPECT_EQ(status.code, DatagramStatusCode::kInternalError);
+}
+
+TEST_F(EncapsulatedWebTransportTest, WriteErrorControlCapsule) {
+  std::unique_ptr<EncapsulatedSession> session =
+      CreateTransport(Perspective::kClient);
+  DefaultHandshakeForClient(*session);
+  EXPECT_CALL(writer_, Writev(_, _))
+      .WillOnce(Return(absl::InternalError("Test write error")));
+  EXPECT_CALL(fatal_error_callback_, Call(_))
+      .WillOnce([](absl::string_view error) {
+        EXPECT_THAT(error, HasSubstr("Test write error"));
+      });
+  session->NotifySessionDraining();
+}
+
+}  // namespace
+}  // namespace webtransport::test
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h
index be08f89..dbf22f8 100644
--- a/quiche/web_transport/web_transport.h
+++ b/quiche/web_transport/web_transport.h
@@ -24,6 +24,8 @@
 
 namespace webtransport {
 
+enum class Perspective { kClient, kServer };
+
 // A numeric ID uniquely identifying a WebTransport stream. Note that by design,
 // those IDs are not available in the Web API, and the IDs do not necessarily
 // match between client and server perspective, since there may be a proxy