Refactor the read part of WebTransport Stream API into ReadStream. The following changes are included: - Read()/ReadableBytes() are now a part of their own interface. - Introduce PeekNextAvaliableRegion()/MarkConsumed() as a zero-copy alternative to read. - Introduce a utility method for to use the peek API while taking care of reading FIN correctly. - Make webtransport::Stream a direct subclass of TerminableStream to avoid diamond inheritance problem. The long-term plan here is to abstract multiple kinds of streams for the goals of public APIs, including incoming QUIC streams, incoming WebTransport streams, and incoming HTTP request/response bodies. PiperOrigin-RevId: 568695001
diff --git a/quiche/common/quiche_stream.h b/quiche/common/quiche_stream.h index 072b3ca..9de876b 100644 --- a/quiche/common/quiche_stream.h +++ b/quiche/common/quiche_stream.h
@@ -2,15 +2,19 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -// General-purpose abstractions for a write stream. +// General-purpose abstractions for read/write streams. #ifndef QUICHE_COMMON_QUICHE_STREAM_H_ #define QUICHE_COMMON_QUICHE_STREAM_H_ +#include <cstddef> +#include <string> + #include "absl/status/status.h" #include "absl/strings/string_view.h" #include "absl/types/span.h" #include "quiche/common/platform/api/quiche_export.h" +#include "quiche/common/quiche_callbacks.h" namespace quiche { @@ -26,6 +30,92 @@ virtual void AbruptlyTerminate(absl::Status error) = 0; }; +// A general-purpose visitor API that gets notifications for ReadStream-related +// events. +class QUICHE_EXPORT ReadStreamVisitor { + public: + virtual ~ReadStreamVisitor() = default; + + // Called whenever the stream has new data available to read. Unless otherwise + // specified, QUICHE stream reads are level-triggered, which means that the + // callback will be called repeatedly as long as there is still data in the + // buffer. + virtual void OnCanRead() = 0; +}; + +// General purpose abstraction for a stream of data that can be read from the +// network. The class is designed around the idea that a network stream stores +// all of the received data in a sequence of contiguous buffers. Because of +// that, there are two ways to read from a stream: +// - Read() will copy data into a user-provided buffer, reassembling it if it +// is split across multiple buffers internally. +// - PeekNextReadableRegion()/SkipBytes() let the caller access the underlying +// buffers directly, potentially avoiding the copying at the cost of the +// caller having to deal with discontinuities. +class QUICHE_EXPORT ReadStream { + public: + struct QUICHE_EXPORT ReadResult { + // Number of bytes actually read. + size_t bytes_read = 0; + // Whether the FIN has been received; if true, no further data will arrive + // on the stream, and the stream object can be soon potentially garbage + // collected. + bool fin = false; + }; + + struct PeekResult { + // The next available chunk in the sequencer buffer. + absl::string_view peeked_data; + // True if all of the data up to the FIN has been read. + bool fin_next = false; + // True if all of the data up to the FIN has been received (but not + // necessarily read). + bool all_data_received = false; + + // Indicates that `SkipBytes()` will make progress if called. + bool has_data() const { return !peeked_data.empty() || fin_next; } + }; + + virtual ~ReadStream() = default; + + // Reads at most `buffer.size()` bytes into `buffer`. + [[nodiscard]] virtual ReadResult Read(absl::Span<char> buffer) = 0; + + // Reads all available data and appends it to the end of `output`. + [[nodiscard]] virtual ReadResult Read(std::string* output) = 0; + + // Indicates the total number of bytes that can be read from the stream. + virtual size_t ReadableBytes() const = 0; + + // Returns a contiguous buffer to read (or an empty buffer, if there is no + // data to read). See `ProcessAllReadableRegions` below for an example of how + // to use this method while handling FIN correctly. + virtual PeekResult PeekNextReadableRegion() const = 0; + + // Equivalent to reading `bytes`, but does not perform any copying. `bytes` + // must be less than or equal to `ReadableBytes()`. The return value indicates + // if the FIN has been reached. `SkipBytes(0)` can be used to consume the FIN + // if it's the only thing remaining on the stream. + [[nodiscard]] virtual bool SkipBytes(size_t bytes) = 0; +}; + +// Calls `callback` for every contiguous chunk available inside the stream. +// Returns true if the FIN has been reached. +inline bool ProcessAllReadableRegions( + ReadStream& stream, UnretainedCallback<void(absl::string_view)> callback) { + for (;;) { + ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion(); + if (!peek_result.has_data()) { + return false; + } + callback(peek_result.peeked_data); + bool fin = stream.SkipBytes(peek_result.peeked_data.size()); + if (fin) { + return true; + } + } +} + // A general-purpose visitor API that gets notifications for WriteStream-related // events. class QUICHE_EXPORT WriteStreamVisitor { @@ -59,7 +149,7 @@ // to either accept all data written into it by returning absl::OkStatus, or ask // the caller to try again once via OnCanWrite() by returning // absl::UnavailableError. -class QUICHE_EXPORT WriteStream : public TerminableStream { +class QUICHE_EXPORT WriteStream { public: virtual ~WriteStream() {}
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.cc b/quiche/quic/core/http/web_transport_stream_adapter.cc index f484249..18928f3 100644 --- a/quiche/quic/core/http/web_transport_stream_adapter.cc +++ b/quiche/quic/core/http/web_transport_stream_adapter.cc
@@ -4,13 +4,26 @@ #include "quiche/quic/core/http/web_transport_stream_adapter.h" +#include <cstddef> +#include <string> +#include <vector> + #include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/string_view.h" +#include "absl/types/span.h" #include "quiche/quic/core/http/web_transport_http3.h" #include "quiche/quic/core/quic_error_codes.h" +#include "quiche/quic/core/quic_session.h" +#include "quiche/quic/core/quic_stream.h" +#include "quiche/quic/core/quic_stream_sequencer.h" #include "quiche/quic/core/quic_types.h" -#include "quiche/common/platform/api/quiche_mem_slice.h" -#include "quiche/common/quiche_buffer_allocator.h" +#include "quiche/quic/core/web_transport_interface.h" +#include "quiche/quic/platform/api/quic_bug_tracker.h" +#include "quiche/quic/platform/api/quic_flags.h" +#include "quiche/quic/platform/api/quic_logging.h" #include "quiche/common/quiche_mem_slice_storage.h" +#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/web_transport.h" namespace quic { @@ -120,6 +133,28 @@ return sequencer_->ReadableBytes(); } +quiche::ReadStream::PeekResult +WebTransportStreamAdapter::PeekNextReadableRegion() const { + iovec iov; + PeekResult result; + if (sequencer_->GetReadableRegion(&iov)) { + result.peeked_data = + absl::string_view(static_cast<const char*>(iov.iov_base), iov.iov_len); + } + result.fin_next = sequencer_->IsClosed(); + result.all_data_received = sequencer_->IsAllDataAvailable(); + return result; +} + +bool WebTransportStreamAdapter::SkipBytes(size_t bytes) { + sequencer_->MarkConsumed(bytes); + if (!fin_read_ && sequencer_->IsClosed()) { + fin_read_ = true; + stream_->OnFinRead(); + } + return sequencer_->IsClosed(); +} + void WebTransportStreamAdapter::OnDataAvailable() { if (visitor_ == nullptr) { return;
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.h b/quiche/quic/core/http/web_transport_stream_adapter.h index 24c250a..004a62f 100644 --- a/quiche/quic/core/http/web_transport_stream_adapter.h +++ b/quiche/quic/core/http/web_transport_stream_adapter.h
@@ -5,18 +5,30 @@ #ifndef QUICHE_QUIC_CORE_HTTP_WEB_TRANSPORT_STREAM_ADAPTER_H_ #define QUICHE_QUIC_CORE_HTTP_WEB_TRANSPORT_STREAM_ADAPTER_H_ +#include <cstddef> +#include <memory> +#include <string> +#include <utility> + +#include "absl/base/attributes.h" +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/types/span.h" +#include "quiche/quic/core/quic_error_codes.h" #include "quiche/quic/core/quic_session.h" #include "quiche/quic/core/quic_stream.h" #include "quiche/quic/core/quic_stream_sequencer.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/core/web_transport_interface.h" +#include "quiche/common/platform/api/quiche_export.h" +#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/web_transport.h" namespace quic { // Converts WebTransportStream API calls into QuicStream API calls. The users // of this class can either subclass it, or wrap around it. -class QUICHE_EXPORT WebTransportStreamAdapter : public WebTransportStream { +class QUICHE_EXPORT WebTransportStreamAdapter : public webtransport::Stream { public: WebTransportStreamAdapter(QuicSession* session, QuicStream* stream, QuicStreamSequencer* sequencer); @@ -29,6 +41,8 @@ bool CanWrite() const override; void AbruptlyTerminate(absl::Status error) override; size_t ReadableBytes() const override; + PeekResult PeekNextReadableRegion() const override; + bool SkipBytes(size_t bytes) override; void SetVisitor(std::unique_ptr<WebTransportStreamVisitor> visitor) override { visitor_ = std::move(visitor); }
diff --git a/quiche/quic/core/quic_generic_session_test.cc b/quiche/quic/core/quic_generic_session_test.cc index 16febf0..ce5d184 100644 --- a/quiche/quic/core/quic_generic_session_test.cc +++ b/quiche/quic/core/quic_generic_session_test.cc
@@ -33,6 +33,8 @@ #include "quiche/quic/test_tools/simulator/test_harness.h" #include "quiche/quic/test_tools/web_transport_test_tools.h" #include "quiche/quic/tools/web_transport_test_visitors.h" +#include "quiche/common/quiche_stream.h" +#include "quiche/common/test_tools/quiche_test_utils.h" #include "quiche/web_transport/web_transport.h" namespace quic::test { @@ -263,6 +265,76 @@ EXPECT_EQ(buffer, "Stream One"); } +TEST_F(QuicGenericSessionTest, EchoStreamsUsingPeekApi) { + CreateDefaultEndpoints(kEchoServer); + WireUpEndpoints(); + RunHandshake(); + + // Send two streams, a bidirectional and a unidirectional one, but only send + // FIN on the second one. + webtransport::Stream* stream1 = + client_->session()->OpenOutgoingBidirectionalStream(); + EXPECT_TRUE(stream1->Write("Stream One")); + webtransport::Stream* stream2 = + client_->session()->OpenOutgoingUnidirectionalStream(); + EXPECT_TRUE(stream2->Write("Stream Two")); + EXPECT_TRUE(stream2->SendFin()); + + // Wait until the unidirectional stream is received back. + bool stream_received_unidi = false; + EXPECT_CALL(*client_->visitor(), OnIncomingUnidirectionalStreamAvailable()) + .WillOnce(Assign(&stream_received_unidi, true)); + ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout( + [&]() { return stream_received_unidi; })); + + // Receive the unidirectional echo reply. + webtransport::Stream* reply = + client_->session()->AcceptIncomingUnidirectionalStream(); + ASSERT_TRUE(reply != nullptr); + std::string buffer; + quiche::ReadStream::PeekResult peek_result = reply->PeekNextReadableRegion(); + EXPECT_EQ(peek_result.peeked_data, "Stream Two"); + EXPECT_EQ(peek_result.fin_next, false); + EXPECT_EQ(peek_result.all_data_received, true); + bool fin_received = + quiche::ProcessAllReadableRegions(*reply, [&](absl::string_view chunk) { + buffer.append(chunk.data(), chunk.size()); + return true; + }); + EXPECT_TRUE(fin_received); + EXPECT_EQ(buffer, "Stream Two"); + + // Receive the bidirectional stream reply without a FIN. + ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout( + [&]() { return stream1->PeekNextReadableRegion().has_data(); })); + peek_result = stream1->PeekNextReadableRegion(); + EXPECT_EQ(peek_result.peeked_data, "Stream One"); + EXPECT_EQ(peek_result.fin_next, false); + EXPECT_EQ(peek_result.all_data_received, false); + fin_received = stream1->SkipBytes(strlen("Stream One")); + EXPECT_FALSE(fin_received); + peek_result = stream1->PeekNextReadableRegion(); + EXPECT_EQ(peek_result.peeked_data, ""); + EXPECT_EQ(peek_result.fin_next, false); + EXPECT_EQ(peek_result.all_data_received, false); + + // Send FIN on the first stream, and expect to receive it back. + EXPECT_TRUE(stream1->SendFin()); + ASSERT_TRUE(test_harness_.RunUntilWithDefaultTimeout( + [&]() { return stream1->PeekNextReadableRegion().all_data_received; })); + peek_result = stream1->PeekNextReadableRegion(); + EXPECT_EQ(peek_result.peeked_data, ""); + EXPECT_EQ(peek_result.fin_next, true); + EXPECT_EQ(peek_result.all_data_received, true); + + // Read FIN and expect the stream to get garbage collected. + webtransport::StreamId id = stream1->GetStreamId(); + EXPECT_TRUE(client_->session()->GetStreamById(id) != nullptr); + fin_received = stream1->SkipBytes(0); + EXPECT_TRUE(fin_received); + EXPECT_TRUE(client_->session()->GetStreamById(id) == nullptr); +} + TEST_F(QuicGenericSessionTest, EchoDatagram) { CreateDefaultEndpoints(kEchoServer); WireUpEndpoints();
diff --git a/quiche/web_transport/test_tools/mock_web_transport.h b/quiche/web_transport/test_tools/mock_web_transport.h index 8855c9c..9ebf239 100644 --- a/quiche/web_transport/test_tools/mock_web_transport.h +++ b/quiche/web_transport/test_tools/mock_web_transport.h
@@ -42,6 +42,8 @@ (absl::Span<const absl::string_view> data, const quiche::StreamWriteOptions& options), (override)); + MOCK_METHOD(PeekResult, PeekNextReadableRegion, (), (const, override)); + MOCK_METHOD(bool, SkipBytes, (size_t bytes), (override)); MOCK_METHOD(bool, CanWrite, (), (const, override)); MOCK_METHOD(void, AbruptlyTerminate, (absl::Status), (override)); MOCK_METHOD(size_t, ReadableBytes, (), (const, override));
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h index 741be73..be08f89 100644 --- a/quiche/web_transport/web_transport.h +++ b/quiche/web_transport/web_transport.h
@@ -93,13 +93,11 @@ // events related to a WebTransport stream. The visitor object is owned by the // stream itself, meaning that if the stream is ever fully closed, the visitor // will be garbage-collected. -class QUICHE_EXPORT StreamVisitor : public quiche::WriteStreamVisitor { +class QUICHE_EXPORT StreamVisitor : public quiche::ReadStreamVisitor, + public quiche::WriteStreamVisitor { public: virtual ~StreamVisitor() {} - // Called whenever the stream has readable data available. - virtual void OnCanRead() = 0; - // Called when RESET_STREAM is received for the stream. virtual void OnResetStreamReceived(StreamErrorCode error) = 0; // Called when STOP_SENDING is received for the stream. @@ -112,27 +110,12 @@ // A stream (either bidirectional or unidirectional) that is contained within a // WebTransport session. -class QUICHE_EXPORT Stream : public quiche::WriteStream { +class QUICHE_EXPORT Stream : public quiche::ReadStream, + public quiche::WriteStream, + public quiche::TerminableStream { public: - struct QUICHE_EXPORT ReadResult { - // Number of bytes actually read. - size_t bytes_read; - // Whether the FIN has been received; if true, no further data will arrive - // on the stream, and the stream object can be soon potentially garbage - // collected. - bool fin; - }; - virtual ~Stream() {} - // Reads at most |buffer.size()| bytes into |buffer|. - [[nodiscard]] virtual ReadResult Read(absl::Span<char> buffer) = 0; - // Reads all available data and appends it to the end of |output|. - [[nodiscard]] virtual ReadResult Read(std::string* output) = 0; - - // Indicates the number of bytes that can be read from the stream. - virtual size_t ReadableBytes() const = 0; - // An ID that is unique within the session. Those are not exposed to the user // via the web API, but can be used internally for bookkeeping and // diagnostics.