Implement basic WebTransport over HTTP/2 functionality.
This includes datagrams, closing session and draining session, but does not include streams yet.
PiperOrigin-RevId: 582768912
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 0ba5220..e7806bf 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -392,6 +392,7 @@
"spdy/core/spdy_protocol.h",
"spdy/core/zero_copy_output_buffer.h",
"web_transport/complete_buffer_visitor.h",
+ "web_transport/encapsulated/encapsulated_web_transport.h",
"web_transport/web_transport.h",
]
quiche_core_srcs = [
@@ -680,6 +681,7 @@
"spdy/core/spdy_prefixed_buffer_reader.cc",
"spdy/core/spdy_protocol.cc",
"web_transport/complete_buffer_visitor.cc",
+ "web_transport/encapsulated/encapsulated_web_transport.cc",
]
quiche_tool_support_hdrs = [
"common/platform/api/quiche_command_line_flags.h",
@@ -742,6 +744,7 @@
"common/platform/api/quiche_test.h",
"common/platform/api/quiche_test_loopback.h",
"common/platform/api/quiche_test_output.h",
+ "common/test_tools/mock_streams.h",
"common/test_tools/quiche_test_utils.h",
"http2/adapter/mock_http2_visitor.h",
"http2/adapter/recording_http2_visitor.h",
@@ -1072,6 +1075,7 @@
"common/simple_buffer_allocator_test.cc",
"common/structured_headers_generated_test.cc",
"common/structured_headers_test.cc",
+ "common/test_tools/mock_streams_test.cc",
"common/test_tools/quiche_test_utils_test.cc",
"common/wire_serialization_test.cc",
"http2/adapter/event_forwarder_test.cc",
@@ -1302,6 +1306,7 @@
"spdy/core/spdy_pinnable_buffer_piece_test.cc",
"spdy/core/spdy_prefixed_buffer_reader_test.cc",
"spdy/core/spdy_protocol_test.cc",
+ "web_transport/encapsulated/encapsulated_web_transport_test.cc",
]
io_tests_hdrs = [
]
diff --git a/build/source_list.gni b/build/source_list.gni
index 69e5f27..da2d729 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -392,6 +392,7 @@
"src/quiche/spdy/core/spdy_protocol.h",
"src/quiche/spdy/core/zero_copy_output_buffer.h",
"src/quiche/web_transport/complete_buffer_visitor.h",
+ "src/quiche/web_transport/encapsulated/encapsulated_web_transport.h",
"src/quiche/web_transport/web_transport.h",
]
quiche_core_srcs = [
@@ -680,6 +681,7 @@
"src/quiche/spdy/core/spdy_prefixed_buffer_reader.cc",
"src/quiche/spdy/core/spdy_protocol.cc",
"src/quiche/web_transport/complete_buffer_visitor.cc",
+ "src/quiche/web_transport/encapsulated/encapsulated_web_transport.cc",
]
quiche_tool_support_hdrs = [
"src/quiche/common/platform/api/quiche_command_line_flags.h",
@@ -742,6 +744,7 @@
"src/quiche/common/platform/api/quiche_test.h",
"src/quiche/common/platform/api/quiche_test_loopback.h",
"src/quiche/common/platform/api/quiche_test_output.h",
+ "src/quiche/common/test_tools/mock_streams.h",
"src/quiche/common/test_tools/quiche_test_utils.h",
"src/quiche/http2/adapter/mock_http2_visitor.h",
"src/quiche/http2/adapter/recording_http2_visitor.h",
@@ -1073,6 +1076,7 @@
"src/quiche/common/simple_buffer_allocator_test.cc",
"src/quiche/common/structured_headers_generated_test.cc",
"src/quiche/common/structured_headers_test.cc",
+ "src/quiche/common/test_tools/mock_streams_test.cc",
"src/quiche/common/test_tools/quiche_test_utils_test.cc",
"src/quiche/common/wire_serialization_test.cc",
"src/quiche/http2/adapter/event_forwarder_test.cc",
@@ -1303,6 +1307,7 @@
"src/quiche/spdy/core/spdy_pinnable_buffer_piece_test.cc",
"src/quiche/spdy/core/spdy_prefixed_buffer_reader_test.cc",
"src/quiche/spdy/core/spdy_protocol_test.cc",
+ "src/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc",
]
io_tests_hdrs = [
diff --git a/build/source_list.json b/build/source_list.json
index 165f594..42f3fbb 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -391,6 +391,7 @@
"quiche/spdy/core/spdy_protocol.h",
"quiche/spdy/core/zero_copy_output_buffer.h",
"quiche/web_transport/complete_buffer_visitor.h",
+ "quiche/web_transport/encapsulated/encapsulated_web_transport.h",
"quiche/web_transport/web_transport.h"
],
"quiche_core_srcs": [
@@ -678,7 +679,8 @@
"quiche/spdy/core/spdy_pinnable_buffer_piece.cc",
"quiche/spdy/core/spdy_prefixed_buffer_reader.cc",
"quiche/spdy/core/spdy_protocol.cc",
- "quiche/web_transport/complete_buffer_visitor.cc"
+ "quiche/web_transport/complete_buffer_visitor.cc",
+ "quiche/web_transport/encapsulated/encapsulated_web_transport.cc"
],
"quiche_tool_support_hdrs": [
"quiche/common/platform/api/quiche_command_line_flags.h",
@@ -741,6 +743,7 @@
"quiche/common/platform/api/quiche_test.h",
"quiche/common/platform/api/quiche_test_loopback.h",
"quiche/common/platform/api/quiche_test_output.h",
+ "quiche/common/test_tools/mock_streams.h",
"quiche/common/test_tools/quiche_test_utils.h",
"quiche/http2/adapter/mock_http2_visitor.h",
"quiche/http2/adapter/recording_http2_visitor.h",
@@ -1072,6 +1075,7 @@
"quiche/common/simple_buffer_allocator_test.cc",
"quiche/common/structured_headers_generated_test.cc",
"quiche/common/structured_headers_test.cc",
+ "quiche/common/test_tools/mock_streams_test.cc",
"quiche/common/test_tools/quiche_test_utils_test.cc",
"quiche/common/wire_serialization_test.cc",
"quiche/http2/adapter/event_forwarder_test.cc",
@@ -1301,7 +1305,8 @@
"quiche/spdy/core/spdy_intrusive_list_test.cc",
"quiche/spdy/core/spdy_pinnable_buffer_piece_test.cc",
"quiche/spdy/core/spdy_prefixed_buffer_reader_test.cc",
- "quiche/spdy/core/spdy_protocol_test.cc"
+ "quiche/spdy/core/spdy_protocol_test.cc",
+ "quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc"
],
"io_tests_hdrs": [
diff --git a/quiche/common/capsule.cc b/quiche/common/capsule.cc
index 66d4a66..421f11d 100644
--- a/quiche/common/capsule.cc
+++ b/quiche/common/capsule.cc
@@ -4,8 +4,13 @@
#include "quiche/common/capsule.h"
+#include <cstddef>
+#include <cstdint>
#include <limits>
+#include <ostream>
+#include <string>
#include <type_traits>
+#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@@ -350,6 +355,17 @@
}
}
+QuicheBuffer SerializeDatagramCapsuleHeader(uint64_t datagram_size,
+ QuicheBufferAllocator* allocator) {
+ absl::StatusOr<QuicheBuffer> buffer =
+ SerializeIntoBuffer(allocator, WireVarInt62(CapsuleType::DATAGRAM),
+ WireVarInt62(datagram_size));
+ if (!buffer.ok()) {
+ return QuicheBuffer();
+ }
+ return *std::move(buffer);
+}
+
QuicheBuffer SerializeCapsule(const Capsule& capsule,
quiche::QuicheBufferAllocator* allocator) {
absl::StatusOr<QuicheBuffer> serialized =
diff --git a/quiche/common/capsule.h b/quiche/common/capsule.h
index b5e1831..3cadc4d 100644
--- a/quiche/common/capsule.h
+++ b/quiche/common/capsule.h
@@ -391,6 +391,10 @@
QUICHE_EXPORT quiche::QuicheBuffer SerializeCapsule(
const Capsule& capsule, quiche::QuicheBufferAllocator* allocator);
+// Serializes the header for a datagram of size |datagram_size|.
+QUICHE_EXPORT QuicheBuffer SerializeDatagramCapsuleHeader(
+ uint64_t datagram_size, QuicheBufferAllocator* allocator);
+
} // namespace quiche
#endif // QUICHE_COMMON_CAPSULE_H_
diff --git a/quiche/common/capsule_test.cc b/quiche/common/capsule_test.cc
index ae55aaf..0aed714 100644
--- a/quiche/common/capsule_test.cc
+++ b/quiche/common/capsule_test.cc
@@ -10,8 +10,10 @@
#include <vector>
#include "absl/strings/escaping.h"
+#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_ip_address.h"
#include "quiche/common/simple_buffer_allocator.h"
#include "quiche/common/test_tools/quiche_test_utils.h"
@@ -86,6 +88,16 @@
TestSerialization(expected_capsule, capsule_fragment);
}
+TEST_F(CapsuleTest, DatagramCapsuleViaHeader) {
+ std::string datagram_payload = absl::HexStringToBytes("a1a2a3a4a5a6a7a8");
+ quiche::QuicheBuffer expected_capsule = SerializeCapsule(
+ Capsule::Datagram(datagram_payload), SimpleBufferAllocator::Get());
+ quiche::QuicheBuffer actual_header = SerializeDatagramCapsuleHeader(
+ datagram_payload.size(), SimpleBufferAllocator::Get());
+ EXPECT_EQ(expected_capsule.AsStringView(),
+ absl::StrCat(actual_header.AsStringView(), datagram_payload));
+}
+
TEST_F(CapsuleTest, LegacyDatagramCapsule) {
std::string capsule_fragment = absl::HexStringToBytes(
"80ff37a0" // LEGACY_DATAGRAM capsule type
diff --git a/quiche/common/test_tools/mock_streams.h b/quiche/common/test_tools/mock_streams.h
new file mode 100644
index 0000000..577a324
--- /dev/null
+++ b/quiche/common/test_tools/mock_streams.h
@@ -0,0 +1,101 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
+#define QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
+
+#include <algorithm>
+#include <cstddef>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_stream.h"
+
+namespace quiche::test {
+
+// Mockable stream that stores all of the data into an std::string by default.
+class MockWriteStream : public quiche::WriteStream {
+ public:
+ MockWriteStream() {
+ ON_CALL(*this, CanWrite()).WillByDefault(testing::Return(true));
+ ON_CALL(*this, Writev(testing::_, testing::_))
+ .WillByDefault([&](absl::Span<const absl::string_view> data,
+ const StreamWriteOptions& options) {
+ return AppendToData(data, options);
+ });
+ }
+
+ MOCK_METHOD(absl::Status, Writev,
+ (absl::Span<const absl::string_view> data,
+ const StreamWriteOptions& options),
+ (override));
+ MOCK_METHOD(bool, CanWrite, (), (const, override));
+
+ absl::Status AppendToData(absl::Span<const absl::string_view> data,
+ const StreamWriteOptions& options) {
+ for (absl::string_view fragment : data) {
+ data_.append(fragment.data(), fragment.size());
+ }
+ ProcessOptions(options);
+ return absl::OkStatus();
+ }
+ void ProcessOptions(const StreamWriteOptions& options) {
+ fin_written_ |= options.send_fin();
+ }
+
+ std::string& data() { return data_; }
+ bool fin_written() { return fin_written_; }
+
+ private:
+ std::string data_;
+ bool fin_written_ = false;
+};
+
+// Reads stream data from an std::string buffer.
+class ReadStreamFromString : public ReadStream {
+ public:
+ explicit ReadStreamFromString(std::string* data) : data_(data) {}
+
+ ReadResult Read(absl::Span<char> buffer) override {
+ size_t data_to_copy = std::min(buffer.size(), data_->size());
+ std::copy(data_->begin(), data_->begin() + data_to_copy, buffer.begin());
+ *data_ = data_->substr(data_to_copy);
+ return ReadResult{data_to_copy, data_->empty() && fin_};
+ }
+ ReadResult Read(std::string* output) override {
+ size_t bytes = data_->size();
+ output->append(std::move(*data_));
+ data_->clear();
+ return ReadResult{bytes, fin_};
+ }
+
+ size_t ReadableBytes() const override { return data_->size(); }
+
+ virtual PeekResult PeekNextReadableRegion() const override {
+ PeekResult result;
+ result.peeked_data = *data_;
+ result.fin_next = data_->empty() && fin_;
+ result.all_data_received = fin_;
+ return result;
+ }
+
+ bool SkipBytes(size_t bytes) override {
+ *data_ = data_->substr(bytes);
+ return data_->empty() && fin_;
+ }
+
+ void set_fin() { fin_ = true; }
+
+ private:
+ std::string* data_;
+ bool fin_ = false;
+};
+
+} // namespace quiche::test
+
+#endif // QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
diff --git a/quiche/common/test_tools/mock_streams_test.cc b/quiche/common/test_tools/mock_streams_test.cc
new file mode 100644
index 0000000..2e8a2e4
--- /dev/null
+++ b/quiche/common/test_tools/mock_streams_test.cc
@@ -0,0 +1,63 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/common/test_tools/mock_streams.h"
+
+#include <array>
+#include <string>
+
+#include "absl/types/span.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/common/test_tools/quiche_test_utils.h"
+
+namespace quiche::test {
+namespace {
+
+using ::testing::ElementsAre;
+using ::testing::IsEmpty;
+
+TEST(MockWriteStreamTest, DefaultWrite) {
+ MockWriteStream stream;
+ QUICHE_EXPECT_OK(quiche::WriteIntoStream(stream, "test"));
+ EXPECT_EQ(stream.data(), "test");
+ EXPECT_FALSE(stream.fin_written());
+}
+
+TEST(ReadStreamFromStringTest, ReadIntoSpan) {
+ std::string source = "abcdef";
+ std::array<char, 3> buffer;
+ ReadStreamFromString stream(&source);
+ EXPECT_EQ(stream.ReadableBytes(), 6);
+
+ stream.Read(absl::MakeSpan(buffer));
+ EXPECT_THAT(buffer, ElementsAre('a', 'b', 'c'));
+ EXPECT_EQ(stream.ReadableBytes(), 3);
+
+ stream.Read(absl::MakeSpan(buffer));
+ EXPECT_THAT(buffer, ElementsAre('d', 'e', 'f'));
+ EXPECT_EQ(stream.ReadableBytes(), 0);
+ EXPECT_THAT(source, IsEmpty());
+}
+
+TEST(ReadStreamFromStringTest, ReadIntoString) {
+ std::string source = "abcdef";
+ std::string destination;
+ ReadStreamFromString stream(&source);
+ stream.Read(&destination);
+ EXPECT_EQ(destination, "abcdef");
+ EXPECT_THAT(source, IsEmpty());
+}
+
+TEST(ReadStreamFromStringTest, PeekAndSkip) {
+ std::string source = "abcdef";
+ ReadStreamFromString stream(&source);
+ EXPECT_EQ(stream.PeekNextReadableRegion().peeked_data, "abcdef");
+ stream.SkipBytes(2);
+ EXPECT_EQ(stream.PeekNextReadableRegion().peeked_data, "cdef");
+ EXPECT_EQ(source, "cdef");
+}
+
+} // namespace
+} // namespace quiche::test
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
new file mode 100644
index 0000000..0aa3763
--- /dev/null
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
@@ -0,0 +1,355 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
+
+#include <array>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+#include "absl/types/span.h"
+#include "quiche/common/capsule.h"
+#include "quiche/common/http/http_header_block.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_callbacks.h"
+#include "quiche/common/quiche_status_utils.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+
+namespace {
+
+using ::quiche::Capsule;
+using ::quiche::CapsuleType;
+using ::quiche::CloseWebTransportSessionCapsule;
+
+// This is arbitrary, since we don't have any real MTU restriction when running
+// over TCP.
+constexpr uint64_t kEncapsulatedMaxDatagramSize = 9000;
+
+} // namespace
+
+EncapsulatedSession::EncapsulatedSession(
+ Perspective perspective, FatalErrorCallback fatal_error_callback)
+ : perspective_(perspective),
+ fatal_error_callback_(std::move(fatal_error_callback)),
+ capsule_parser_(this) {}
+
+void EncapsulatedSession::InitializeClient(
+ std::unique_ptr<SessionVisitor> visitor,
+ quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
+ quiche::ReadStream* reader) {
+ if (state_ != kUninitialized) {
+ OnFatalError("Called InitializeClient() in an invalid state");
+ return;
+ }
+ if (perspective_ != Perspective::kClient) {
+ OnFatalError("Called InitializeClient() on a server session");
+ return;
+ }
+
+ visitor_ = std::move(visitor);
+ writer_ = writer;
+ reader_ = reader;
+ state_ = kWaitingForHeaders;
+}
+
+void EncapsulatedSession::InitializeServer(
+ std::unique_ptr<SessionVisitor> visitor,
+ const quiche::HttpHeaderBlock& /*incoming_headers*/,
+ quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
+ quiche::ReadStream* reader) {
+ if (state_ != kUninitialized) {
+ OnFatalError("Called InitializeServer() in an invalid state");
+ return;
+ }
+ if (perspective_ != Perspective::kServer) {
+ OnFatalError("Called InitializeServer() on a client session");
+ return;
+ }
+
+ visitor_ = std::move(visitor);
+ writer_ = writer;
+ reader_ = reader;
+ OpenSession();
+}
+void EncapsulatedSession::ProcessIncomingServerHeaders(
+ const quiche::HttpHeaderBlock& /*headers*/) {
+ if (state_ != kWaitingForHeaders) {
+ OnFatalError("Called ProcessIncomingServerHeaders() in an invalid state");
+ return;
+ }
+ OpenSession();
+}
+
+void EncapsulatedSession::CloseSession(SessionErrorCode error_code,
+ absl::string_view error_message) {
+ switch (state_) {
+ case kUninitialized:
+ case kWaitingForHeaders:
+ OnFatalError(absl::StrCat(
+ "Attempted to close a session before it opened with error 0x",
+ absl::Hex(error_code), ": ", error_message));
+ return;
+ case kSessionClosing:
+ case kSessionClosed:
+ OnFatalError(absl::StrCat(
+ "Attempted to close a session that is already closed with error 0x",
+ absl::Hex(error_code), ": ", error_message));
+ return;
+ case kSessionOpen:
+ break;
+ }
+ state_ = kSessionClosing;
+ buffered_session_close_ =
+ BufferedClose{error_code, std::string(error_message)};
+ OnCanWrite();
+}
+
+Stream* EncapsulatedSession::AcceptIncomingBidirectionalStream() {
+ return nullptr;
+}
+Stream* EncapsulatedSession::AcceptIncomingUnidirectionalStream() {
+ return nullptr;
+}
+bool EncapsulatedSession::CanOpenNextOutgoingBidirectionalStream() {
+ return false;
+}
+bool EncapsulatedSession::CanOpenNextOutgoingUnidirectionalStream() {
+ return false;
+}
+Stream* EncapsulatedSession::OpenOutgoingBidirectionalStream() {
+ return nullptr;
+}
+Stream* EncapsulatedSession::OpenOutgoingUnidirectionalStream() {
+ return nullptr;
+}
+
+Stream* EncapsulatedSession::GetStreamById(StreamId /*id*/) { return nullptr; }
+DatagramStats EncapsulatedSession::GetDatagramStats() {
+ DatagramStats stats;
+ stats.expired_outgoing = 0;
+ stats.lost_outgoing = 0;
+ return stats;
+}
+
+SessionStats EncapsulatedSession::GetSessionStats() {
+ // We could potentially get stats via tcp_info and similar mechanisms, but
+ // that would require us knowing what the underlying socket is.
+ return SessionStats();
+}
+
+void EncapsulatedSession::NotifySessionDraining() {
+ control_capsule_queue_.push_back(quiche::SerializeCapsule(
+ quiche::Capsule(quiche::DrainWebTransportSessionCapsule()), allocator_));
+ OnCanWrite();
+}
+void EncapsulatedSession::SetOnDraining(
+ quiche::SingleUseCallback<void()> callback) {
+ draining_callback_ = std::move(callback);
+}
+
+DatagramStatus EncapsulatedSession::SendOrQueueDatagram(
+ absl::string_view datagram) {
+ if (datagram.size() > GetMaxDatagramSize()) {
+ return DatagramStatus{
+ DatagramStatusCode::kTooBig,
+ absl::StrCat("Datagram is ", datagram.size(),
+ " bytes long, while the specified maximum size is ",
+ GetMaxDatagramSize())};
+ }
+
+ bool write_blocked;
+ switch (state_) {
+ case kUninitialized:
+ write_blocked = true;
+ break;
+ // We can send datagrams before receiving any headers from the peer, since
+ // datagrams are not subject to queueing.
+ case kWaitingForHeaders:
+ case kSessionOpen:
+ write_blocked = !writer_->CanWrite();
+ break;
+ case kSessionClosing:
+ case kSessionClosed:
+ return DatagramStatus{DatagramStatusCode::kInternalError,
+ "Writing into an already closed session"};
+ }
+
+ if (write_blocked) {
+ // TODO: this *may* be useful to split into a separate queue.
+ control_capsule_queue_.push_back(
+ quiche::SerializeCapsule(Capsule::Datagram(datagram), allocator_));
+ return DatagramStatus{DatagramStatusCode::kSuccess, ""};
+ }
+
+ // We could always write via OnCanWrite() above, but the optimistic path below
+ // allows us to avoid a copy.
+ quiche::QuicheBuffer buffer =
+ quiche::SerializeDatagramCapsuleHeader(datagram.size(), allocator_);
+ std::array spans = {buffer.AsStringView(), datagram};
+ absl::Status write_status =
+ writer_->Writev(absl::MakeConstSpan(spans), quiche::StreamWriteOptions());
+ if (!write_status.ok()) {
+ OnWriteError(write_status);
+ return DatagramStatus{
+ DatagramStatusCode::kInternalError,
+ absl::StrCat("Write error for datagram: ", write_status.ToString())};
+ }
+ return DatagramStatus{DatagramStatusCode::kSuccess, ""};
+}
+
+uint64_t EncapsulatedSession::GetMaxDatagramSize() const {
+ return kEncapsulatedMaxDatagramSize;
+}
+
+void EncapsulatedSession::SetDatagramMaxTimeInQueue(
+ absl::Duration /*max_time_in_queue*/) {
+ // TODO(b/264263113): implement this (requires having a mockable clock).
+}
+
+void EncapsulatedSession::OnCanWrite() {
+ if (state_ == kUninitialized || !writer_) {
+ OnFatalError("Trying to write before the session is initialized");
+ return;
+ }
+ if (state_ == kSessionClosed) {
+ OnFatalError("Trying to write before the session is closed");
+ return;
+ }
+
+ if (state_ == kSessionClosing) {
+ if (writer_->CanWrite()) {
+ CloseWebTransportSessionCapsule capsule{
+ buffered_session_close_.error_code,
+ buffered_session_close_.error_message};
+ quiche::QuicheBuffer buffer =
+ quiche::SerializeCapsule(Capsule(std::move(capsule)), allocator_);
+ absl::Status write_status = SendFin(buffer.AsStringView());
+ if (!write_status.ok()) {
+ OnWriteError(quiche::AppendToStatus(write_status,
+ " while writing WT_CLOSE_SESSION"));
+ return;
+ }
+ OnSessionClosed(buffered_session_close_.error_code,
+ buffered_session_close_.error_message);
+ }
+ return;
+ }
+
+ while (writer_->CanWrite() && !control_capsule_queue_.empty()) {
+ absl::Status write_status = quiche::WriteIntoStream(
+ *writer_, control_capsule_queue_.front().AsStringView());
+ if (!write_status.ok()) {
+ OnWriteError(write_status);
+ return;
+ }
+ control_capsule_queue_.pop_front();
+ }
+
+ // TODO(b/264263113): send stream data.
+}
+
+void EncapsulatedSession::OnCanRead() {
+ if (state_ == kSessionClosed || state_ == kSessionClosing) {
+ return;
+ }
+ bool has_fin = quiche::ProcessAllReadableRegions(
+ *reader_, [&](absl::string_view fragment) {
+ capsule_parser_.IngestCapsuleFragment(fragment);
+ });
+ if (has_fin) {
+ capsule_parser_.ErrorIfThereIsRemainingBufferedData();
+ OnSessionClosed(0, "");
+ }
+}
+
+bool EncapsulatedSession::OnCapsule(const quiche::Capsule& capsule) {
+ switch (capsule.capsule_type()) {
+ case CapsuleType::DATAGRAM:
+ visitor_->OnDatagramReceived(
+ capsule.datagram_capsule().http_datagram_payload);
+ break;
+ case CapsuleType::DRAIN_WEBTRANSPORT_SESSION:
+ if (draining_callback_) {
+ std::move(draining_callback_)();
+ }
+ break;
+ case CapsuleType::CLOSE_WEBTRANSPORT_SESSION:
+ OnSessionClosed(
+ capsule.close_web_transport_session_capsule().error_code,
+ std::string(
+ capsule.close_web_transport_session_capsule().error_message));
+ break;
+ default:
+ break;
+ }
+ return true;
+}
+
+void EncapsulatedSession::OnCapsuleParseFailure(
+ absl::string_view error_message) {
+ OnFatalError(absl::StrCat("Stream parse error: ", error_message));
+}
+
+void EncapsulatedSession::OpenSession() {
+ state_ = kSessionOpen;
+ visitor_->OnSessionReady();
+ OnCanWrite();
+ OnCanRead();
+}
+
+absl::Status EncapsulatedSession::SendFin(absl::string_view data) {
+ QUICHE_DCHECK(!fin_sent_);
+ fin_sent_ = true;
+ quiche::StreamWriteOptions options;
+ options.set_send_fin(true);
+ return quiche::WriteIntoStream(*writer_, data, options);
+}
+
+void EncapsulatedSession::OnSessionClosed(SessionErrorCode error_code,
+ const std::string& error_message) {
+ if (!fin_sent_) {
+ absl::Status status = SendFin("");
+ if (!status.ok()) {
+ OnWriteError(status);
+ return;
+ }
+ }
+
+ if (session_close_notified_) {
+ QUICHE_DCHECK_EQ(state_, kSessionClosed);
+ return;
+ }
+ state_ = kSessionClosed;
+ session_close_notified_ = true;
+
+ if (visitor_ != nullptr) {
+ visitor_->OnSessionClosed(error_code, error_message);
+ }
+}
+
+void EncapsulatedSession::OnFatalError(absl::string_view error_message) {
+ QUICHE_DLOG(ERROR) << "Fatal error in encapsulated WebTransport: "
+ << error_message;
+ state_ = kSessionClosed;
+ if (fatal_error_callback_) {
+ std::move(fatal_error_callback_)(error_message);
+ }
+}
+
+void EncapsulatedSession::OnWriteError(absl::Status error) {
+ OnFatalError(absl::StrCat(
+ error, " while trying to write encapsulated WebTransport data"));
+}
+
+} // namespace webtransport
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.h b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
new file mode 100644
index 0000000..85c14c9
--- /dev/null
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
@@ -0,0 +1,133 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
+#define QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+#include "quiche/common/capsule.h"
+#include "quiche/common/http/http_header_block.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_callbacks.h"
+#include "quiche/common/quiche_circular_deque.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/common/simple_buffer_allocator.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+
+using FatalErrorCallback = quiche::SingleUseCallback<void(absl::string_view)>;
+
+// Implementation of the WebTransport over HTTP/2 protocol; works over any
+// arbitrary bidirectional bytestream that can be prefixed with HTTP headers.
+// Specification: https://datatracker.ietf.org/doc/draft-ietf-webtrans-http2/
+class QUICHE_EXPORT EncapsulatedSession
+ : public webtransport::Session,
+ public quiche::WriteStreamVisitor,
+ public quiche::ReadStreamVisitor,
+ public quiche::CapsuleParser::Visitor {
+ public:
+ // The state machine of the transport.
+ enum State {
+ // The transport object has been created, but
+ // InitializeClient/InitializeServer has not been called yet.
+ kUninitialized,
+ // The client has sent its own headers, but haven't received a response yet.
+ kWaitingForHeaders,
+ // Both the client and the server headers have been processed.
+ kSessionOpen,
+ // The session close has been requested, but the CLOSE capsule hasn't been
+ // sent yet.
+ kSessionClosing,
+ // The session has been closed; no further data will be exchanged.
+ kSessionClosed,
+ };
+
+ // The `fatal_error_callback` implies that any state related to the session
+ // should be torn down after it's been called.
+ EncapsulatedSession(Perspective perspective,
+ FatalErrorCallback fatal_error_callback);
+
+ // WebTransport uses HTTP headers in a similar way to how QUIC uses SETTINGS;
+ // thus, the headers are necessary to initialize the session.
+ void InitializeClient(std::unique_ptr<SessionVisitor> visitor,
+ quiche::HttpHeaderBlock& outgoing_headers,
+ quiche::WriteStream* writer,
+ quiche::ReadStream* reader);
+ void InitializeServer(std::unique_ptr<SessionVisitor> visitor,
+ const quiche::HttpHeaderBlock& incoming_headers,
+ quiche::HttpHeaderBlock& outgoing_headers,
+ quiche::WriteStream* writer,
+ quiche::ReadStream* reader);
+ void ProcessIncomingServerHeaders(const quiche::HttpHeaderBlock& headers);
+
+ // webtransport::Session implementation.
+ void CloseSession(SessionErrorCode error_code,
+ absl::string_view error_message) override;
+ Stream* AcceptIncomingBidirectionalStream() override;
+ Stream* AcceptIncomingUnidirectionalStream() override;
+ bool CanOpenNextOutgoingBidirectionalStream() override;
+ bool CanOpenNextOutgoingUnidirectionalStream() override;
+ Stream* OpenOutgoingBidirectionalStream() override;
+ Stream* OpenOutgoingUnidirectionalStream() override;
+ DatagramStatus SendOrQueueDatagram(absl::string_view datagram) override;
+ uint64_t GetMaxDatagramSize() const override;
+ void SetDatagramMaxTimeInQueue(absl::Duration max_time_in_queue) override;
+ Stream* GetStreamById(StreamId id) override;
+ DatagramStats GetDatagramStats() override;
+ SessionStats GetSessionStats() override;
+ void NotifySessionDraining() override;
+ void SetOnDraining(quiche::SingleUseCallback<void()> callback) override;
+
+ // quiche::WriteStreamVisitor implementation.
+ void OnCanWrite() override;
+ // quiche::ReadStreamVisitor implementation.
+ void OnCanRead() override;
+ // quiche::CapsuleParser::Visitor implementation.
+ bool OnCapsule(const quiche::Capsule& capsule) override;
+ void OnCapsuleParseFailure(absl::string_view error_message) override;
+
+ State state() const { return state_; }
+
+ private:
+ struct BufferedClose {
+ SessionErrorCode error_code = 0;
+ std::string error_message;
+ };
+
+ Perspective perspective_;
+ State state_ = kUninitialized;
+ std::unique_ptr<SessionVisitor> visitor_ = nullptr;
+ FatalErrorCallback fatal_error_callback_;
+ quiche::SingleUseCallback<void()> draining_callback_;
+ quiche::WriteStream* writer_ = nullptr; // Not owned.
+ quiche::ReadStream* reader_ = nullptr; // Not owned.
+ quiche::QuicheBufferAllocator* allocator_ =
+ quiche::SimpleBufferAllocator::Get();
+ quiche::CapsuleParser capsule_parser_;
+
+ bool session_close_notified_ = false;
+ bool fin_sent_ = false;
+
+ BufferedClose buffered_session_close_;
+ quiche::QuicheCircularDeque<quiche::QuicheBuffer> control_capsule_queue_;
+
+ void OpenSession();
+ absl::Status SendFin(absl::string_view data);
+ void OnSessionClosed(SessionErrorCode error_code,
+ const std::string& error_message);
+ void OnFatalError(absl::string_view error_message);
+ void OnWriteError(absl::Status error);
+};
+
+} // namespace webtransport
+
+#endif // QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
new file mode 100644
index 0000000..15728e4
--- /dev/null
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
@@ -0,0 +1,318 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/common/capsule.h"
+#include "quiche/common/http/http_header_block.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/common/simple_buffer_allocator.h"
+#include "quiche/common/test_tools/mock_streams.h"
+#include "quiche/web_transport/test_tools/mock_web_transport.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport::test {
+namespace {
+
+using ::quiche::Capsule;
+using ::quiche::CapsuleType;
+using ::testing::_;
+using ::testing::HasSubstr;
+using ::testing::IsEmpty;
+using ::testing::Return;
+using ::testing::StrEq;
+
+class EncapsulatedWebTransportTest : public quiche::test::QuicheTest,
+ public quiche::CapsuleParser::Visitor {
+ public:
+ EncapsulatedWebTransportTest() : parser_(this), reader_(&read_buffer_) {
+ ON_CALL(fatal_error_callback_, Call(_))
+ .WillByDefault([](absl::string_view error) {
+ ADD_FAILURE() << "Fatal session error: " << error;
+ });
+ ON_CALL(writer_, Writev(_, _))
+ .WillByDefault([&](absl::Span<const absl::string_view> data,
+ const quiche::StreamWriteOptions& options) {
+ for (absl::string_view fragment : data) {
+ parser_.IngestCapsuleFragment(fragment);
+ }
+ writer_.ProcessOptions(options);
+ return absl::OkStatus();
+ });
+ }
+
+ std::unique_ptr<EncapsulatedSession> CreateTransport(
+ Perspective perspective) {
+ auto transport = std::make_unique<EncapsulatedSession>(
+ perspective, fatal_error_callback_.AsStdFunction());
+ session_ = transport.get();
+ return transport;
+ }
+
+ std::unique_ptr<SessionVisitor> CreateAndStoreVisitor() {
+ auto visitor = std::make_unique<testing::StrictMock<MockSessionVisitor>>();
+ visitor_ = visitor.get();
+ return visitor;
+ }
+
+ MOCK_METHOD(bool, OnCapsule, (const Capsule&), (override));
+
+ void OnCapsuleParseFailure(absl::string_view error_message) override {
+ ADD_FAILURE() << "Written an invalid capsule: " << error_message;
+ }
+
+ void ProcessIncomingCapsule(const Capsule& capsule) {
+ quiche::QuicheBuffer buffer =
+ quiche::SerializeCapsule(capsule, quiche::SimpleBufferAllocator::Get());
+ read_buffer_.append(buffer.data(), buffer.size());
+ session_->OnCanRead();
+ }
+
+ void DefaultHandshakeForClient(EncapsulatedSession& session) {
+ quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
+ session.InitializeClient(CreateAndStoreVisitor(), outgoing_headers,
+ &writer_, &reader_);
+ EXPECT_CALL(*visitor_, OnSessionReady());
+ session.ProcessIncomingServerHeaders(incoming_headers);
+ }
+
+ protected:
+ quiche::CapsuleParser parser_;
+ quiche::test::MockWriteStream writer_;
+ std::string read_buffer_;
+ quiche::test::ReadStreamFromString reader_;
+ MockSessionVisitor* visitor_ = nullptr;
+ EncapsulatedSession* session_ = nullptr;
+ testing::MockFunction<void(absl::string_view)> fatal_error_callback_;
+};
+
+TEST_F(EncapsulatedWebTransportTest, SetupClientSession) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
+ EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized);
+ session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_,
+ &reader_);
+ EXPECT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders);
+ EXPECT_CALL(*visitor_, OnSessionReady());
+ session->ProcessIncomingServerHeaders(incoming_headers);
+ EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
+}
+
+TEST_F(EncapsulatedWebTransportTest, SetupServerSession) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kServer);
+ quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
+ EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized);
+ std::unique_ptr<SessionVisitor> visitor = CreateAndStoreVisitor();
+ EXPECT_CALL(*visitor_, OnSessionReady());
+ session->InitializeServer(std::move(visitor), outgoing_headers,
+ incoming_headers, &writer_, &reader_);
+ EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
+}
+
+TEST_F(EncapsulatedWebTransportTest, CloseSession) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::CLOSE_WEBTRANSPORT_SESSION);
+ EXPECT_EQ(capsule.close_web_transport_session_capsule().error_code, 0x1234);
+ EXPECT_EQ(capsule.close_web_transport_session_capsule().error_message,
+ "test close");
+ return true;
+ });
+ EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
+ EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close")));
+ session->CloseSession(0x1234, "test close");
+ EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed);
+ EXPECT_TRUE(writer_.fin_written());
+
+ EXPECT_CALL(fatal_error_callback_, Call(_))
+ .WillOnce([](absl::string_view error) {
+ EXPECT_THAT(error, HasSubstr("close a session that is already closed"));
+ });
+ session->CloseSession(0x1234, "test close");
+}
+
+TEST_F(EncapsulatedWebTransportTest, CloseSessionWriteBlocked) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
+ EXPECT_CALL(*this, OnCapsule(_)).Times(0);
+ EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen);
+ session->CloseSession(0x1234, "test close");
+ EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosing);
+
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::CLOSE_WEBTRANSPORT_SESSION);
+ EXPECT_EQ(capsule.close_web_transport_session_capsule().error_code, 0x1234);
+ EXPECT_EQ(capsule.close_web_transport_session_capsule().error_message,
+ "test close");
+ return true;
+ });
+ EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(true));
+ EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close")));
+ session->OnCanWrite();
+ EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed);
+ EXPECT_TRUE(writer_.fin_written());
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveFin) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+
+ EXPECT_CALL(*visitor_, OnSessionClosed(0, IsEmpty()));
+ reader_.set_fin();
+ session->OnCanRead();
+ EXPECT_TRUE(writer_.fin_written());
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveCloseSession) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+
+ EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test")));
+ ProcessIncomingCapsule(Capsule::CloseWebTransportSession(0x1234, "test"));
+ EXPECT_TRUE(writer_.fin_written());
+ reader_.set_fin();
+ session->OnCanRead();
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveMalformedData) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+
+ EXPECT_CALL(fatal_error_callback_, Call(HasSubstr("too much capsule data")))
+ .WillOnce([] {});
+ read_buffer_ = std::string(2 * 1024 * 1024, '\xff');
+ session->OnCanRead();
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendDatagrams) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM);
+ EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
+ return true;
+ });
+ DatagramStatus status = session->SendOrQueueDatagram("test");
+ EXPECT_EQ(status.code, DatagramStatusCode::kSuccess);
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendDatagramsEarly) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ quiche::HttpHeaderBlock outgoing_headers;
+ session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_,
+ &reader_);
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM);
+ EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
+ return true;
+ });
+ ASSERT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders);
+ session->SendOrQueueDatagram("test");
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendDatagramsBeforeInitialization) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ quiche::HttpHeaderBlock outgoing_headers;
+ EXPECT_CALL(*this, OnCapsule(_)).Times(0);
+ ASSERT_EQ(session->state(), EncapsulatedSession::kUninitialized);
+ session->SendOrQueueDatagram("test");
+
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::DATAGRAM);
+ EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test");
+ return true;
+ });
+ DefaultHandshakeForClient(*session);
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendDatagramsTooBig) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*this, OnCapsule(_)).Times(0);
+ std::string long_string(16 * 1024, 'a');
+ DatagramStatus status = session->SendOrQueueDatagram(long_string);
+ EXPECT_EQ(status.code, DatagramStatusCode::kTooBig);
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveDatagrams) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnDatagramReceived(_))
+ .WillOnce([](absl::string_view data) { EXPECT_EQ(data, "test"); });
+ ProcessIncomingCapsule(Capsule::Datagram("test"));
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendDraining) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::DRAIN_WEBTRANSPORT_SESSION);
+ return true;
+ });
+ session->NotifySessionDraining();
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveDraining) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ testing::MockFunction<void()> callback;
+ session->SetOnDraining(callback.AsStdFunction());
+ EXPECT_CALL(callback, Call());
+ ProcessIncomingCapsule(Capsule(quiche::DrainWebTransportSessionCapsule()));
+}
+
+TEST_F(EncapsulatedWebTransportTest, WriteErrorDatagram) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(writer_, Writev(_, _))
+ .WillOnce(Return(absl::InternalError("Test write error")));
+ EXPECT_CALL(fatal_error_callback_, Call(_))
+ .WillOnce([](absl::string_view error) {
+ EXPECT_THAT(error, HasSubstr("Test write error"));
+ });
+ DatagramStatus status = session->SendOrQueueDatagram("test");
+ EXPECT_EQ(status.code, DatagramStatusCode::kInternalError);
+}
+
+TEST_F(EncapsulatedWebTransportTest, WriteErrorControlCapsule) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(writer_, Writev(_, _))
+ .WillOnce(Return(absl::InternalError("Test write error")));
+ EXPECT_CALL(fatal_error_callback_, Call(_))
+ .WillOnce([](absl::string_view error) {
+ EXPECT_THAT(error, HasSubstr("Test write error"));
+ });
+ session->NotifySessionDraining();
+}
+
+} // namespace
+} // namespace webtransport::test
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h
index be08f89..dbf22f8 100644
--- a/quiche/web_transport/web_transport.h
+++ b/quiche/web_transport/web_transport.h
@@ -24,6 +24,8 @@
namespace webtransport {
+enum class Perspective { kClient, kServer };
+
// A numeric ID uniquely identifying a WebTransport stream. Note that by design,
// those IDs are not available in the Web API, and the IDs do not necessarily
// match between client and server perspective, since there may be a proxy