Implement basic WebTransport over HTTP/2 functionality. This includes datagrams, closing session and draining session, but does not include streams yet. PiperOrigin-RevId: 582768912
diff --git a/build/source_list.bzl b/build/source_list.bzl index 0ba5220..e7806bf 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -392,6 +392,7 @@ "spdy/core/spdy_protocol.h", "spdy/core/zero_copy_output_buffer.h", "web_transport/complete_buffer_visitor.h", + "web_transport/encapsulated/encapsulated_web_transport.h", "web_transport/web_transport.h", ] quiche_core_srcs = [ @@ -680,6 +681,7 @@ "spdy/core/spdy_prefixed_buffer_reader.cc", "spdy/core/spdy_protocol.cc", "web_transport/complete_buffer_visitor.cc", + "web_transport/encapsulated/encapsulated_web_transport.cc", ] quiche_tool_support_hdrs = [ "common/platform/api/quiche_command_line_flags.h", @@ -742,6 +744,7 @@ "common/platform/api/quiche_test.h", "common/platform/api/quiche_test_loopback.h", "common/platform/api/quiche_test_output.h", + "common/test_tools/mock_streams.h", "common/test_tools/quiche_test_utils.h", "http2/adapter/mock_http2_visitor.h", "http2/adapter/recording_http2_visitor.h", @@ -1072,6 +1075,7 @@ "common/simple_buffer_allocator_test.cc", "common/structured_headers_generated_test.cc", "common/structured_headers_test.cc", + "common/test_tools/mock_streams_test.cc", "common/test_tools/quiche_test_utils_test.cc", "common/wire_serialization_test.cc", "http2/adapter/event_forwarder_test.cc", @@ -1302,6 +1306,7 @@ "spdy/core/spdy_pinnable_buffer_piece_test.cc", "spdy/core/spdy_prefixed_buffer_reader_test.cc", "spdy/core/spdy_protocol_test.cc", + "web_transport/encapsulated/encapsulated_web_transport_test.cc", ] io_tests_hdrs = [ ]
diff --git a/build/source_list.gni b/build/source_list.gni index 69e5f27..da2d729 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -392,6 +392,7 @@ "src/quiche/spdy/core/spdy_protocol.h", "src/quiche/spdy/core/zero_copy_output_buffer.h", "src/quiche/web_transport/complete_buffer_visitor.h", + "src/quiche/web_transport/encapsulated/encapsulated_web_transport.h", "src/quiche/web_transport/web_transport.h", ] quiche_core_srcs = [ @@ -680,6 +681,7 @@ "src/quiche/spdy/core/spdy_prefixed_buffer_reader.cc", "src/quiche/spdy/core/spdy_protocol.cc", "src/quiche/web_transport/complete_buffer_visitor.cc", + "src/quiche/web_transport/encapsulated/encapsulated_web_transport.cc", ] quiche_tool_support_hdrs = [ "src/quiche/common/platform/api/quiche_command_line_flags.h", @@ -742,6 +744,7 @@ "src/quiche/common/platform/api/quiche_test.h", "src/quiche/common/platform/api/quiche_test_loopback.h", "src/quiche/common/platform/api/quiche_test_output.h", + "src/quiche/common/test_tools/mock_streams.h", "src/quiche/common/test_tools/quiche_test_utils.h", "src/quiche/http2/adapter/mock_http2_visitor.h", "src/quiche/http2/adapter/recording_http2_visitor.h", @@ -1073,6 +1076,7 @@ "src/quiche/common/simple_buffer_allocator_test.cc", "src/quiche/common/structured_headers_generated_test.cc", "src/quiche/common/structured_headers_test.cc", + "src/quiche/common/test_tools/mock_streams_test.cc", "src/quiche/common/test_tools/quiche_test_utils_test.cc", "src/quiche/common/wire_serialization_test.cc", "src/quiche/http2/adapter/event_forwarder_test.cc", @@ -1303,6 +1307,7 @@ "src/quiche/spdy/core/spdy_pinnable_buffer_piece_test.cc", "src/quiche/spdy/core/spdy_prefixed_buffer_reader_test.cc", "src/quiche/spdy/core/spdy_protocol_test.cc", + "src/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc", ] io_tests_hdrs = [
diff --git a/build/source_list.json b/build/source_list.json index 165f594..42f3fbb 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -391,6 +391,7 @@ "quiche/spdy/core/spdy_protocol.h", "quiche/spdy/core/zero_copy_output_buffer.h", "quiche/web_transport/complete_buffer_visitor.h", + "quiche/web_transport/encapsulated/encapsulated_web_transport.h", "quiche/web_transport/web_transport.h" ], "quiche_core_srcs": [ @@ -678,7 +679,8 @@ "quiche/spdy/core/spdy_pinnable_buffer_piece.cc", "quiche/spdy/core/spdy_prefixed_buffer_reader.cc", "quiche/spdy/core/spdy_protocol.cc", - "quiche/web_transport/complete_buffer_visitor.cc" + "quiche/web_transport/complete_buffer_visitor.cc", + "quiche/web_transport/encapsulated/encapsulated_web_transport.cc" ], "quiche_tool_support_hdrs": [ "quiche/common/platform/api/quiche_command_line_flags.h", @@ -741,6 +743,7 @@ "quiche/common/platform/api/quiche_test.h", "quiche/common/platform/api/quiche_test_loopback.h", "quiche/common/platform/api/quiche_test_output.h", + "quiche/common/test_tools/mock_streams.h", "quiche/common/test_tools/quiche_test_utils.h", "quiche/http2/adapter/mock_http2_visitor.h", "quiche/http2/adapter/recording_http2_visitor.h", @@ -1072,6 +1075,7 @@ "quiche/common/simple_buffer_allocator_test.cc", "quiche/common/structured_headers_generated_test.cc", "quiche/common/structured_headers_test.cc", + "quiche/common/test_tools/mock_streams_test.cc", "quiche/common/test_tools/quiche_test_utils_test.cc", "quiche/common/wire_serialization_test.cc", "quiche/http2/adapter/event_forwarder_test.cc", @@ -1301,7 +1305,8 @@ "quiche/spdy/core/spdy_intrusive_list_test.cc", "quiche/spdy/core/spdy_pinnable_buffer_piece_test.cc", "quiche/spdy/core/spdy_prefixed_buffer_reader_test.cc", - "quiche/spdy/core/spdy_protocol_test.cc" + "quiche/spdy/core/spdy_protocol_test.cc", + "quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc" ], "io_tests_hdrs": [
diff --git a/quiche/common/capsule.cc b/quiche/common/capsule.cc index 66d4a66..421f11d 100644 --- a/quiche/common/capsule.cc +++ b/quiche/common/capsule.cc
@@ -4,8 +4,13 @@ #include "quiche/common/capsule.h" +#include <cstddef> +#include <cstdint> #include <limits> +#include <ostream> +#include <string> #include <type_traits> +#include <utility> #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -350,6 +355,17 @@ } } +QuicheBuffer SerializeDatagramCapsuleHeader(uint64_t datagram_size, + QuicheBufferAllocator* allocator) { + absl::StatusOr<QuicheBuffer> buffer = + SerializeIntoBuffer(allocator, WireVarInt62(CapsuleType::DATAGRAM), + WireVarInt62(datagram_size)); + if (!buffer.ok()) { + return QuicheBuffer(); + } + return *std::move(buffer); +} + QuicheBuffer SerializeCapsule(const Capsule& capsule, quiche::QuicheBufferAllocator* allocator) { absl::StatusOr<QuicheBuffer> serialized =
diff --git a/quiche/common/capsule.h b/quiche/common/capsule.h index b5e1831..3cadc4d 100644 --- a/quiche/common/capsule.h +++ b/quiche/common/capsule.h
@@ -391,6 +391,10 @@ QUICHE_EXPORT quiche::QuicheBuffer SerializeCapsule( const Capsule& capsule, quiche::QuicheBufferAllocator* allocator); +// Serializes the header for a datagram of size |datagram_size|. +QUICHE_EXPORT QuicheBuffer SerializeDatagramCapsuleHeader( + uint64_t datagram_size, QuicheBufferAllocator* allocator); + } // namespace quiche #endif // QUICHE_COMMON_CAPSULE_H_
diff --git a/quiche/common/capsule_test.cc b/quiche/common/capsule_test.cc index ae55aaf..0aed714 100644 --- a/quiche/common/capsule_test.cc +++ b/quiche/common/capsule_test.cc
@@ -10,8 +10,10 @@ #include <vector> #include "absl/strings/escaping.h" +#include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_ip_address.h" #include "quiche/common/simple_buffer_allocator.h" #include "quiche/common/test_tools/quiche_test_utils.h" @@ -86,6 +88,16 @@ TestSerialization(expected_capsule, capsule_fragment); } +TEST_F(CapsuleTest, DatagramCapsuleViaHeader) { + std::string datagram_payload = absl::HexStringToBytes("a1a2a3a4a5a6a7a8"); + quiche::QuicheBuffer expected_capsule = SerializeCapsule( + Capsule::Datagram(datagram_payload), SimpleBufferAllocator::Get()); + quiche::QuicheBuffer actual_header = SerializeDatagramCapsuleHeader( + datagram_payload.size(), SimpleBufferAllocator::Get()); + EXPECT_EQ(expected_capsule.AsStringView(), + absl::StrCat(actual_header.AsStringView(), datagram_payload)); +} + TEST_F(CapsuleTest, LegacyDatagramCapsule) { std::string capsule_fragment = absl::HexStringToBytes( "80ff37a0" // LEGACY_DATAGRAM capsule type
diff --git a/quiche/common/test_tools/mock_streams.h b/quiche/common/test_tools/mock_streams.h new file mode 100644 index 0000000..577a324 --- /dev/null +++ b/quiche/common/test_tools/mock_streams.h
@@ -0,0 +1,101 @@ +// Copyright (c) 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_ +#define QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_ + +#include <algorithm> +#include <cstddef> +#include <string> +#include <utility> + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/types/span.h" +#include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/quiche_stream.h" + +namespace quiche::test { + +// Mockable stream that stores all of the data into an std::string by default. +class MockWriteStream : public quiche::WriteStream { + public: + MockWriteStream() { + ON_CALL(*this, CanWrite()).WillByDefault(testing::Return(true)); + ON_CALL(*this, Writev(testing::_, testing::_)) + .WillByDefault([&](absl::Span<const absl::string_view> data, + const StreamWriteOptions& options) { + return AppendToData(data, options); + }); + } + + MOCK_METHOD(absl::Status, Writev, + (absl::Span<const absl::string_view> data, + const StreamWriteOptions& options), + (override)); + MOCK_METHOD(bool, CanWrite, (), (const, override)); + + absl::Status AppendToData(absl::Span<const absl::string_view> data, + const StreamWriteOptions& options) { + for (absl::string_view fragment : data) { + data_.append(fragment.data(), fragment.size()); + } + ProcessOptions(options); + return absl::OkStatus(); + } + void ProcessOptions(const StreamWriteOptions& options) { + fin_written_ |= options.send_fin(); + } + + std::string& data() { return data_; } + bool fin_written() { return fin_written_; } + + private: + std::string data_; + bool fin_written_ = false; +}; + +// Reads stream data from an std::string buffer. +class ReadStreamFromString : public ReadStream { + public: + explicit ReadStreamFromString(std::string* data) : data_(data) {} + + ReadResult Read(absl::Span<char> buffer) override { + size_t data_to_copy = std::min(buffer.size(), data_->size()); + std::copy(data_->begin(), data_->begin() + data_to_copy, buffer.begin()); + *data_ = data_->substr(data_to_copy); + return ReadResult{data_to_copy, data_->empty() && fin_}; + } + ReadResult Read(std::string* output) override { + size_t bytes = data_->size(); + output->append(std::move(*data_)); + data_->clear(); + return ReadResult{bytes, fin_}; + } + + size_t ReadableBytes() const override { return data_->size(); } + + virtual PeekResult PeekNextReadableRegion() const override { + PeekResult result; + result.peeked_data = *data_; + result.fin_next = data_->empty() && fin_; + result.all_data_received = fin_; + return result; + } + + bool SkipBytes(size_t bytes) override { + *data_ = data_->substr(bytes); + return data_->empty() && fin_; + } + + void set_fin() { fin_ = true; } + + private: + std::string* data_; + bool fin_ = false; +}; + +} // namespace quiche::test + +#endif // QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
diff --git a/quiche/common/test_tools/mock_streams_test.cc b/quiche/common/test_tools/mock_streams_test.cc new file mode 100644 index 0000000..2e8a2e4 --- /dev/null +++ b/quiche/common/test_tools/mock_streams_test.cc
@@ -0,0 +1,63 @@ +// Copyright (c) 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/common/test_tools/mock_streams.h" + +#include <array> +#include <string> + +#include "absl/types/span.h" +#include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/quiche_stream.h" +#include "quiche/common/test_tools/quiche_test_utils.h" + +namespace quiche::test { +namespace { + +using ::testing::ElementsAre; +using ::testing::IsEmpty; + +TEST(MockWriteStreamTest, DefaultWrite) { + MockWriteStream stream; + QUICHE_EXPECT_OK(quiche::WriteIntoStream(stream, "test")); + EXPECT_EQ(stream.data(), "test"); + EXPECT_FALSE(stream.fin_written()); +} + +TEST(ReadStreamFromStringTest, ReadIntoSpan) { + std::string source = "abcdef"; + std::array<char, 3> buffer; + ReadStreamFromString stream(&source); + EXPECT_EQ(stream.ReadableBytes(), 6); + + stream.Read(absl::MakeSpan(buffer)); + EXPECT_THAT(buffer, ElementsAre('a', 'b', 'c')); + EXPECT_EQ(stream.ReadableBytes(), 3); + + stream.Read(absl::MakeSpan(buffer)); + EXPECT_THAT(buffer, ElementsAre('d', 'e', 'f')); + EXPECT_EQ(stream.ReadableBytes(), 0); + EXPECT_THAT(source, IsEmpty()); +} + +TEST(ReadStreamFromStringTest, ReadIntoString) { + std::string source = "abcdef"; + std::string destination; + ReadStreamFromString stream(&source); + stream.Read(&destination); + EXPECT_EQ(destination, "abcdef"); + EXPECT_THAT(source, IsEmpty()); +} + +TEST(ReadStreamFromStringTest, PeekAndSkip) { + std::string source = "abcdef"; + ReadStreamFromString stream(&source); + EXPECT_EQ(stream.PeekNextReadableRegion().peeked_data, "abcdef"); + stream.SkipBytes(2); + EXPECT_EQ(stream.PeekNextReadableRegion().peeked_data, "cdef"); + EXPECT_EQ(source, "cdef"); +} + +} // namespace +} // namespace quiche::test
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc new file mode 100644 index 0000000..0aa3763 --- /dev/null +++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
@@ -0,0 +1,355 @@ +// Copyright (c) 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/web_transport/encapsulated/encapsulated_web_transport.h" + +#include <array> +#include <cstdint> +#include <memory> +#include <string> +#include <utility> + +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/time/time.h" +#include "absl/types/span.h" +#include "quiche/common/capsule.h" +#include "quiche/common/http/http_header_block.h" +#include "quiche/common/platform/api/quiche_logging.h" +#include "quiche/common/quiche_buffer_allocator.h" +#include "quiche/common/quiche_callbacks.h" +#include "quiche/common/quiche_status_utils.h" +#include "quiche/common/quiche_stream.h" +#include "quiche/web_transport/web_transport.h" + +namespace webtransport { + +namespace { + +using ::quiche::Capsule; +using ::quiche::CapsuleType; +using ::quiche::CloseWebTransportSessionCapsule; + +// This is arbitrary, since we don't have any real MTU restriction when running +// over TCP. +constexpr uint64_t kEncapsulatedMaxDatagramSize = 9000; + +} // namespace + +EncapsulatedSession::EncapsulatedSession( + Perspective perspective, FatalErrorCallback fatal_error_callback) + : perspective_(perspective), + fatal_error_callback_(std::move(fatal_error_callback)), + capsule_parser_(this) {} + +void EncapsulatedSession::InitializeClient( + std::unique_ptr<SessionVisitor> visitor, + quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer, + quiche::ReadStream* reader) { + if (state_ != kUninitialized) { + OnFatalError("Called InitializeClient() in an invalid state"); + return; + } + if (perspective_ != Perspective::kClient) { + OnFatalError("Called InitializeClient() on a server session"); + return; + } + + visitor_ = std::move(visitor); + writer_ = writer; + reader_ = reader; + state_ = kWaitingForHeaders; +} + +void EncapsulatedSession::InitializeServer( + std::unique_ptr<SessionVisitor> visitor, + const quiche::HttpHeaderBlock& /*incoming_headers*/, + quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer, + quiche::ReadStream* reader) { + if (state_ != kUninitialized) { + OnFatalError("Called InitializeServer() in an invalid state"); + return; + } + if (perspective_ != Perspective::kServer) { + OnFatalError("Called InitializeServer() on a client session"); + return; + } + + visitor_ = std::move(visitor); + writer_ = writer; + reader_ = reader; + OpenSession(); +} +void EncapsulatedSession::ProcessIncomingServerHeaders( + const quiche::HttpHeaderBlock& /*headers*/) { + if (state_ != kWaitingForHeaders) { + OnFatalError("Called ProcessIncomingServerHeaders() in an invalid state"); + return; + } + OpenSession(); +} + +void EncapsulatedSession::CloseSession(SessionErrorCode error_code, + absl::string_view error_message) { + switch (state_) { + case kUninitialized: + case kWaitingForHeaders: + OnFatalError(absl::StrCat( + "Attempted to close a session before it opened with error 0x", + absl::Hex(error_code), ": ", error_message)); + return; + case kSessionClosing: + case kSessionClosed: + OnFatalError(absl::StrCat( + "Attempted to close a session that is already closed with error 0x", + absl::Hex(error_code), ": ", error_message)); + return; + case kSessionOpen: + break; + } + state_ = kSessionClosing; + buffered_session_close_ = + BufferedClose{error_code, std::string(error_message)}; + OnCanWrite(); +} + +Stream* EncapsulatedSession::AcceptIncomingBidirectionalStream() { + return nullptr; +} +Stream* EncapsulatedSession::AcceptIncomingUnidirectionalStream() { + return nullptr; +} +bool EncapsulatedSession::CanOpenNextOutgoingBidirectionalStream() { + return false; +} +bool EncapsulatedSession::CanOpenNextOutgoingUnidirectionalStream() { + return false; +} +Stream* EncapsulatedSession::OpenOutgoingBidirectionalStream() { + return nullptr; +} +Stream* EncapsulatedSession::OpenOutgoingUnidirectionalStream() { + return nullptr; +} + +Stream* EncapsulatedSession::GetStreamById(StreamId /*id*/) { return nullptr; } +DatagramStats EncapsulatedSession::GetDatagramStats() { + DatagramStats stats; + stats.expired_outgoing = 0; + stats.lost_outgoing = 0; + return stats; +} + +SessionStats EncapsulatedSession::GetSessionStats() { + // We could potentially get stats via tcp_info and similar mechanisms, but + // that would require us knowing what the underlying socket is. + return SessionStats(); +} + +void EncapsulatedSession::NotifySessionDraining() { + control_capsule_queue_.push_back(quiche::SerializeCapsule( + quiche::Capsule(quiche::DrainWebTransportSessionCapsule()), allocator_)); + OnCanWrite(); +} +void EncapsulatedSession::SetOnDraining( + quiche::SingleUseCallback<void()> callback) { + draining_callback_ = std::move(callback); +} + +DatagramStatus EncapsulatedSession::SendOrQueueDatagram( + absl::string_view datagram) { + if (datagram.size() > GetMaxDatagramSize()) { + return DatagramStatus{ + DatagramStatusCode::kTooBig, + absl::StrCat("Datagram is ", datagram.size(), + " bytes long, while the specified maximum size is ", + GetMaxDatagramSize())}; + } + + bool write_blocked; + switch (state_) { + case kUninitialized: + write_blocked = true; + break; + // We can send datagrams before receiving any headers from the peer, since + // datagrams are not subject to queueing. + case kWaitingForHeaders: + case kSessionOpen: + write_blocked = !writer_->CanWrite(); + break; + case kSessionClosing: + case kSessionClosed: + return DatagramStatus{DatagramStatusCode::kInternalError, + "Writing into an already closed session"}; + } + + if (write_blocked) { + // TODO: this *may* be useful to split into a separate queue. + control_capsule_queue_.push_back( + quiche::SerializeCapsule(Capsule::Datagram(datagram), allocator_)); + return DatagramStatus{DatagramStatusCode::kSuccess, ""}; + } + + // We could always write via OnCanWrite() above, but the optimistic path below + // allows us to avoid a copy. + quiche::QuicheBuffer buffer = + quiche::SerializeDatagramCapsuleHeader(datagram.size(), allocator_); + std::array spans = {buffer.AsStringView(), datagram}; + absl::Status write_status = + writer_->Writev(absl::MakeConstSpan(spans), quiche::StreamWriteOptions()); + if (!write_status.ok()) { + OnWriteError(write_status); + return DatagramStatus{ + DatagramStatusCode::kInternalError, + absl::StrCat("Write error for datagram: ", write_status.ToString())}; + } + return DatagramStatus{DatagramStatusCode::kSuccess, ""}; +} + +uint64_t EncapsulatedSession::GetMaxDatagramSize() const { + return kEncapsulatedMaxDatagramSize; +} + +void EncapsulatedSession::SetDatagramMaxTimeInQueue( + absl::Duration /*max_time_in_queue*/) { + // TODO(b/264263113): implement this (requires having a mockable clock). +} + +void EncapsulatedSession::OnCanWrite() { + if (state_ == kUninitialized || !writer_) { + OnFatalError("Trying to write before the session is initialized"); + return; + } + if (state_ == kSessionClosed) { + OnFatalError("Trying to write before the session is closed"); + return; + } + + if (state_ == kSessionClosing) { + if (writer_->CanWrite()) { + CloseWebTransportSessionCapsule capsule{ + buffered_session_close_.error_code, + buffered_session_close_.error_message}; + quiche::QuicheBuffer buffer = + quiche::SerializeCapsule(Capsule(std::move(capsule)), allocator_); + absl::Status write_status = SendFin(buffer.AsStringView()); + if (!write_status.ok()) { + OnWriteError(quiche::AppendToStatus(write_status, + " while writing WT_CLOSE_SESSION")); + return; + } + OnSessionClosed(buffered_session_close_.error_code, + buffered_session_close_.error_message); + } + return; + } + + while (writer_->CanWrite() && !control_capsule_queue_.empty()) { + absl::Status write_status = quiche::WriteIntoStream( + *writer_, control_capsule_queue_.front().AsStringView()); + if (!write_status.ok()) { + OnWriteError(write_status); + return; + } + control_capsule_queue_.pop_front(); + } + + // TODO(b/264263113): send stream data. +} + +void EncapsulatedSession::OnCanRead() { + if (state_ == kSessionClosed || state_ == kSessionClosing) { + return; + } + bool has_fin = quiche::ProcessAllReadableRegions( + *reader_, [&](absl::string_view fragment) { + capsule_parser_.IngestCapsuleFragment(fragment); + }); + if (has_fin) { + capsule_parser_.ErrorIfThereIsRemainingBufferedData(); + OnSessionClosed(0, ""); + } +} + +bool EncapsulatedSession::OnCapsule(const quiche::Capsule& capsule) { + switch (capsule.capsule_type()) { + case CapsuleType::DATAGRAM: + visitor_->OnDatagramReceived( + capsule.datagram_capsule().http_datagram_payload); + break; + case CapsuleType::DRAIN_WEBTRANSPORT_SESSION: + if (draining_callback_) { + std::move(draining_callback_)(); + } + break; + case CapsuleType::CLOSE_WEBTRANSPORT_SESSION: + OnSessionClosed( + capsule.close_web_transport_session_capsule().error_code, + std::string( + capsule.close_web_transport_session_capsule().error_message)); + break; + default: + break; + } + return true; +} + +void EncapsulatedSession::OnCapsuleParseFailure( + absl::string_view error_message) { + OnFatalError(absl::StrCat("Stream parse error: ", error_message)); +} + +void EncapsulatedSession::OpenSession() { + state_ = kSessionOpen; + visitor_->OnSessionReady(); + OnCanWrite(); + OnCanRead(); +} + +absl::Status EncapsulatedSession::SendFin(absl::string_view data) { + QUICHE_DCHECK(!fin_sent_); + fin_sent_ = true; + quiche::StreamWriteOptions options; + options.set_send_fin(true); + return quiche::WriteIntoStream(*writer_, data, options); +} + +void EncapsulatedSession::OnSessionClosed(SessionErrorCode error_code, + const std::string& error_message) { + if (!fin_sent_) { + absl::Status status = SendFin(""); + if (!status.ok()) { + OnWriteError(status); + return; + } + } + + if (session_close_notified_) { + QUICHE_DCHECK_EQ(state_, kSessionClosed); + return; + } + state_ = kSessionClosed; + session_close_notified_ = true; + + if (visitor_ != nullptr) { + visitor_->OnSessionClosed(error_code, error_message); + } +} + +void EncapsulatedSession::OnFatalError(absl::string_view error_message) { + QUICHE_DLOG(ERROR) << "Fatal error in encapsulated WebTransport: " + << error_message; + state_ = kSessionClosed; + if (fatal_error_callback_) { + std::move(fatal_error_callback_)(error_message); + } +} + +void EncapsulatedSession::OnWriteError(absl::Status error) { + OnFatalError(absl::StrCat( + error, " while trying to write encapsulated WebTransport data")); +} + +} // namespace webtransport
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.h b/quiche/web_transport/encapsulated/encapsulated_web_transport.h new file mode 100644 index 0000000..85c14c9 --- /dev/null +++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
@@ -0,0 +1,133 @@ +// Copyright (c) 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_ +#define QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_ + +#include <cstdint> +#include <memory> +#include <string> + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/time/time.h" +#include "quiche/common/capsule.h" +#include "quiche/common/http/http_header_block.h" +#include "quiche/common/platform/api/quiche_export.h" +#include "quiche/common/quiche_buffer_allocator.h" +#include "quiche/common/quiche_callbacks.h" +#include "quiche/common/quiche_circular_deque.h" +#include "quiche/common/quiche_stream.h" +#include "quiche/common/simple_buffer_allocator.h" +#include "quiche/web_transport/web_transport.h" + +namespace webtransport { + +using FatalErrorCallback = quiche::SingleUseCallback<void(absl::string_view)>; + +// Implementation of the WebTransport over HTTP/2 protocol; works over any +// arbitrary bidirectional bytestream that can be prefixed with HTTP headers. +// Specification: https://datatracker.ietf.org/doc/draft-ietf-webtrans-http2/ +class QUICHE_EXPORT EncapsulatedSession + : public webtransport::Session, + public quiche::WriteStreamVisitor, + public quiche::ReadStreamVisitor, + public quiche::CapsuleParser::Visitor { + public: + // The state machine of the transport. + enum State { + // The transport object has been created, but + // InitializeClient/InitializeServer has not been called yet. + kUninitialized, + // The client has sent its own headers, but haven't received a response yet. + kWaitingForHeaders, + // Both the client and the server headers have been processed. + kSessionOpen, + // The session close has been requested, but the CLOSE capsule hasn't been + // sent yet. + kSessionClosing, + // The session has been closed; no further data will be exchanged. + kSessionClosed, + }; + + // The `fatal_error_callback` implies that any state related to the session + // should be torn down after it's been called. + EncapsulatedSession(Perspective perspective, + FatalErrorCallback fatal_error_callback); + + // WebTransport uses HTTP headers in a similar way to how QUIC uses SETTINGS; + // thus, the headers are necessary to initialize the session. + void InitializeClient(std::unique_ptr<SessionVisitor> visitor, + quiche::HttpHeaderBlock& outgoing_headers, + quiche::WriteStream* writer, + quiche::ReadStream* reader); + void InitializeServer(std::unique_ptr<SessionVisitor> visitor, + const quiche::HttpHeaderBlock& incoming_headers, + quiche::HttpHeaderBlock& outgoing_headers, + quiche::WriteStream* writer, + quiche::ReadStream* reader); + void ProcessIncomingServerHeaders(const quiche::HttpHeaderBlock& headers); + + // webtransport::Session implementation. + void CloseSession(SessionErrorCode error_code, + absl::string_view error_message) override; + Stream* AcceptIncomingBidirectionalStream() override; + Stream* AcceptIncomingUnidirectionalStream() override; + bool CanOpenNextOutgoingBidirectionalStream() override; + bool CanOpenNextOutgoingUnidirectionalStream() override; + Stream* OpenOutgoingBidirectionalStream() override; + Stream* OpenOutgoingUnidirectionalStream() override; + DatagramStatus SendOrQueueDatagram(absl::string_view datagram) override; + uint64_t GetMaxDatagramSize() const override; + void SetDatagramMaxTimeInQueue(absl::Duration max_time_in_queue) override; + Stream* GetStreamById(StreamId id) override; + DatagramStats GetDatagramStats() override; + SessionStats GetSessionStats() override; + void NotifySessionDraining() override; + void SetOnDraining(quiche::SingleUseCallback<void()> callback) override; + + // quiche::WriteStreamVisitor implementation. + void OnCanWrite() override; + // quiche::ReadStreamVisitor implementation. + void OnCanRead() override; + // quiche::CapsuleParser::Visitor implementation. + bool OnCapsule(const quiche::Capsule& capsule) override; + void OnCapsuleParseFailure(absl::string_view error_message) override; + + State state() const { return state_; } + + private: + struct BufferedClose { + SessionErrorCode error_code = 0; + std::string error_message; + }; + + Perspective perspective_; + State state_ = kUninitialized; + std::unique_ptr<SessionVisitor> visitor_ = nullptr; + FatalErrorCallback fatal_error_callback_; + quiche::SingleUseCallback<void()> draining_callback_; + quiche::WriteStream* writer_ = nullptr; // Not owned. + quiche::ReadStream* reader_ = nullptr; // Not owned. + quiche::QuicheBufferAllocator* allocator_ = + quiche::SimpleBufferAllocator::Get(); + quiche::CapsuleParser capsule_parser_; + + bool session_close_notified_ = false; + bool fin_sent_ = false; + + BufferedClose buffered_session_close_; + quiche::QuicheCircularDeque<quiche::QuicheBuffer> control_capsule_queue_; + + void OpenSession(); + absl::Status SendFin(absl::string_view data); + void OnSessionClosed(SessionErrorCode error_code, + const std::string& error_message); + void OnFatalError(absl::string_view error_message); + void OnWriteError(absl::Status error); +}; + +} // namespace webtransport + +#endif // QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc new file mode 100644 index 0000000..15728e4 --- /dev/null +++ b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
@@ -0,0 +1,318 @@ +// Copyright (c) 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/web_transport/encapsulated/encapsulated_web_transport.h" + +#include <memory> +#include <string> +#include <utility> + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/types/span.h" +#include "quiche/common/capsule.h" +#include "quiche/common/http/http_header_block.h" +#include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/quiche_buffer_allocator.h" +#include "quiche/common/quiche_stream.h" +#include "quiche/common/simple_buffer_allocator.h" +#include "quiche/common/test_tools/mock_streams.h" +#include "quiche/web_transport/test_tools/mock_web_transport.h" +#include "quiche/web_transport/web_transport.h" + +namespace webtransport::test { +namespace { + +using ::quiche::Capsule; +using ::quiche::CapsuleType; +using ::testing::_; +using ::testing::HasSubstr; +using ::testing::IsEmpty; +using ::testing::Return; +using ::testing::StrEq; + +class EncapsulatedWebTransportTest : public quiche::test::QuicheTest, + public quiche::CapsuleParser::Visitor { + public: + EncapsulatedWebTransportTest() : parser_(this), reader_(&read_buffer_) { + ON_CALL(fatal_error_callback_, Call(_)) + .WillByDefault([](absl::string_view error) { + ADD_FAILURE() << "Fatal session error: " << error; + }); + ON_CALL(writer_, Writev(_, _)) + .WillByDefault([&](absl::Span<const absl::string_view> data, + const quiche::StreamWriteOptions& options) { + for (absl::string_view fragment : data) { + parser_.IngestCapsuleFragment(fragment); + } + writer_.ProcessOptions(options); + return absl::OkStatus(); + }); + } + + std::unique_ptr<EncapsulatedSession> CreateTransport( + Perspective perspective) { + auto transport = std::make_unique<EncapsulatedSession>( + perspective, fatal_error_callback_.AsStdFunction()); + session_ = transport.get(); + return transport; + } + + std::unique_ptr<SessionVisitor> CreateAndStoreVisitor() { + auto visitor = std::make_unique<testing::StrictMock<MockSessionVisitor>>(); + visitor_ = visitor.get(); + return visitor; + } + + MOCK_METHOD(bool, OnCapsule, (const Capsule&), (override)); + + void OnCapsuleParseFailure(absl::string_view error_message) override { + ADD_FAILURE() << "Written an invalid capsule: " << error_message; + } + + void ProcessIncomingCapsule(const Capsule& capsule) { + quiche::QuicheBuffer buffer = + quiche::SerializeCapsule(capsule, quiche::SimpleBufferAllocator::Get()); + read_buffer_.append(buffer.data(), buffer.size()); + session_->OnCanRead(); + } + + void DefaultHandshakeForClient(EncapsulatedSession& session) { + quiche::HttpHeaderBlock outgoing_headers, incoming_headers; + session.InitializeClient(CreateAndStoreVisitor(), outgoing_headers, + &writer_, &reader_); + EXPECT_CALL(*visitor_, OnSessionReady()); + session.ProcessIncomingServerHeaders(incoming_headers); + } + + protected: + quiche::CapsuleParser parser_; + quiche::test::MockWriteStream writer_; + std::string read_buffer_; + quiche::test::ReadStreamFromString reader_; + MockSessionVisitor* visitor_ = nullptr; + EncapsulatedSession* session_ = nullptr; + testing::MockFunction<void(absl::string_view)> fatal_error_callback_; +}; + +TEST_F(EncapsulatedWebTransportTest, SetupClientSession) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + quiche::HttpHeaderBlock outgoing_headers, incoming_headers; + EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized); + session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_, + &reader_); + EXPECT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders); + EXPECT_CALL(*visitor_, OnSessionReady()); + session->ProcessIncomingServerHeaders(incoming_headers); + EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen); +} + +TEST_F(EncapsulatedWebTransportTest, SetupServerSession) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kServer); + quiche::HttpHeaderBlock outgoing_headers, incoming_headers; + EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized); + std::unique_ptr<SessionVisitor> visitor = CreateAndStoreVisitor(); + EXPECT_CALL(*visitor_, OnSessionReady()); + session->InitializeServer(std::move(visitor), outgoing_headers, + incoming_headers, &writer_, &reader_); + EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen); +} + +TEST_F(EncapsulatedWebTransportTest, CloseSession) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) { + EXPECT_EQ(capsule.capsule_type(), CapsuleType::CLOSE_WEBTRANSPORT_SESSION); + EXPECT_EQ(capsule.close_web_transport_session_capsule().error_code, 0x1234); + EXPECT_EQ(capsule.close_web_transport_session_capsule().error_message, + "test close"); + return true; + }); + EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen); + EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close"))); + session->CloseSession(0x1234, "test close"); + EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed); + EXPECT_TRUE(writer_.fin_written()); + + EXPECT_CALL(fatal_error_callback_, Call(_)) + .WillOnce([](absl::string_view error) { + EXPECT_THAT(error, HasSubstr("close a session that is already closed")); + }); + session->CloseSession(0x1234, "test close"); +} + +TEST_F(EncapsulatedWebTransportTest, CloseSessionWriteBlocked) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false)); + EXPECT_CALL(*this, OnCapsule(_)).Times(0); + EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen); + session->CloseSession(0x1234, "test close"); + EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosing); + + EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) { + EXPECT_EQ(capsule.capsule_type(), CapsuleType::CLOSE_WEBTRANSPORT_SESSION); + EXPECT_EQ(capsule.close_web_transport_session_capsule().error_code, 0x1234); + EXPECT_EQ(capsule.close_web_transport_session_capsule().error_message, + "test close"); + return true; + }); + EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(true)); + EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close"))); + session->OnCanWrite(); + EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed); + EXPECT_TRUE(writer_.fin_written()); +} + +TEST_F(EncapsulatedWebTransportTest, ReceiveFin) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + + EXPECT_CALL(*visitor_, OnSessionClosed(0, IsEmpty())); + reader_.set_fin(); + session->OnCanRead(); + EXPECT_TRUE(writer_.fin_written()); +} + +TEST_F(EncapsulatedWebTransportTest, ReceiveCloseSession) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + + EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test"))); + ProcessIncomingCapsule(Capsule::CloseWebTransportSession(0x1234, "test")); + EXPECT_TRUE(writer_.fin_written()); + reader_.set_fin(); + session->OnCanRead(); +} + +TEST_F(EncapsulatedWebTransportTest, ReceiveMalformedData) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + + EXPECT_CALL(fatal_error_callback_, Call(HasSubstr("too much capsule data"))) + .WillOnce([] {}); + read_buffer_ = std::string(2 * 1024 * 1024, '\xff'); + session->OnCanRead(); +} + +TEST_F(EncapsulatedWebTransportTest, SendDatagrams) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) { + EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM); + EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test"); + return true; + }); + DatagramStatus status = session->SendOrQueueDatagram("test"); + EXPECT_EQ(status.code, DatagramStatusCode::kSuccess); +} + +TEST_F(EncapsulatedWebTransportTest, SendDatagramsEarly) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + quiche::HttpHeaderBlock outgoing_headers; + session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_, + &reader_); + EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) { + EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM); + EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test"); + return true; + }); + ASSERT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders); + session->SendOrQueueDatagram("test"); +} + +TEST_F(EncapsulatedWebTransportTest, SendDatagramsBeforeInitialization) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + quiche::HttpHeaderBlock outgoing_headers; + EXPECT_CALL(*this, OnCapsule(_)).Times(0); + ASSERT_EQ(session->state(), EncapsulatedSession::kUninitialized); + session->SendOrQueueDatagram("test"); + + EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) { + EXPECT_EQ(capsule.capsule_type(), CapsuleType::DATAGRAM); + EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test"); + return true; + }); + DefaultHandshakeForClient(*session); +} + +TEST_F(EncapsulatedWebTransportTest, SendDatagramsTooBig) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + EXPECT_CALL(*this, OnCapsule(_)).Times(0); + std::string long_string(16 * 1024, 'a'); + DatagramStatus status = session->SendOrQueueDatagram(long_string); + EXPECT_EQ(status.code, DatagramStatusCode::kTooBig); +} + +TEST_F(EncapsulatedWebTransportTest, ReceiveDatagrams) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + EXPECT_CALL(*visitor_, OnDatagramReceived(_)) + .WillOnce([](absl::string_view data) { EXPECT_EQ(data, "test"); }); + ProcessIncomingCapsule(Capsule::Datagram("test")); +} + +TEST_F(EncapsulatedWebTransportTest, SendDraining) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) { + EXPECT_EQ(capsule.capsule_type(), CapsuleType::DRAIN_WEBTRANSPORT_SESSION); + return true; + }); + session->NotifySessionDraining(); +} + +TEST_F(EncapsulatedWebTransportTest, ReceiveDraining) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + testing::MockFunction<void()> callback; + session->SetOnDraining(callback.AsStdFunction()); + EXPECT_CALL(callback, Call()); + ProcessIncomingCapsule(Capsule(quiche::DrainWebTransportSessionCapsule())); +} + +TEST_F(EncapsulatedWebTransportTest, WriteErrorDatagram) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + EXPECT_CALL(writer_, Writev(_, _)) + .WillOnce(Return(absl::InternalError("Test write error"))); + EXPECT_CALL(fatal_error_callback_, Call(_)) + .WillOnce([](absl::string_view error) { + EXPECT_THAT(error, HasSubstr("Test write error")); + }); + DatagramStatus status = session->SendOrQueueDatagram("test"); + EXPECT_EQ(status.code, DatagramStatusCode::kInternalError); +} + +TEST_F(EncapsulatedWebTransportTest, WriteErrorControlCapsule) { + std::unique_ptr<EncapsulatedSession> session = + CreateTransport(Perspective::kClient); + DefaultHandshakeForClient(*session); + EXPECT_CALL(writer_, Writev(_, _)) + .WillOnce(Return(absl::InternalError("Test write error"))); + EXPECT_CALL(fatal_error_callback_, Call(_)) + .WillOnce([](absl::string_view error) { + EXPECT_THAT(error, HasSubstr("Test write error")); + }); + session->NotifySessionDraining(); +} + +} // namespace +} // namespace webtransport::test
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h index be08f89..dbf22f8 100644 --- a/quiche/web_transport/web_transport.h +++ b/quiche/web_transport/web_transport.h
@@ -24,6 +24,8 @@ namespace webtransport { +enum class Perspective { kClient, kServer }; + // A numeric ID uniquely identifying a WebTransport stream. Note that by design, // those IDs are not available in the Web API, and the IDs do not necessarily // match between client and server perspective, since there may be a proxy