Merge quiche::ReadStream and quiche::WriteStream into webtransport::Stream. In practice, it is much easier to disable one side than to deal with complex inheritance hierarchy. PiperOrigin-RevId: 872115117
diff --git a/build/source_list.bzl b/build/source_list.bzl index c465704..4c3283f 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -59,7 +59,6 @@ "common/quiche_simple_arena.h", "common/quiche_socket_address.h", "common/quiche_status_utils.h", - "common/quiche_stream.h", "common/quiche_string_tuple.h", "common/quiche_text_utils.h", "common/quiche_weak_ptr.h", @@ -415,6 +414,7 @@ "quic/platform/api/quic_thread.h", "web_transport/complete_buffer_visitor.h", "web_transport/encapsulated/encapsulated_web_transport.h", + "web_transport/stream_helpers.h", "web_transport/web_transport.h", "web_transport/web_transport_headers.h", "web_transport/web_transport_priority_scheduler.h", @@ -794,7 +794,6 @@ "common/platform/api/quiche_test.h", "common/platform/api/quiche_test_loopback.h", "common/platform/api/quiche_test_output.h", - "common/test_tools/mock_streams.h", "common/test_tools/quiche_test_utils.h", "http2/adapter/mock_http2_visitor.h", "http2/adapter/recording_http2_visitor.h", @@ -1161,7 +1160,6 @@ "common/simple_buffer_allocator_test.cc", "common/structured_headers_generated_test.cc", "common/structured_headers_test.cc", - "common/test_tools/mock_streams_test.cc", "common/test_tools/quiche_test_utils_test.cc", "common/vectorized_io_utils_test.cc", "common/wire_serialization_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni index 55fd86b..f4b0dd1 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -59,7 +59,6 @@ "src/quiche/common/quiche_simple_arena.h", "src/quiche/common/quiche_socket_address.h", "src/quiche/common/quiche_status_utils.h", - "src/quiche/common/quiche_stream.h", "src/quiche/common/quiche_string_tuple.h", "src/quiche/common/quiche_text_utils.h", "src/quiche/common/quiche_weak_ptr.h", @@ -415,6 +414,7 @@ "src/quiche/quic/platform/api/quic_thread.h", "src/quiche/web_transport/complete_buffer_visitor.h", "src/quiche/web_transport/encapsulated/encapsulated_web_transport.h", + "src/quiche/web_transport/stream_helpers.h", "src/quiche/web_transport/web_transport.h", "src/quiche/web_transport/web_transport_headers.h", "src/quiche/web_transport/web_transport_priority_scheduler.h", @@ -794,7 +794,6 @@ "src/quiche/common/platform/api/quiche_test.h", "src/quiche/common/platform/api/quiche_test_loopback.h", "src/quiche/common/platform/api/quiche_test_output.h", - "src/quiche/common/test_tools/mock_streams.h", "src/quiche/common/test_tools/quiche_test_utils.h", "src/quiche/http2/adapter/mock_http2_visitor.h", "src/quiche/http2/adapter/recording_http2_visitor.h", @@ -1162,7 +1161,6 @@ "src/quiche/common/simple_buffer_allocator_test.cc", "src/quiche/common/structured_headers_generated_test.cc", "src/quiche/common/structured_headers_test.cc", - "src/quiche/common/test_tools/mock_streams_test.cc", "src/quiche/common/test_tools/quiche_test_utils_test.cc", "src/quiche/common/vectorized_io_utils_test.cc", "src/quiche/common/wire_serialization_test.cc",
diff --git a/build/source_list.json b/build/source_list.json index cd25bcd..66a8052 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -58,7 +58,6 @@ "quiche/common/quiche_simple_arena.h", "quiche/common/quiche_socket_address.h", "quiche/common/quiche_status_utils.h", - "quiche/common/quiche_stream.h", "quiche/common/quiche_string_tuple.h", "quiche/common/quiche_text_utils.h", "quiche/common/quiche_weak_ptr.h", @@ -414,6 +413,7 @@ "quiche/quic/platform/api/quic_thread.h", "quiche/web_transport/complete_buffer_visitor.h", "quiche/web_transport/encapsulated/encapsulated_web_transport.h", + "quiche/web_transport/stream_helpers.h", "quiche/web_transport/web_transport.h", "quiche/web_transport/web_transport_headers.h", "quiche/web_transport/web_transport_priority_scheduler.h" @@ -793,7 +793,6 @@ "quiche/common/platform/api/quiche_test.h", "quiche/common/platform/api/quiche_test_loopback.h", "quiche/common/platform/api/quiche_test_output.h", - "quiche/common/test_tools/mock_streams.h", "quiche/common/test_tools/quiche_test_utils.h", "quiche/http2/adapter/mock_http2_visitor.h", "quiche/http2/adapter/recording_http2_visitor.h", @@ -1161,7 +1160,6 @@ "quiche/common/simple_buffer_allocator_test.cc", "quiche/common/structured_headers_generated_test.cc", "quiche/common/structured_headers_test.cc", - "quiche/common/test_tools/mock_streams_test.cc", "quiche/common/test_tools/quiche_test_utils_test.cc", "quiche/common/vectorized_io_utils_test.cc", "quiche/common/wire_serialization_test.cc",
diff --git a/quiche/common/quiche_stream.h b/quiche/common/quiche_stream.h deleted file mode 100644 index c8718bd..0000000 --- a/quiche/common/quiche_stream.h +++ /dev/null
@@ -1,214 +0,0 @@ -// Copyright 2023 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -// General-purpose abstractions for read/write streams. - -#ifndef QUICHE_COMMON_QUICHE_STREAM_H_ -#define QUICHE_COMMON_QUICHE_STREAM_H_ - -#include <cstddef> -#include <string> - -#include "absl/status/status.h" -#include "absl/strings/string_view.h" -#include "absl/types/span.h" -#include "quiche/common/platform/api/quiche_export.h" -#include "quiche/common/quiche_callbacks.h" -#include "quiche/common/quiche_mem_slice.h" - -namespace quiche { - -// A shared base class for read and write stream to support abrupt termination. -class QUICHE_EXPORT TerminableStream { - public: - virtual ~TerminableStream() = default; - - // Abruptly terminate the stream due to an error. If `error` is not OK, it may - // carry the error information that could be potentially communicated to the - // peer in case the stream is remote. If the stream is a duplex stream, both - // ends of the stream are terminated. - virtual void AbruptlyTerminate(absl::Status error) = 0; -}; - -// A general-purpose visitor API that gets notifications for ReadStream-related -// events. -class QUICHE_EXPORT ReadStreamVisitor { - public: - virtual ~ReadStreamVisitor() = default; - - // Called whenever the stream has new data available to read. Unless otherwise - // specified, QUICHE stream reads are level-triggered, which means that the - // callback will be called repeatedly as long as there is still data in the - // buffer. - virtual void OnCanRead() = 0; -}; - -// General purpose abstraction for a stream of data that can be read from the -// network. The class is designed around the idea that a network stream stores -// all of the received data in a sequence of contiguous buffers. Because of -// that, there are two ways to read from a stream: -// - Read() will copy data into a user-provided buffer, reassembling it if it -// is split across multiple buffers internally. -// - PeekNextReadableRegion()/SkipBytes() let the caller access the underlying -// buffers directly, potentially avoiding the copying at the cost of the -// caller having to deal with discontinuities. -class QUICHE_EXPORT ReadStream { - public: - struct QUICHE_EXPORT ReadResult { - // Number of bytes actually read. - size_t bytes_read = 0; - // Whether the FIN has been received; if true, no further data will arrive - // on the stream, and the stream object can be soon potentially garbage - // collected. - bool fin = false; - }; - - struct PeekResult { - // The next available chunk in the sequencer buffer. - absl::string_view peeked_data; - // True if all of the data up to the FIN has been read. - bool fin_next = false; - // True if all of the data up to the FIN has been received (but not - // necessarily read). - bool all_data_received = false; - - // Indicates that `SkipBytes()` will make progress if called. - bool has_data() const { return !peeked_data.empty() || fin_next; } - }; - - virtual ~ReadStream() = default; - - // Reads at most `buffer.size()` bytes into `buffer`. - [[nodiscard]] virtual ReadResult Read(absl::Span<char> buffer) = 0; - - // Reads all available data and appends it to the end of `output`. - [[nodiscard]] virtual ReadResult Read(std::string* output) = 0; - - // Indicates the total number of bytes that can be read from the stream. - virtual size_t ReadableBytes() const = 0; - - // Returns a contiguous buffer to read (or an empty buffer, if there is no - // data to read). See `ProcessAllReadableRegions` below for an example of how - // to use this method while handling FIN correctly. - virtual PeekResult PeekNextReadableRegion() const = 0; - - // Equivalent to reading `bytes`, but does not perform any copying. `bytes` - // must be less than or equal to `ReadableBytes()`. The return value indicates - // if the FIN has been reached. `SkipBytes(0)` can be used to consume the FIN - // if it's the only thing remaining on the stream. - [[nodiscard]] virtual bool SkipBytes(size_t bytes) = 0; -}; - -// Calls `callback` for every contiguous chunk available inside the stream. -// Returns true if the FIN has been reached. -inline bool ProcessAllReadableRegions( - ReadStream& stream, UnretainedCallback<void(absl::string_view)> callback) { - for (;;) { - ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion(); - if (!peek_result.has_data()) { - return false; - } - callback(peek_result.peeked_data); - bool fin = stream.SkipBytes(peek_result.peeked_data.size()); - if (fin) { - return true; - } - } -} - -// A general-purpose visitor API that gets notifications for WriteStream-related -// events. -class QUICHE_EXPORT WriteStreamVisitor { - public: - virtual ~WriteStreamVisitor() {} - - // Called whenever the stream is not write-blocked and can accept new data. - virtual void OnCanWrite() = 0; -}; - -// Options for writing data into a WriteStream. -class QUICHE_EXPORT StreamWriteOptions { - public: - StreamWriteOptions() = default; - - // If send_fin() is set to true, the write operation also sends a FIN on the - // stream. - bool send_fin() const { return send_fin_; } - void set_send_fin(bool send_fin) { send_fin_ = send_fin; } - - // If buffer_unconditionally() is set to true, the write operation will buffer - // data even if the internal buffer limit is exceeded. - bool buffer_unconditionally() const { return buffer_unconditionally_; } - void set_buffer_unconditionally(bool value) { - buffer_unconditionally_ = value; - } - - private: - bool send_fin_ = false; - bool buffer_unconditionally_ = false; -}; - -inline constexpr StreamWriteOptions kDefaultStreamWriteOptions = - StreamWriteOptions(); - -// WriteStream is an object that can accept a stream of bytes. -// -// The writes into a WriteStream are all-or-nothing. A WriteStream object has -// to either accept all data written into it by returning absl::OkStatus, or ask -// the caller to try again once via OnCanWrite() by returning -// absl::UnavailableError. -class QUICHE_EXPORT WriteStream { - public: - virtual ~WriteStream() {} - - // Writes `data` into the stream. If the write succeeds, the ownership is - // transferred to the stream; if it does not, the behavior is undefined -- the - // users of this API should check `CanWrite()` before calling `Writev()`. - virtual absl::Status Writev(absl::Span<QuicheMemSlice> data, - const StreamWriteOptions& options) = 0; - - // Indicates whether it is possible to write into stream right now. - virtual bool CanWrite() const = 0; - - // Legacy convenience method for writing a single string_view. New users - // should use quiche::SendFinOnStream instead, since this method does not - // return useful failure information. - [[nodiscard]] bool SendFin() { - StreamWriteOptions options; - options.set_send_fin(true); - return Writev(absl::Span<QuicheMemSlice>(), options).ok(); - } - - // Legacy convenience method for writing a single string_view. New users - // should use quiche::WriteIntoStream instead, since this method does not - // return useful failure information. - [[nodiscard]] bool Write(absl::string_view data) { - QuicheMemSlice slice = QuicheMemSlice::Copy(data); - return Writev(absl::MakeSpan(&slice, 1), kDefaultStreamWriteOptions).ok(); - } -}; - -// Convenience methods to write a single chunk of data into the stream. -inline absl::Status WriteIntoStream( - WriteStream& stream, QuicheMemSlice slice, - const StreamWriteOptions& options = kDefaultStreamWriteOptions) { - return stream.Writev(absl::MakeSpan(&slice, 1), options); -} -inline absl::Status WriteIntoStream( - WriteStream& stream, absl::string_view data, - const StreamWriteOptions& options = kDefaultStreamWriteOptions) { - QuicheMemSlice slice = QuicheMemSlice::Copy(data); - return stream.Writev(absl::MakeSpan(&slice, 1), options); -} - -// Convenience methods to send a FIN on the stream. -inline absl::Status SendFinOnStream(WriteStream& stream) { - StreamWriteOptions options; - options.set_send_fin(true); - return stream.Writev(absl::Span<QuicheMemSlice>(), options); -} - -} // namespace quiche - -#endif // QUICHE_COMMON_QUICHE_STREAM_H_
diff --git a/quiche/common/test_tools/mock_streams.h b/quiche/common/test_tools/mock_streams.h deleted file mode 100644 index 7290490..0000000 --- a/quiche/common/test_tools/mock_streams.h +++ /dev/null
@@ -1,102 +0,0 @@ -// Copyright (c) 2023 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_ -#define QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_ - -#include <algorithm> -#include <cstddef> -#include <string> -#include <utility> - -#include "absl/status/status.h" -#include "absl/strings/string_view.h" -#include "absl/types/span.h" -#include "quiche/common/platform/api/quiche_test.h" -#include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" - -namespace quiche::test { - -// Mockable stream that stores all of the data into an std::string by default. -class MockWriteStream : public quiche::WriteStream { - public: - MockWriteStream() { - ON_CALL(*this, CanWrite()).WillByDefault(testing::Return(true)); - ON_CALL(*this, Writev(testing::_, testing::_)) - .WillByDefault([&](absl::Span<quiche::QuicheMemSlice> data, - const StreamWriteOptions& options) { - return AppendToData(data, options); - }); - } - - MOCK_METHOD(absl::Status, Writev, - (absl::Span<quiche::QuicheMemSlice> data, - const StreamWriteOptions& options), - (override)); - MOCK_METHOD(bool, CanWrite, (), (const, override)); - - absl::Status AppendToData(absl::Span<quiche::QuicheMemSlice> data, - const StreamWriteOptions& options) { - for (const quiche::QuicheMemSlice& fragment : data) { - data_.append(fragment.data(), fragment.length()); - } - ProcessOptions(options); - return absl::OkStatus(); - } - void ProcessOptions(const StreamWriteOptions& options) { - fin_written_ |= options.send_fin(); - } - - std::string& data() { return data_; } - bool fin_written() { return fin_written_; } - - private: - std::string data_; - bool fin_written_ = false; -}; - -// Reads stream data from an std::string buffer. -class ReadStreamFromString : public ReadStream { - public: - explicit ReadStreamFromString(std::string* data) : data_(data) {} - - ReadResult Read(absl::Span<char> buffer) override { - size_t data_to_copy = std::min(buffer.size(), data_->size()); - std::copy(data_->begin(), data_->begin() + data_to_copy, buffer.begin()); - *data_ = data_->substr(data_to_copy); - return ReadResult{data_to_copy, data_->empty() && fin_}; - } - ReadResult Read(std::string* output) override { - size_t bytes = data_->size(); - output->append(std::move(*data_)); - data_->clear(); - return ReadResult{bytes, fin_}; - } - - size_t ReadableBytes() const override { return data_->size(); } - - virtual PeekResult PeekNextReadableRegion() const override { - PeekResult result; - result.peeked_data = *data_; - result.fin_next = data_->empty() && fin_; - result.all_data_received = fin_; - return result; - } - - bool SkipBytes(size_t bytes) override { - *data_ = data_->substr(bytes); - return data_->empty() && fin_; - } - - void set_fin() { fin_ = true; } - - private: - std::string* data_; - bool fin_ = false; -}; - -} // namespace quiche::test - -#endif // QUICHE_COMMON_TEST_TOOLS_MOCK_STREAMS_H_
diff --git a/quiche/common/test_tools/mock_streams_test.cc b/quiche/common/test_tools/mock_streams_test.cc deleted file mode 100644 index 2e8a2e4..0000000 --- a/quiche/common/test_tools/mock_streams_test.cc +++ /dev/null
@@ -1,63 +0,0 @@ -// Copyright (c) 2023 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "quiche/common/test_tools/mock_streams.h" - -#include <array> -#include <string> - -#include "absl/types/span.h" -#include "quiche/common/platform/api/quiche_test.h" -#include "quiche/common/quiche_stream.h" -#include "quiche/common/test_tools/quiche_test_utils.h" - -namespace quiche::test { -namespace { - -using ::testing::ElementsAre; -using ::testing::IsEmpty; - -TEST(MockWriteStreamTest, DefaultWrite) { - MockWriteStream stream; - QUICHE_EXPECT_OK(quiche::WriteIntoStream(stream, "test")); - EXPECT_EQ(stream.data(), "test"); - EXPECT_FALSE(stream.fin_written()); -} - -TEST(ReadStreamFromStringTest, ReadIntoSpan) { - std::string source = "abcdef"; - std::array<char, 3> buffer; - ReadStreamFromString stream(&source); - EXPECT_EQ(stream.ReadableBytes(), 6); - - stream.Read(absl::MakeSpan(buffer)); - EXPECT_THAT(buffer, ElementsAre('a', 'b', 'c')); - EXPECT_EQ(stream.ReadableBytes(), 3); - - stream.Read(absl::MakeSpan(buffer)); - EXPECT_THAT(buffer, ElementsAre('d', 'e', 'f')); - EXPECT_EQ(stream.ReadableBytes(), 0); - EXPECT_THAT(source, IsEmpty()); -} - -TEST(ReadStreamFromStringTest, ReadIntoString) { - std::string source = "abcdef"; - std::string destination; - ReadStreamFromString stream(&source); - stream.Read(&destination); - EXPECT_EQ(destination, "abcdef"); - EXPECT_THAT(source, IsEmpty()); -} - -TEST(ReadStreamFromStringTest, PeekAndSkip) { - std::string source = "abcdef"; - ReadStreamFromString stream(&source); - EXPECT_EQ(stream.PeekNextReadableRegion().peeked_data, "abcdef"); - stream.SkipBytes(2); - EXPECT_EQ(stream.PeekNextReadableRegion().peeked_data, "cdef"); - EXPECT_EQ(source, "cdef"); -} - -} // namespace -} // namespace quiche::test
diff --git a/quiche/quic/core/http/end_to_end_test.cc b/quiche/quic/core/http/end_to_end_test.cc index 9e30120..9f49142 100644 --- a/quiche/quic/core/http/end_to_end_test.cc +++ b/quiche/quic/core/http/end_to_end_test.cc
@@ -124,9 +124,10 @@ #include "quiche/common/platform/api/quiche_reference_counted.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/simple_buffer_allocator.h" #include "quiche/common/test_tools/quiche_test_utils.h" +#include "quiche/web_transport/stream_helpers.h" +#include "quiche/web_transport/web_transport.h" #include "quiche/web_transport/web_transport_headers.h" using quiche::HttpHeaderBlock; @@ -7653,7 +7654,7 @@ .WillOnce(Assign(&data_acknowledged, true)); outgoing_stream->SetVisitor(std::move(stream_visitor)); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*outgoing_stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*outgoing_stream, "test")); EXPECT_TRUE(outgoing_stream->SendFin()); bool stream_received = false; @@ -7694,7 +7695,7 @@ WebTransportStream* outgoing_stream = session->OpenOutgoingUnidirectionalStream(); ASSERT_TRUE(outgoing_stream != nullptr); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*outgoing_stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*outgoing_stream, "test")); EXPECT_TRUE(outgoing_stream->SendFin()); bool stream_received = false; @@ -7735,7 +7736,7 @@ .WillOnce(Assign(&data_acknowledged, true)); stream->SetVisitor(std::move(stream_visitor_owned)); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test")); EXPECT_TRUE(stream->SendFin()); std::string received_data = @@ -7762,7 +7763,7 @@ WebTransportStream* stream = session->OpenOutgoingBidirectionalStream(); ASSERT_TRUE(stream != nullptr); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test")); EXPECT_TRUE(stream->SendFin()); std::string received_data = ReadDataFromWebTransportStreamUntilFin(stream); @@ -7797,7 +7798,7 @@ quiche::QuicheMemSlice::Copy("bar"), quiche::QuicheMemSlice::Copy("test"), quiche::QuicheMemSlice::Copy(kLongString)}; - quiche::StreamWriteOptions options; + webtransport::StreamWriteOptions options; options.set_send_fin(true); QUICHE_EXPECT_OK(stream->Writev(absl::MakeSpan(write_vector), options)); @@ -7849,7 +7850,7 @@ WebTransportStream* stream = session->OpenOutgoingBidirectionalStream(); ASSERT_TRUE(stream != nullptr); QuicStreamId stream_id = stream->GetStreamId(); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test")); // Keep stream open. bool close_received = false; @@ -7881,7 +7882,7 @@ WebTransportStream* stream = session->OpenOutgoingBidirectionalStream(); ASSERT_TRUE(stream != nullptr); QuicStreamId stream_id = stream->GetStreamId(); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test")); // Keep stream open. bool close_received = false; @@ -7913,7 +7914,7 @@ WebTransportStream* stream = session->OpenOutgoingUnidirectionalStream(); ASSERT_TRUE(stream != nullptr); QuicStreamId stream_id = stream->GetStreamId(); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "42 test error")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "42 test error")); EXPECT_TRUE(stream->SendFin()); // Have some other streams open pending, to ensure they are closed properly. @@ -7945,7 +7946,7 @@ WebTransportStream* stream = session->OpenOutgoingUnidirectionalStream(); ASSERT_TRUE(stream != nullptr); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "DRAIN")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "DRAIN")); EXPECT_TRUE(stream->SendFin()); bool drain_received = false; @@ -7976,7 +7977,7 @@ WebTransportStream* stream = session->OpenOutgoingBidirectionalStream(); QuicStreamId id1 = stream->GetStreamId(); ASSERT_TRUE(stream != nullptr); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test")); stream->ResetWithUserCode(42); // This read fails if the stream is closed in both directions, since that @@ -7987,7 +7988,7 @@ stream = session->OpenOutgoingBidirectionalStream(); QuicStreamId id2 = stream->GetStreamId(); ASSERT_TRUE(stream != nullptr); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test")); stream->SendStopSending(100024); std::array<std::string, 2> expected_log = { @@ -8070,7 +8071,7 @@ WebTransportStream* stream = session->OpenOutgoingBidirectionalStream(); ASSERT_TRUE(stream != nullptr); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test")); EXPECT_TRUE(stream->SendFin()); EXPECT_TRUE(client_->WaitUntil(-1, [this, connect_stream_id]() { @@ -8106,7 +8107,7 @@ WebTransportStream* outgoing_stream = session->OpenOutgoingUnidirectionalStream(); ASSERT_TRUE(outgoing_stream != nullptr); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*outgoing_stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*outgoing_stream, "test")); EXPECT_TRUE(outgoing_stream->SendFin()); EXPECT_CALL(visitor, OnIncomingUnidirectionalStreamAvailable()) @@ -8131,7 +8132,7 @@ MockWebTransportStreamVisitor* stream_visitor = stream_visitor_owned.get(); stream->SetVisitor(std::move(stream_visitor_owned)); - QUICHE_EXPECT_OK(quiche::WriteIntoStream(*stream, "test")); + QUICHE_EXPECT_OK(webtransport::WriteIntoStream(*stream, "test")); EXPECT_TRUE(stream->SendFin()); std::string received_data =
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.cc b/quiche/quic/core/http/web_transport_stream_adapter.cc index 47357c6..5546930 100644 --- a/quiche/quic/core/http/web_transport_stream_adapter.cc +++ b/quiche/quic/core/http/web_transport_stream_adapter.cc
@@ -25,7 +25,6 @@ #include "quiche/quic/platform/api/quic_logging.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/web_transport.h" namespace quic { @@ -66,7 +65,7 @@ absl::Status WebTransportStreamAdapter::Writev( absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { if (data.empty() && !options.send_fin()) { return absl::InvalidArgumentError( "Writev() called without any data or a FIN"); @@ -118,21 +117,12 @@ return CheckBeforeStreamWrite().ok(); } -void WebTransportStreamAdapter::AbruptlyTerminate(absl::Status error) { - QUIC_DLOG(WARNING) << (session_->perspective() == Perspective::IS_CLIENT - ? "Client: " - : "Server: ") - << "Abruptly terminating stream " << stream_->id() - << " due to the following error: " << error; - ResetDueToInternalError(); -} - size_t WebTransportStreamAdapter::ReadableBytes() const { QUICHE_DCHECK(sequencer_->level_triggered()); return sequencer_->ReadableBytes(); } -quiche::ReadStream::PeekResult +webtransport::Stream::PeekResult WebTransportStreamAdapter::PeekNextReadableRegion() const { iovec iov; PeekResult result;
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.h b/quiche/quic/core/http/web_transport_stream_adapter.h index 7107dd9..91192bf 100644 --- a/quiche/quic/core/http/web_transport_stream_adapter.h +++ b/quiche/quic/core/http/web_transport_stream_adapter.h
@@ -24,7 +24,6 @@ #include "quiche/quic/core/web_transport_interface.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/web_transport.h" namespace quic { @@ -41,9 +40,8 @@ ABSL_MUST_USE_RESULT ReadResult Read(absl::Span<char> output) override; ABSL_MUST_USE_RESULT ReadResult Read(std::string* output) override; absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) override; + const webtransport::StreamWriteOptions& options) override; bool CanWrite() const override; - void AbruptlyTerminate(absl::Status error) override; size_t ReadableBytes() const override; PeekResult PeekNextReadableRegion() const override; bool SkipBytes(size_t bytes) override;
diff --git a/quiche/quic/core/quic_generic_session_test.cc b/quiche/quic/core/quic_generic_session_test.cc index 45dfef5..ce6a6b2 100644 --- a/quiche/quic/core/quic_generic_session_test.cc +++ b/quiche/quic/core/quic_generic_session_test.cc
@@ -43,8 +43,8 @@ #include "quiche/quic/tools/web_transport_test_visitors.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/test_tools/quiche_test_utils.h" +#include "quiche/web_transport/stream_helpers.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" #include "quiche/web_transport/web_transport.h" @@ -55,7 +55,7 @@ constexpr char kZeroes[8192] = {0}; -absl::Status SendZeroes(quiche::WriteStream& stream, int num_of_zeroes, +absl::Status SendZeroes(webtransport::Stream& stream, int num_of_zeroes, bool fin = false) { std::vector<quiche::QuicheMemSlice> slices; slices.reserve(num_of_zeroes / sizeof(kZeroes) + 1); @@ -66,7 +66,7 @@ quiche::QuicheMemSlice(kZeroes, chunk_size, +[](absl::string_view) {})); remaining -= chunk_size; } - quiche::StreamWriteOptions options; + webtransport::StreamWriteOptions options; options.set_send_fin(fin); return stream.Writev(absl::MakeSpan(slices), options); } @@ -330,12 +330,13 @@ client_->session()->AcceptIncomingUnidirectionalStream(); ASSERT_TRUE(reply != nullptr); std::string buffer; - quiche::ReadStream::PeekResult peek_result = reply->PeekNextReadableRegion(); + webtransport::Stream::PeekResult peek_result = + reply->PeekNextReadableRegion(); EXPECT_EQ(peek_result.peeked_data, "Stream Two"); EXPECT_EQ(peek_result.fin_next, false); EXPECT_EQ(peek_result.all_data_received, true); - bool fin_received = - quiche::ProcessAllReadableRegions(*reply, [&](absl::string_view chunk) { + bool fin_received = webtransport::ProcessAllReadableRegions( + *reply, [&](absl::string_view chunk) { buffer.append(chunk.data(), chunk.size()); return true; }); @@ -516,17 +517,17 @@ ASSERT_TRUE(stream != nullptr); ASSERT_TRUE(stream->CanWrite()); - absl::Status status = quiche::WriteIntoStream(*stream, buffer); + absl::Status status = webtransport::WriteIntoStream(*stream, buffer); QUICHE_EXPECT_OK(status); EXPECT_FALSE(stream->CanWrite()); - status = quiche::WriteIntoStream(*stream, buffer); + status = webtransport::WriteIntoStream(*stream, buffer); EXPECT_THAT(status, StatusIs(absl::StatusCode::kUnavailable)); - quiche::StreamWriteOptions options; + webtransport::StreamWriteOptions options; options.set_buffer_unconditionally(true); options.set_send_fin(true); - status = quiche::WriteIntoStream(*stream, buffer, options); + status = webtransport::WriteIntoStream(*stream, buffer, options); QUICHE_EXPECT_OK(status); EXPECT_FALSE(stream->CanWrite()); @@ -534,7 +535,7 @@ for (;;) { test_harness_.RunUntilWithDefaultTimeout( [&] { return stream->PeekNextReadableRegion().has_data(); }); - quiche::ReadStream::PeekResult result = stream->PeekNextReadableRegion(); + webtransport::Stream::PeekResult result = stream->PeekNextReadableRegion(); total_received += result.peeked_data.size(); bool fin_consumed = stream->SkipBytes(result.peeked_data.size()); if (fin_consumed) {
diff --git a/quiche/quic/moqt/moqt_bidi_stream.h b/quiche/quic/moqt/moqt_bidi_stream.h index d0ded95..98971f8 100644 --- a/quiche/quic/moqt/moqt_bidi_stream.h +++ b/quiche/quic/moqt/moqt_bidi_stream.h
@@ -27,7 +27,7 @@ #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_callbacks.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" +#include "quiche/web_transport/stream_helpers.h" #include "quiche/web_transport/web_transport.h" namespace moqt { @@ -214,7 +214,7 @@ void Fin() { fin_queued_ = true; if (pending_messages_.empty()) { - if (stream_ != nullptr && !SendFinOnStream(*stream_).ok()) { + if (stream_ != nullptr && !webtransport::SendFinOnStream(*stream_).ok()) { std::move(session_error_callback_)(MoqtError::kInternalError, "Failed to send FIN"); } @@ -260,7 +260,7 @@ private: void SendMessage(quiche::QuicheBuffer message, bool fin) { - quiche::StreamWriteOptions options; + webtransport::StreamWriteOptions options; options.set_send_fin(fin); // TODO: while we buffer unconditionally, we should still at some point tear // down the connection if we've buffered too many control messages;
diff --git a/quiche/quic/moqt/moqt_bidi_stream_test.cc b/quiche/quic/moqt/moqt_bidi_stream_test.cc index cf0cb88..dedd3c8 100644 --- a/quiche/quic/moqt/moqt_bidi_stream_test.cc +++ b/quiche/quic/moqt/moqt_bidi_stream_test.cc
@@ -16,8 +16,8 @@ #include "quiche/quic/moqt/test_tools/moqt_framer_utils.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" +#include "quiche/web_transport/web_transport.h" using ::testing::_; using ::testing::Return; @@ -220,7 +220,7 @@ EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _)) .WillOnce([](absl::Span<quiche::QuicheMemSlice>, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { EXPECT_TRUE(options.send_fin()); return absl::OkStatus(); }); @@ -232,7 +232,7 @@ stream_->set_stream(&mock_stream_); EXPECT_CALL(mock_stream_, Writev) .WillOnce([](absl::Span<quiche::QuicheMemSlice>, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { EXPECT_TRUE(options.send_fin()); return absl::OkStatus(); });
diff --git a/quiche/quic/moqt/moqt_namespace_stream.cc b/quiche/quic/moqt/moqt_namespace_stream.cc index 3ac34a3..f43967b 100644 --- a/quiche/quic/moqt/moqt_namespace_stream.cc +++ b/quiche/quic/moqt/moqt_namespace_stream.cc
@@ -21,7 +21,6 @@ #include "quiche/quic/moqt/moqt_session_callbacks.h" #include "quiche/quic/moqt/session_namespace_tree.h" #include "quiche/common/platform/api/quiche_logging.h" -#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/web_transport.h" namespace moqt {
diff --git a/quiche/quic/moqt/moqt_namespace_stream_test.cc b/quiche/quic/moqt/moqt_namespace_stream_test.cc index e538c27..9b4b2be 100644 --- a/quiche/quic/moqt/moqt_namespace_stream_test.cc +++ b/quiche/quic/moqt/moqt_namespace_stream_test.cc
@@ -25,8 +25,8 @@ #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" +#include "quiche/web_transport/web_transport.h" namespace moqt::test { namespace { @@ -324,7 +324,7 @@ .WillOnce([](TrackNamespace& ns, TransactionType& type) { return kEof; }); EXPECT_CALL(mock_stream_, Writev) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> slices, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { EXPECT_EQ(slices.size(), 1); EXPECT_EQ(slices[0].data()[0], static_cast<uint8_t>(MoqtMessageType::kNamespaceDone)); @@ -332,7 +332,7 @@ return absl::OkStatus(); }) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> slices, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { EXPECT_EQ(slices.size(), 0); EXPECT_TRUE(options.send_fin()); return absl::OkStatus();
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 3e5a446..e1c663b 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -33,7 +33,7 @@ #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_data_reader.h" -#include "quiche/common/quiche_stream.h" +#include "quiche/web_transport/web_transport.h" namespace moqt { @@ -48,11 +48,12 @@ // |fin_read| is set to true if there is a FIN anywhere before the end of the // varint. -std::optional<uint64_t> ReadVarInt62FromStream(quiche::ReadStream& stream, +std::optional<uint64_t> ReadVarInt62FromStream(webtransport::Stream& stream, bool& fin_read) { fin_read = false; - quiche::ReadStream::PeekResult peek_result = stream.PeekNextReadableRegion(); + webtransport::Stream::PeekResult peek_result = + stream.PeekNextReadableRegion(); if (peek_result.peeked_data.empty()) { if (peek_result.fin_next) { fin_read = stream.SkipBytes(0); @@ -73,7 +74,7 @@ char buffer[8]; absl::Span<char> bytes_to_read = absl::MakeSpan(buffer).subspan(0, varint_size); - quiche::ReadStream::ReadResult read_result = stream.Read(bytes_to_read); + webtransport::Stream::ReadResult read_result = stream.Read(bytes_to_read); QUICHE_DCHECK_EQ(read_result.bytes_read, varint_size); fin_read = read_result.fin; @@ -459,7 +460,7 @@ return; } std::array<char, 2> size_bytes; - quiche::ReadStream::ReadResult result = + webtransport::Stream::ReadResult result = stream_.Read(absl::MakeSpan(size_bytes)); if (result.bytes_read != 2) { ParseError(MoqtError::kInternalError, @@ -489,7 +490,7 @@ return; } absl::FixedArray<char> message(*message_size_); - quiche::ReadStream::ReadResult result = + webtransport::Stream::ReadResult result = stream_.Read(absl::MakeSpan(message)); if (result.bytes_read != *message_size_) { ParseError("Stream returned incorrect ReadableBytes"); @@ -1205,7 +1206,7 @@ std::optional<uint8_t> MoqtDataParser::ReadUint8NoFin() { char buffer[1]; - quiche::ReadStream::ReadResult read_result = + webtransport::Stream::ReadResult read_result = stream_.Read(absl::MakeSpan(buffer)); if (read_result.bytes_read == 0) { return std::nullopt; @@ -1421,7 +1422,7 @@ case kExtensionBody: case kData: { while (payload_length_remaining_ > 0) { - quiche::ReadStream::PeekResult peek_result = + webtransport::Stream::PeekResult peek_result = stream_.PeekNextReadableRegion(); if (!peek_result.has_data()) { return;
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index 79c10a1..d0585ea 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -22,7 +22,7 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/quiche_callbacks.h" -#include "quiche/common/quiche_stream.h" +#include "quiche/web_transport/web_transport.h" namespace moqt { @@ -89,7 +89,7 @@ class QUICHE_EXPORT MoqtMessageTypeParser { public: - MoqtMessageTypeParser(quiche::ReadStream* stream) : stream_(*stream) {} + MoqtMessageTypeParser(webtransport::Stream* stream) : stream_(*stream) {} ~MoqtMessageTypeParser() = default; // Returns false if there was a FIN. @@ -97,13 +97,13 @@ std::optional<uint64_t> message_type() const { return message_type_; } private: - quiche::ReadStream& stream_; + webtransport::Stream& stream_; std::optional<uint64_t> message_type_; }; class QUICHE_EXPORT MoqtControlParser { public: - MoqtControlParser(bool uses_web_transport, quiche::ReadStream* stream, + MoqtControlParser(bool uses_web_transport, webtransport::Stream* stream, MoqtControlParserVisitor& visitor) : visitor_(visitor), stream_(*stream), @@ -177,7 +177,7 @@ MessageParameters& out); MoqtControlParserVisitor& visitor_; - quiche::ReadStream& stream_; + webtransport::Stream& stream_; bool uses_web_transport_; bool no_more_data_ = false; // Fatal error or fin. No more parsing. bool parsing_error_ = false; @@ -206,7 +206,7 @@ // `stream` must outlive the parser. The parser does not configure itself as // a listener for the read events of the stream; it is responsibility of the // caller to do so via one of the read methods below. - explicit MoqtDataParser(quiche::ReadStream* stream, + explicit MoqtDataParser(webtransport::Stream* stream, MoqtDataParserVisitor* visitor) : stream_(*stream), visitor_(*visitor) {} @@ -282,7 +282,7 @@ void ParseError(absl::string_view reason); - quiche::ReadStream& stream_; + webtransport::Stream& stream_; MoqtDataParserVisitor& visitor_; bool no_more_data_ = false; // Fatal error or fin. No more parsing.
diff --git a/quiche/quic/moqt/moqt_probe_manager.cc b/quiche/quic/moqt/moqt_probe_manager.cc index fefab71..953fe6e 100644 --- a/quiche/quic/moqt/moqt_probe_manager.cc +++ b/quiche/quic/moqt/moqt_probe_manager.cc
@@ -19,8 +19,8 @@ #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/wire_serialization.h" +#include "quiche/web_transport/stream_helpers.h" #include "quiche/web_transport/web_transport.h" namespace moqt { @@ -73,7 +73,7 @@ } if (!header_sent_) { - absl::Status status = quiche::WriteIntoStream( + absl::Status status = webtransport::WriteIntoStream( *stream_, *quiche::SerializeIntoString(quiche::WireVarInt62( MoqtDataStreamType::Padding().value()))); QUICHE_DCHECK(status.ok()) << status; // Should succeed if CanWrite(). @@ -84,7 +84,7 @@ quic::QuicByteCount chunk_size = std::min(kWriteChunkSize, data_remaining_); quiche::QuicheMemSlice chunk( kZeroes, chunk_size, +[](absl::string_view) {}); - quiche::StreamWriteOptions options; + webtransport::StreamWriteOptions options; options.set_send_fin(chunk_size == data_remaining_); absl::Status status = stream_->Writev(absl::MakeSpan(&chunk, 1), options); QUICHE_DCHECK(status.ok()) << status; // Should succeed if CanWrite().
diff --git a/quiche/quic/moqt/moqt_probe_manager_test.cc b/quiche/quic/moqt/moqt_probe_manager_test.cc index 5b7c2f3..9f97832 100644 --- a/quiche/quic/moqt/moqt_probe_manager_test.cc +++ b/quiche/quic/moqt/moqt_probe_manager_test.cc
@@ -21,7 +21,6 @@ #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" #include "quiche/web_transport/web_transport.h" @@ -49,8 +48,9 @@ MockStream(webtransport::StreamId id) : id_(id) {} webtransport::StreamId GetStreamId() const override { return id_; } - absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) override { + absl::Status Writev( + absl::Span<quiche::QuicheMemSlice> data, + const webtransport::StreamWriteOptions& options) override { QUICHE_CHECK(!fin_) << "FIN written twice."; for (const quiche::QuicheMemSlice& slice : data) { data_.append(slice.AsStringView());
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index fff6197..09354b2 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -50,7 +50,6 @@ #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/quiche_weak_ptr.h" #include "quiche/web_transport/web_transport.h" @@ -681,7 +680,7 @@ case MoqtFetchTask::GetNextObjectResult::kEof: // TODO(martinduke): Either prefetch the next object, or alter the API // so that we're not sending FIN in a separate frame. - if (!quiche::SendFinOnStream(*stream_).ok()) { + if (!webtransport::SendFinOnStream(*stream_).ok()) { QUIC_DVLOG(1) << "Sending FIN onStream " << stream_->GetStreamId() << " failed"; } @@ -2405,7 +2404,7 @@ // memslices so that we can avoid a copy here. std::array write_vector = { quiche::QuicheMemSlice(std::move(serialized_header)), std::move(payload)}; - quiche::StreamWriteOptions options; + webtransport::StreamWriteOptions options; options.set_send_fin(fin); absl::Status write_status = stream->Writev(absl::MakeSpan(write_vector), options);
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 4f0feb7..cb65ccd 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -46,7 +46,6 @@ #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_data_reader.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/quiche_weak_ptr.h" #include "quiche/web_transport/test_tools/in_memory_stream.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" @@ -228,7 +227,7 @@ }); } EXPECT_CALL(*stream, PeekNextReadableRegion()).WillRepeatedly([&]() { - return quiche::ReadStream::PeekResult( + return webtransport::Stream::PeekResult( absl::string_view(buffer.data() + data_read, buffer.size() - data_read), fin && data_read == buffer.size(), fin); @@ -242,7 +241,7 @@ std::min(bytes_to_read.size(), buffer.size() - data_read); memcpy(bytes_to_read.data(), buffer.data() + data_read, read_size); data_read += read_size; - return quiche::ReadStream::ReadResult( + return webtransport::Stream::ReadResult( read_size, fin && data_read == buffer.size()); }); EXPECT_CALL(*stream, SkipBytes(_)).WillRepeatedly([&](size_t bytes) { @@ -326,7 +325,7 @@ }); EXPECT_CALL(mock_stream_, PeekNextReadableRegion()) .WillOnce(Return( - quiche::ReadStream::PeekResult(absl::string_view(), false, false))); + webtransport::Stream::PeekResult(absl::string_view(), false, false))); server_session.OnIncomingBidirectionalStreamAvailable(); } @@ -1320,7 +1319,7 @@ const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f, 0x00, 0x0a}; EXPECT_CALL(mock_stream_, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { correct_message = absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin |= options.send_fin(); @@ -1377,7 +1376,7 @@ const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f}; EXPECT_CALL(mock_stream_, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { correct_message = absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin = options.send_fin(); @@ -1431,7 +1430,7 @@ const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f}; EXPECT_CALL(mock_stream_, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { correct_message = absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin |= options.send_fin(); @@ -1499,7 +1498,7 @@ const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f}; EXPECT_CALL(mock_stream_, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { correct_message = absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin |= options.send_fin(); @@ -1569,7 +1568,7 @@ const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f}; EXPECT_CALL(mock_stream_, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { correct_message = absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin |= options.send_fin(); @@ -1625,7 +1624,7 @@ const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f}; EXPECT_CALL(mock_stream_, Writev) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { correct_message = absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin = options.send_fin(); @@ -1649,7 +1648,7 @@ fin = false; EXPECT_CALL(mock_stream_, Writev) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { EXPECT_TRUE(data.empty()); fin = options.send_fin(); return absl::OkStatus(); @@ -1688,7 +1687,7 @@ const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00}; EXPECT_CALL(mock_stream_, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { correct_message = absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin = options.send_fin(); @@ -1737,7 +1736,7 @@ }); EXPECT_CALL(mock_stream_, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { correct_message = absl::StartsWith(data[0].AsStringView(), kExpectedMessage2); fin = options.send_fin(); @@ -1779,7 +1778,7 @@ const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00}; EXPECT_CALL(mock_stream_, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { correct_message = absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin = options.send_fin(); @@ -2117,7 +2116,7 @@ .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream_, Writev) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { // The stream type omits the priority. EXPECT_TRUE(static_cast<const uint8_t>(data[0].AsStringView()[0]) & MoqtDataStreamType::kDefaultPriority); @@ -2269,21 +2268,21 @@ EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream0, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { // The Group ID is the 3rd byte of the stream. EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0); return absl::OkStatus(); }); EXPECT_CALL(mock_stream1, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { // The Group ID is the 3rd byte of the stream. EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 1); return absl::OkStatus(); }); EXPECT_CALL(mock_stream2, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { // The Group ID is the 3rd byte of the stream. EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 2); return absl::OkStatus(); @@ -2369,7 +2368,7 @@ EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream0, Writev) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { // Check track alias is 14. EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[1]), 14); // Check Group ID is 0 @@ -2405,7 +2404,7 @@ EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream1, Writev(_, _)) .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { // Check track alias is 15. EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[1]), 15); // Check Group ID is 0 @@ -2460,7 +2459,7 @@ if (second_result == MoqtFetchTask::GetNextObjectResult::kEof) { EXPECT_CALL(data_stream, Writev) .WillOnce([](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { quic::QuicDataReader reader(data[0].AsStringView()); uint64_t type; EXPECT_TRUE(reader.ReadVarInt62(&type)); @@ -2469,7 +2468,7 @@ return absl::OkStatus(); }) .WillOnce([](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { EXPECT_TRUE(data.empty()); EXPECT_TRUE(options.send_fin()); return absl::OkStatus(); @@ -2478,7 +2477,7 @@ } EXPECT_CALL(data_stream, Writev) .WillOnce([](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const webtransport::StreamWriteOptions& options) { quic::QuicDataReader reader(data[0].AsStringView()); uint64_t type; EXPECT_TRUE(reader.ReadVarInt62(&type));
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc index 5858001..970ee46 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
@@ -17,8 +17,8 @@ #include "quiche/quic/moqt/moqt_parser.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_buffer_allocator.h" -#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/test_tools/in_memory_stream.h" +#include "quiche/web_transport/web_transport.h" namespace moqt::test { @@ -213,7 +213,7 @@ absl::Status StoreSubscribe::operator()( absl::Span<const absl::string_view> data, - const quiche::StreamWriteOptions& options) const { + const webtransport::StreamWriteOptions& options) const { std::string merged_message = absl::StrJoin(data, ""); std::vector<MoqtGenericFrame> frames = ParseGenericMessage(merged_message); if (frames.size() != 1 || !std::holds_alternative<MoqtSubscribe>(frames[0])) {
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.h b/quiche/quic/moqt/test_tools/moqt_framer_utils.h index cd5135d..6aa81aa 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.h +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
@@ -19,7 +19,7 @@ #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_data_reader.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" +#include "quiche/web_transport/web_transport.h" namespace moqt::test { @@ -83,8 +83,9 @@ : subscribe_(subscribe) {} // quiche::WriteStream::Writev() implementation. - absl::Status operator()(absl::Span<const absl::string_view> data, - const quiche::StreamWriteOptions& options) const; + absl::Status operator()( + absl::Span<const absl::string_view> data, + const webtransport::StreamWriteOptions& options) const; private: std::optional<MoqtSubscribe>* subscribe_;
diff --git a/quiche/quic/tools/web_transport_test_visitors.h b/quiche/quic/tools/web_transport_test_visitors.h index e33eedd..88ee359 100644 --- a/quiche/quic/tools/web_transport_test_visitors.h +++ b/quiche/quic/tools/web_transport_test_visitors.h
@@ -5,16 +5,18 @@ #ifndef QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_ #define QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_ +#include <memory> #include <string> #include "absl/status/status.h" +#include "absl/strings/string_view.h" #include "quiche/quic/core/web_transport_interface.h" #include "quiche/quic/platform/api/quic_logging.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_circular_deque.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/simple_buffer_allocator.h" #include "quiche/web_transport/complete_buffer_visitor.h" +#include "quiche/web_transport/stream_helpers.h" #include "quiche/web_transport/web_transport.h" namespace quic { @@ -32,7 +34,7 @@ << " bytes from WebTransport stream " << stream_->GetStreamId() << ", fin: " << result.fin; if (bidi_ && result.fin) { - absl::Status status = quiche::SendFinOnStream(*stream_); + absl::Status status = webtransport::SendFinOnStream(*stream_); QUICHE_DCHECK(status.ok()) << status; } } @@ -114,7 +116,7 @@ } if (!buffer_.empty()) { - absl::Status status = quiche::WriteIntoStream(*stream_, buffer_); + absl::Status status = webtransport::WriteIntoStream(*stream_, buffer_); QUIC_DVLOG(1) << "Attempted writing on WebTransport bidirectional stream " << stream_->GetStreamId() << ", success: " << status; if (!status.ok()) { @@ -125,7 +127,7 @@ } if (send_fin_ && !fin_sent_) { - absl::Status status = quiche::SendFinOnStream(*stream_); + absl::Status status = webtransport::SendFinOnStream(*stream_); if (status.ok()) { fin_sent_ = true; }
diff --git a/quiche/web_transport/complete_buffer_visitor.cc b/quiche/web_transport/complete_buffer_visitor.cc index 45980c5..a2fbd3f 100644 --- a/quiche/web_transport/complete_buffer_visitor.cc +++ b/quiche/web_transport/complete_buffer_visitor.cc
@@ -7,8 +7,10 @@ #include <string> #include <utility> +#include "absl/status/status.h" #include "quiche/common/platform/api/quiche_logging.h" -#include "quiche/common/quiche_stream.h" +#include "quiche/web_transport/stream_helpers.h" +#include "quiche/web_transport/web_transport.h" namespace webtransport { @@ -30,10 +32,9 @@ if (!stream_->CanWrite()) { return; } - quiche::StreamWriteOptions options; + StreamWriteOptions options; options.set_send_fin(true); - absl::Status status = - quiche::WriteIntoStream(*stream_, *outgoing_data_, options); + absl::Status status = WriteIntoStream(*stream_, *outgoing_data_, options); if (!status.ok()) { QUICHE_DLOG(WARNING) << "Write from OnCanWrite() failed: " << status; return;
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc index 5a00803..ccdf6bb 100644 --- a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc +++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
@@ -11,7 +11,6 @@ #include <cstddef> #include <cstdint> #include <cstring> -#include <iterator> #include <memory> #include <optional> #include <string> @@ -19,7 +18,6 @@ #include <utility> #include <vector> -#include "absl/algorithm/container.h" #include "absl/container/node_hash_map.h" #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -36,8 +34,8 @@ #include "quiche/common/quiche_circular_deque.h" #include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_status_utils.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/vectorized_io_utils.h" +#include "quiche/web_transport/stream_helpers.h" #include "quiche/web_transport/web_transport.h" namespace webtransport { @@ -69,8 +67,7 @@ void EncapsulatedSession::InitializeClient( std::unique_ptr<SessionVisitor> visitor, - quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer, - quiche::ReadStream* reader) { + quiche::HttpHeaderBlock& /*outgoing_headers*/, Stream* underlying) { if (state_ != kUninitialized) { OnFatalError("Called InitializeClient() in an invalid state"); return; @@ -81,16 +78,14 @@ } visitor_ = std::move(visitor); - writer_ = writer; - reader_ = reader; + underlying_ = underlying; state_ = kWaitingForHeaders; } void EncapsulatedSession::InitializeServer( std::unique_ptr<SessionVisitor> visitor, const quiche::HttpHeaderBlock& /*incoming_headers*/, - quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer, - quiche::ReadStream* reader) { + quiche::HttpHeaderBlock& /*outgoing_headers*/, Stream* underlying) { if (state_ != kUninitialized) { OnFatalError("Called InitializeServer() in an invalid state"); return; @@ -101,8 +96,7 @@ } visitor_ = std::move(visitor); - writer_ = writer; - reader_ = reader; + underlying_ = underlying; OpenSession(); } void EncapsulatedSession::ProcessIncomingServerHeaders( @@ -239,7 +233,7 @@ // datagrams are not subject to queueing. case kWaitingForHeaders: case kSessionOpen: - write_blocked = !writer_->CanWrite(); + write_blocked = !underlying_->CanWrite(); break; case kSessionClosing: case kSessionClosed: @@ -261,7 +255,7 @@ std::array spans = {quiche::QuicheMemSlice(std::move(buffer)), quiche::QuicheMemSlice::Copy(datagram)}; absl::Status write_status = - writer_->Writev(absl::MakeSpan(spans), quiche::StreamWriteOptions()); + underlying_->Writev(absl::MakeSpan(spans), StreamWriteOptions()); if (!write_status.ok()) { OnWriteError(write_status); return DatagramStatus{ @@ -281,7 +275,7 @@ } void EncapsulatedSession::OnCanWrite() { - if (state_ == kUninitialized || !writer_) { + if (state_ == kUninitialized || !underlying_) { OnFatalError("Trying to write before the session is initialized"); return; } @@ -291,7 +285,7 @@ } if (state_ == kSessionClosing) { - if (writer_->CanWrite()) { + if (underlying_->CanWrite()) { CloseWebTransportSessionCapsule capsule{ buffered_session_close_.error_code, buffered_session_close_.error_message}; @@ -309,9 +303,9 @@ return; } - while (writer_->CanWrite() && !control_capsule_queue_.empty()) { - absl::Status write_status = quiche::WriteIntoStream( - *writer_, control_capsule_queue_.front().AsStringView()); + while (underlying_->CanWrite() && !control_capsule_queue_.empty()) { + absl::Status write_status = WriteIntoStream( + *underlying_, control_capsule_queue_.front().AsStringView()); if (!write_status.ok()) { OnWriteError(write_status); return; @@ -319,7 +313,7 @@ control_capsule_queue_.pop_front(); } - while (writer_->CanWrite()) { + while (underlying_->CanWrite()) { absl::StatusOr<StreamId> next_id = scheduler_.PopFront(); if (!next_id.ok()) { QUICHE_DCHECK_EQ(next_id.status().code(), absl::StatusCode::kNotFound); @@ -340,8 +334,8 @@ if (state_ == kSessionClosed || state_ == kSessionClosing) { return; } - bool has_fin = quiche::ProcessAllReadableRegions( - *reader_, [&](absl::string_view fragment) { + bool has_fin = + ProcessAllReadableRegions(*underlying_, [&](absl::string_view fragment) { capsule_parser_.IngestCapsuleFragment(fragment); }); if (has_fin) { @@ -488,9 +482,9 @@ absl::Status EncapsulatedSession::SendFin(absl::string_view data) { QUICHE_DCHECK(!fin_sent_); fin_sent_ = true; - quiche::StreamWriteOptions options; + StreamWriteOptions options; options.set_send_fin(true); - return quiche::WriteIntoStream(*writer_, data, options); + return WriteIntoStream(*underlying_, data, options); } void EncapsulatedSession::OnSessionClosed(SessionErrorCode error_code, @@ -516,8 +510,7 @@ } void EncapsulatedSession::OnFatalError(absl::string_view error_message) { - QUICHE_DLOG(ERROR) << "Fatal error in encapsulated WebTransport: " - << error_message; + QUICHE_DLOG(ERROR) << "Fatal error in encapsulated " << error_message; state_ = kSessionClosed; if (fatal_error_callback_) { std::move(fatal_error_callback_)(error_message); @@ -549,7 +542,7 @@ } } -quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read( +Stream::ReadResult EncapsulatedSession::InnerStream::Read( absl::Span<char> output) { const size_t total_size = output.size(); for (const IncomingRead& read : incoming_reads_) { @@ -563,8 +556,7 @@ bool fin_consumed = SkipBytes(total_size); return ReadResult{total_size, fin_consumed}; } -quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read( - std::string* output) { +Stream::ReadResult EncapsulatedSession::InnerStream::Read(std::string* output) { const size_t total_size = ReadableBytes(); const size_t initial_offset = output->size(); output->resize(initial_offset + total_size); @@ -577,8 +569,8 @@ } return total_size; } -quiche::ReadStream::PeekResult -EncapsulatedSession::InnerStream::PeekNextReadableRegion() const { +Stream::PeekResult EncapsulatedSession::InnerStream::PeekNextReadableRegion() + const { if (incoming_reads_.empty()) { return PeekResult{absl::string_view(), fin_received_, fin_received_}; } @@ -614,7 +606,7 @@ absl::Status EncapsulatedSession::InnerStream::Writev( const absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { + const StreamWriteOptions& options) { // TODO: support zero copy. std::vector<absl::string_view> views; views.reserve(data.size()); @@ -641,8 +633,8 @@ session_->OnFatalError("Stream not registered with the scheduler"); return absl::InternalError("Stream not registered with the scheduler"); } - const bool write_blocked = !session_->writer_->CanWrite() || *should_yield || - !pending_write_.empty(); + const bool write_blocked = !session_->underlying_->CanWrite() || + *should_yield || !pending_write_.empty(); if (write_blocked) { fin_buffered_ = options.send_fin(); for (absl::string_view chunk : views) { @@ -681,7 +673,7 @@ void EncapsulatedSession::InnerStream::FlushPendingWrite() { QUICHE_DCHECK(!write_side_closed_); - QUICHE_DCHECK(session_->writer_->CanWrite()); + QUICHE_DCHECK(session_->underlying_->CanWrite()); QUICHE_DCHECK(!pending_write_.empty()); absl::string_view to_write = pending_write_; size_t bytes_written = @@ -716,8 +708,8 @@ // TODO: support zero copy. views_to_write.push_back(quiche::QuicheMemSlice::Copy(view)); } - absl::Status write_status = session_->writer_->Writev( - absl::MakeSpan(views_to_write), quiche::kDefaultStreamWriteOptions); + absl::Status write_status = session_->underlying_->Writev( + absl::MakeSpan(views_to_write), kDefaultStreamWriteOptions); if (!write_status.ok()) { session_->OnWriteError(write_status); return 0; @@ -725,12 +717,6 @@ return total_size; } -void EncapsulatedSession::InnerStream::AbruptlyTerminate(absl::Status error) { - QUICHE_DLOG(INFO) << "Abruptly terminating the stream due to error: " - << error; - ResetDueToInternalError(); -} - void EncapsulatedSession::InnerStream::ResetWithUserCode( StreamErrorCode error) { if (reset_frame_sent_) {
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.h b/quiche/web_transport/encapsulated/encapsulated_web_transport.h index 44f489a..cb0d416 100644 --- a/quiche/web_transport/encapsulated/encapsulated_web_transport.h +++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
@@ -26,7 +26,6 @@ #include "quiche/common/quiche_callbacks.h" #include "quiche/common/quiche_circular_deque.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/simple_buffer_allocator.h" #include "quiche/web_transport/web_transport.h" #include "quiche/web_transport/web_transport_priority_scheduler.h" @@ -47,9 +46,8 @@ // arbitrary bidirectional bytestream that can be prefixed with HTTP headers. // Specification: https://datatracker.ietf.org/doc/draft-ietf-webtrans-http2/ class QUICHE_EXPORT EncapsulatedSession - : public webtransport::Session, - public quiche::WriteStreamVisitor, - public quiche::ReadStreamVisitor, + : public Session, + public StreamVisitor, public quiche::CapsuleParser::Visitor { public: // The state machine of the transport. @@ -77,16 +75,14 @@ // thus, the headers are necessary to initialize the session. void InitializeClient(std::unique_ptr<SessionVisitor> visitor, quiche::HttpHeaderBlock& outgoing_headers, - quiche::WriteStream* writer, - quiche::ReadStream* reader); + Stream* underlying); void InitializeServer(std::unique_ptr<SessionVisitor> visitor, const quiche::HttpHeaderBlock& incoming_headers, quiche::HttpHeaderBlock& outgoing_headers, - quiche::WriteStream* writer, - quiche::ReadStream* reader); + Stream* underlying); void ProcessIncomingServerHeaders(const quiche::HttpHeaderBlock& headers); - // webtransport::Session implementation. + // Session implementation. void CloseSession(SessionErrorCode error_code, absl::string_view error_message) override; Stream* AcceptIncomingBidirectionalStream() override; @@ -105,10 +101,16 @@ void SetOnDraining(quiche::SingleUseCallback<void()> callback) override; std::optional<std::string> GetNegotiatedSubprotocol() const override; - // quiche::WriteStreamVisitor implementation. + // StreamVisitor implementation. void OnCanWrite() override; - // quiche::ReadStreamVisitor implementation. void OnCanRead() override; + void OnResetStreamReceived(StreamErrorCode) override { + OnFatalError("Underlying stream was reset"); + } + void OnStopSendingReceived(StreamErrorCode) override { + OnFatalError("Underlying stream was reset"); + } + void OnWriteSideInDataRecvdState() override {} // quiche::CapsuleParser::Visitor implementation. bool OnCapsule(const quiche::Capsule& capsule) override; void OnCapsuleParseFailure(absl::string_view error_message) override; @@ -147,12 +149,9 @@ // WriteStream implementation. absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) override; + const StreamWriteOptions& options) override; bool CanWrite() const override; - // TerminableStream implementation. - void AbruptlyTerminate(absl::Status error) override; - // Stream implementation. StreamId GetStreamId() const override { return id_; } StreamVisitor* visitor() override { return visitor_.get(); } @@ -224,8 +223,7 @@ std::unique_ptr<SessionVisitor> visitor_ = nullptr; FatalErrorCallback fatal_error_callback_; quiche::SingleUseCallback<void()> draining_callback_; - quiche::WriteStream* writer_ = nullptr; // Not owned. - quiche::ReadStream* reader_ = nullptr; // Not owned. + Stream* underlying_ = nullptr; // Not owned. quiche::QuicheBufferAllocator* allocator_ = quiche::SimpleBufferAllocator::Get(); quiche::CapsuleParser capsule_parser_;
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc index 6f27bc6..f80d932 100644 --- a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc +++ b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
@@ -8,7 +8,6 @@ #include <memory> #include <string> #include <utility> -#include <vector> #include "absl/status/status.h" #include "absl/strings/string_view.h" @@ -17,11 +16,10 @@ #include "quiche/common/http/http_header_block.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_buffer_allocator.h" -#include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/simple_buffer_allocator.h" -#include "quiche/common/test_tools/mock_streams.h" #include "quiche/common/test_tools/quiche_test_utils.h" +#include "quiche/web_transport/stream_helpers.h" +#include "quiche/web_transport/test_tools/in_memory_stream.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" #include "quiche/web_transport/web_transport.h" @@ -41,20 +39,15 @@ class EncapsulatedWebTransportTest : public quiche::test::QuicheTest, public quiche::CapsuleParser::Visitor { public: - EncapsulatedWebTransportTest() : parser_(this), reader_(&read_buffer_) { + EncapsulatedWebTransportTest() : parser_(this), underlying_(0) { ON_CALL(fatal_error_callback_, Call(_)) .WillByDefault([](absl::string_view error) { ADD_FAILURE() << "Fatal session error: " << error; }); - ON_CALL(writer_, Writev(_, _)) - .WillByDefault([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { - for (const quiche::QuicheMemSlice& fragment : data) { - parser_.IngestCapsuleFragment(fragment.AsStringView()); - } - writer_.ProcessOptions(options); - return absl::OkStatus(); - }); + ON_CALL(underlying_, OnWrite).WillByDefault([&](absl::string_view data) { + parser_.IngestCapsuleFragment(data); + return absl::OkStatus(); + }); } std::unique_ptr<EncapsulatedSession> CreateTransport( @@ -80,7 +73,7 @@ void ProcessIncomingCapsule(const Capsule& capsule) { quiche::QuicheBuffer buffer = quiche::SerializeCapsule(capsule, quiche::SimpleBufferAllocator::Get()); - read_buffer_.append(buffer.data(), buffer.size()); + underlying_.Receive(buffer.AsStringView()); session_->OnCanRead(); } @@ -88,23 +81,21 @@ void ProcessIncomingCapsule(const CapsuleType& capsule) { quiche::QuicheBuffer buffer = quiche::SerializeCapsule( quiche::Capsule(capsule), quiche::SimpleBufferAllocator::Get()); - read_buffer_.append(buffer.data(), buffer.size()); + underlying_.Receive(buffer.AsStringView()); session_->OnCanRead(); } void DefaultHandshakeForClient(EncapsulatedSession& session) { quiche::HttpHeaderBlock outgoing_headers, incoming_headers; session.InitializeClient(CreateAndStoreVisitor(), outgoing_headers, - &writer_, &reader_); + &underlying_); EXPECT_CALL(*visitor_, OnSessionReady()); session.ProcessIncomingServerHeaders(incoming_headers); } protected: quiche::CapsuleParser parser_; - quiche::test::MockWriteStream writer_; - std::string read_buffer_; - quiche::test::ReadStreamFromString reader_; + InMemoryStreamWithMockWrite underlying_; MockSessionVisitor* visitor_ = nullptr; EncapsulatedSession* session_ = nullptr; testing::MockFunction<void(absl::string_view)> fatal_error_callback_; @@ -127,8 +118,8 @@ CreateTransport(Perspective::kClient); quiche::HttpHeaderBlock outgoing_headers, incoming_headers; EXPECT_EQ(session->state(), EncapsulatedSession::kUninitialized); - session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_, - &reader_); + session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, + &underlying_); EXPECT_EQ(session->state(), EncapsulatedSession::kWaitingForHeaders); EXPECT_CALL(*visitor_, OnSessionReady()); session->ProcessIncomingServerHeaders(incoming_headers); @@ -143,7 +134,7 @@ std::unique_ptr<SessionVisitor> visitor = CreateAndStoreVisitor(); EXPECT_CALL(*visitor_, OnSessionReady()); session->InitializeServer(std::move(visitor), outgoing_headers, - incoming_headers, &writer_, &reader_); + incoming_headers, &underlying_); EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen); } @@ -162,7 +153,7 @@ EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close"))); session->CloseSession(0x1234, "test close"); EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed); - EXPECT_TRUE(writer_.fin_written()); + EXPECT_TRUE(underlying_.fin_sent()); EXPECT_CALL(fatal_error_callback_, Call(_)) .WillOnce([](absl::string_view error) { @@ -175,7 +166,8 @@ std::unique_ptr<EncapsulatedSession> session = CreateTransport(Perspective::kClient); DefaultHandshakeForClient(*session); - EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false)); + EXPECT_CALL(underlying_, GetWriteStatus) + .WillOnce(Return(absl::UnavailableError("Write-blocked"))); EXPECT_CALL(*this, OnCapsule(_)).Times(0); EXPECT_EQ(session->state(), EncapsulatedSession::kSessionOpen); session->CloseSession(0x1234, "test close"); @@ -188,11 +180,12 @@ "test close"); return true; }); - EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(true)); + EXPECT_CALL(underlying_, GetWriteStatus) + .WillRepeatedly(Return(absl::OkStatus())); EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test close"))); session->OnCanWrite(); EXPECT_EQ(session->state(), EncapsulatedSession::kSessionClosed); - EXPECT_TRUE(writer_.fin_written()); + EXPECT_TRUE(underlying_.fin_sent()); } TEST_F(EncapsulatedWebTransportTest, ReceiveFin) { @@ -201,9 +194,9 @@ DefaultHandshakeForClient(*session); EXPECT_CALL(*visitor_, OnSessionClosed(0, IsEmpty())); - reader_.set_fin(); + underlying_.Receive("", /*fin=*/true); session->OnCanRead(); - EXPECT_TRUE(writer_.fin_written()); + EXPECT_TRUE(underlying_.fin_sent()); } TEST_F(EncapsulatedWebTransportTest, ReceiveCloseSession) { @@ -213,8 +206,8 @@ EXPECT_CALL(*visitor_, OnSessionClosed(0x1234, StrEq("test"))); ProcessIncomingCapsule(Capsule::CloseWebTransportSession(0x1234, "test")); - EXPECT_TRUE(writer_.fin_written()); - reader_.set_fin(); + EXPECT_TRUE(underlying_.fin_sent()); + underlying_.Receive("", /*fin=*/true); session->OnCanRead(); } @@ -225,7 +218,7 @@ EXPECT_CALL(fatal_error_callback_, Call(HasSubstr("too much capsule data"))) .WillOnce([] {}); - read_buffer_ = std::string(2 * 1024 * 1024, '\xff'); + underlying_.Receive(std::string(2 * 1024 * 1024, '\xff')); session->OnCanRead(); } @@ -246,8 +239,8 @@ std::unique_ptr<EncapsulatedSession> session = CreateTransport(Perspective::kClient); quiche::HttpHeaderBlock outgoing_headers; - session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, &writer_, - &reader_); + session->InitializeClient(CreateAndStoreVisitor(), outgoing_headers, + &underlying_); EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) { EXPECT_EQ(capsule.capsule_type(), quiche::CapsuleType::DATAGRAM); EXPECT_EQ(capsule.datagram_capsule().http_datagram_payload, "test"); @@ -317,9 +310,11 @@ std::unique_ptr<EncapsulatedSession> session = CreateTransport(Perspective::kClient); DefaultHandshakeForClient(*session); - EXPECT_CALL(writer_, Writev(_, _)) + // Let the CanWrite() check pass, then fail on the actual write. + EXPECT_CALL(underlying_, GetWriteStatus) + .WillOnce(Return(absl::OkStatus())) .WillOnce(Return(absl::InternalError("Test write error"))); - EXPECT_CALL(fatal_error_callback_, Call(_)) + EXPECT_CALL(fatal_error_callback_, Call) .WillOnce([](absl::string_view error) { EXPECT_THAT(error, HasSubstr("Test write error")); }); @@ -331,9 +326,11 @@ std::unique_ptr<EncapsulatedSession> session = CreateTransport(Perspective::kClient); DefaultHandshakeForClient(*session); - EXPECT_CALL(writer_, Writev(_, _)) + // Let the CanWrite() check pass, then fail on the actual write. + EXPECT_CALL(underlying_, GetWriteStatus) + .WillOnce(Return(absl::OkStatus())) .WillOnce(Return(absl::InternalError("Test write error"))); - EXPECT_CALL(fatal_error_callback_, Call(_)) + EXPECT_CALL(fatal_error_callback_, Call) .WillOnce([](absl::string_view error) { EXPECT_THAT(error, HasSubstr("Test write error")); }); @@ -358,13 +355,13 @@ EXPECT_EQ(stream->visitor(), nullptr); EXPECT_EQ(stream->ReadableBytes(), 4u); - quiche::ReadStream::PeekResult peek = stream->PeekNextReadableRegion(); + Stream::PeekResult peek = stream->PeekNextReadableRegion(); EXPECT_EQ(peek.peeked_data, "test"); EXPECT_FALSE(peek.fin_next); EXPECT_FALSE(peek.all_data_received); std::string buffer; - quiche::ReadStream::ReadResult read = stream->Read(&buffer); + Stream::ReadResult read = stream->Read(&buffer); EXPECT_EQ(read.bytes_read, 4); EXPECT_FALSE(read.fin); EXPECT_EQ(buffer, "test"); @@ -418,7 +415,7 @@ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "ef", true}); - quiche::ReadStream::PeekResult peek = stream->PeekNextReadableRegion(); + Stream::PeekResult peek = stream->PeekNextReadableRegion(); EXPECT_EQ(peek.peeked_data, "abcd"); EXPECT_FALSE(peek.fin_next); EXPECT_TRUE(peek.all_data_received); @@ -449,7 +446,7 @@ EXPECT_EQ(stream->ReadableBytes(), 6u); std::array<char, 3> buffer; - quiche::ReadStream::ReadResult read = stream->Read(absl::MakeSpan(buffer)); + Stream::ReadResult read = stream->Read(absl::MakeSpan(buffer)); EXPECT_THAT(buffer, ElementsAre('a', 'b', 'c')); EXPECT_EQ(read.bytes_read, 3); EXPECT_FALSE(read.fin); @@ -473,7 +470,7 @@ for (int i = 0; i < 64; i++) { std::array<char, 1024> buffer; - quiche::ReadStream::ReadResult read = stream->Read(absl::MakeSpan(buffer)); + Stream::ReadResult read = stream->Read(absl::MakeSpan(buffer)); EXPECT_EQ(read.bytes_read, 1024); EXPECT_EQ(read.fin, i == 63); } @@ -552,7 +549,7 @@ EXPECT_CALL(*visitor, OnDelete()).WillOnce([&] { deleted = true; }); EXPECT_CALL(*this, OnCapsule(_)).WillOnce(Return(true)); - quiche::StreamWriteOptions options; + StreamWriteOptions options; options.set_send_fin(true); EXPECT_THAT(stream->Writev(absl::Span<quiche::QuicheMemSlice>(), options), StatusIs(absl::StatusCode::kOk)); @@ -576,7 +573,7 @@ EXPECT_EQ(capsule.web_transport_stream_data().data, "test"); return true; }); - absl::Status status = quiche::WriteIntoStream(*stream, "test"); + absl::Status status = WriteIntoStream(*stream, "test"); EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk)); } @@ -594,10 +591,10 @@ EXPECT_EQ(capsule.web_transport_stream_data().data, "test"); return true; }); - quiche::StreamWriteOptions options; + StreamWriteOptions options; options.set_send_fin(true); EXPECT_TRUE(stream->CanWrite()); - absl::Status status = quiche::WriteIntoStream(*stream, "test", options); + absl::Status status = WriteIntoStream(*stream, "test", options); EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk)); EXPECT_FALSE(stream->CanWrite()); } @@ -616,7 +613,7 @@ EXPECT_EQ(capsule.web_transport_stream_data().data, ""); return true; }); - quiche::StreamWriteOptions options; + StreamWriteOptions options; options.set_send_fin(true); EXPECT_TRUE(stream->CanWrite()); absl::Status status = @@ -632,15 +629,17 @@ Stream* stream = session->OpenOutgoingUnidirectionalStream(); ASSERT_TRUE(stream != nullptr); - EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false)); - absl::Status status = quiche::WriteIntoStream(*stream, "abc"); + EXPECT_CALL(underlying_, GetWriteStatus) + .WillOnce(Return(absl::UnavailableError("Write-blocked"))); + absl::Status status = WriteIntoStream(*stream, "abc"); EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk)); // While the stream cannot be written right now, we should be still able to // buffer data into it. EXPECT_TRUE(stream->CanWrite()); - EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true)); - status = quiche::WriteIntoStream(*stream, "def"); + EXPECT_CALL(underlying_, GetWriteStatus) + .WillRepeatedly(Return(absl::OkStatus())); + status = WriteIntoStream(*stream, "def"); EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk)); EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) { @@ -659,13 +658,16 @@ Stream* stream = session->OpenOutgoingUnidirectionalStream(); ASSERT_TRUE(stream != nullptr); - EXPECT_CALL(writer_, CanWrite()).Times(2).WillRepeatedly(Return(false)); - absl::Status status = quiche::WriteIntoStream(*stream, "abc"); + EXPECT_CALL(underlying_, GetWriteStatus) + .Times(2) + .WillRepeatedly(Return(absl::UnavailableError("Write-blocked"))); + absl::Status status = WriteIntoStream(*stream, "abc"); EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk)); - status = quiche::WriteIntoStream(*stream, "def"); + status = WriteIntoStream(*stream, "def"); EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk)); - EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true)); + EXPECT_CALL(underlying_, GetWriteStatus) + .WillRepeatedly(Return(absl::OkStatus())); EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) { EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM); EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 2u); @@ -685,12 +687,14 @@ ASSERT_TRUE(stream2 != nullptr); EXPECT_CALL(*this, OnCapsule(_)).Times(0); - EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false)); - absl::Status status = quiche::WriteIntoStream(*stream1, "abc"); + EXPECT_CALL(underlying_, GetWriteStatus) + .WillOnce(Return(absl::UnavailableError("Write-blocked"))); + absl::Status status = WriteIntoStream(*stream1, "abc"); EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk)); // ShouldYield will return false here, causing the write to get buffered. - EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true)); - status = quiche::WriteIntoStream(*stream2, "abc"); + EXPECT_CALL(underlying_, GetWriteStatus) + .WillRepeatedly(Return(absl::OkStatus())); + status = WriteIntoStream(*stream2, "abc"); EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk)); std::vector<StreamId> writes;
diff --git a/quiche/web_transport/stream_helpers.h b/quiche/web_transport/stream_helpers.h new file mode 100644 index 0000000..e7ca5d4 --- /dev/null +++ b/quiche/web_transport/stream_helpers.h
@@ -0,0 +1,59 @@ +// Copyright 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// Helper functions for webtransport::Stream API. + +#ifndef QUICHE_COMMON_QUICHE_STREAM_H_ +#define QUICHE_COMMON_QUICHE_STREAM_H_ + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/types/span.h" +#include "quiche/common/quiche_callbacks.h" +#include "quiche/common/quiche_mem_slice.h" +#include "quiche/web_transport/web_transport.h" + +namespace webtransport { + +// Calls `callback` for every contiguous chunk available inside the stream. +// Returns true if the FIN has been reached. +inline bool ProcessAllReadableRegions( + Stream& stream, + quiche::UnretainedCallback<void(absl::string_view)> callback) { + for (;;) { + Stream::PeekResult peek_result = stream.PeekNextReadableRegion(); + if (!peek_result.has_data()) { + return false; + } + callback(peek_result.peeked_data); + bool fin = stream.SkipBytes(peek_result.peeked_data.size()); + if (fin) { + return true; + } + } +} + +// Convenience methods to write a single chunk of data into the stream. +inline absl::Status WriteIntoStream( + Stream& stream, quiche::QuicheMemSlice slice, + const StreamWriteOptions& options = kDefaultStreamWriteOptions) { + return stream.Writev(absl::MakeSpan(&slice, 1), options); +} +inline absl::Status WriteIntoStream( + Stream& stream, absl::string_view data, + const StreamWriteOptions& options = kDefaultStreamWriteOptions) { + quiche::QuicheMemSlice slice = quiche::QuicheMemSlice::Copy(data); + return stream.Writev(absl::MakeSpan(&slice, 1), options); +} + +// Convenience methods to send a FIN on the stream. +inline absl::Status SendFinOnStream(Stream& stream) { + StreamWriteOptions options; + options.set_send_fin(true); + return stream.Writev(absl::Span<quiche::QuicheMemSlice>(), options); +} + +} // namespace webtransport + +#endif // QUICHE_COMMON_QUICHE_STREAM_H_
diff --git a/quiche/web_transport/test_tools/in_memory_stream.cc b/quiche/web_transport/test_tools/in_memory_stream.cc index 167b90e..87f49a8 100644 --- a/quiche/web_transport/test_tools/in_memory_stream.cc +++ b/quiche/web_transport/test_tools/in_memory_stream.cc
@@ -15,13 +15,12 @@ #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/vectorized_io_utils.h" #include "quiche/web_transport/web_transport.h" namespace webtransport::test { -quiche::ReadStream::ReadResult InMemoryStream::Read(absl::Span<char> output) { +Stream::ReadResult InMemoryStream::Read(absl::Span<char> output) { std::vector<absl::string_view> chunks; for (absl::string_view chunk : buffer_.Chunks()) { chunks.push_back(chunk); @@ -31,7 +30,7 @@ return ReadResult{bytes_read, buffer_.empty() && fin_received_}; } -quiche::ReadStream::ReadResult InMemoryStream::Read(std::string* output) { +Stream::ReadResult InMemoryStream::Read(std::string* output) { ReadResult result; result.bytes_read = buffer_.size(); result.fin = fin_received_; @@ -42,7 +41,7 @@ size_t InMemoryStream::ReadableBytes() const { return buffer_.size(); } -quiche::ReadStream::PeekResult InMemoryStream::PeekNextReadableRegion() const { +Stream::PeekResult InMemoryStream::PeekNextReadableRegion() const { if (buffer_.empty()) { return PeekResult{"", fin_received_, fin_received_}; } @@ -58,8 +57,9 @@ return buffer_.empty() && fin_received_; } -absl::Status InMemoryStream::Writev(absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { +absl::Status InMemoryStream::Writev( + absl::Span<quiche::QuicheMemSlice> data, + const webtransport::StreamWriteOptions& options) { absl::Status status = GetWriteStatusWithExtraChecks(); if (!status.ok()) { return status;
diff --git a/quiche/web_transport/test_tools/in_memory_stream.h b/quiche/web_transport/test_tools/in_memory_stream.h index a35e8ea..5a27bde 100644 --- a/quiche/web_transport/test_tools/in_memory_stream.h +++ b/quiche/web_transport/test_tools/in_memory_stream.h
@@ -18,7 +18,6 @@ #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/web_transport.h" namespace webtransport::test { @@ -31,22 +30,19 @@ public: explicit InMemoryStream(StreamId id) : id_(id) {} - // quiche::ReadStream implementation. + // webtransport::Stream implementation. [[nodiscard]] ReadResult Read(absl::Span<char> output) override; [[nodiscard]] ReadResult Read(std::string* output) override; size_t ReadableBytes() const override; PeekResult PeekNextReadableRegion() const override; bool SkipBytes(size_t bytes) override; - // quiche::WriteStream implementation. absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) override; + const StreamWriteOptions& options) override; bool CanWrite() const override { return GetWriteStatusWithExtraChecks().ok(); } - void AbruptlyTerminate(absl::Status) override { Terminate(); } - // webtransport::Stream implementation. StreamId GetStreamId() const override { return id_; } void ResetWithUserCode(StreamErrorCode) override {
diff --git a/quiche/web_transport/test_tools/in_memory_stream_test.cc b/quiche/web_transport/test_tools/in_memory_stream_test.cc index 2c53df7..8068dfb 100644 --- a/quiche/web_transport/test_tools/in_memory_stream_test.cc +++ b/quiche/web_transport/test_tools/in_memory_stream_test.cc
@@ -13,8 +13,8 @@ #include "absl/types/span.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/common/test_tools/quiche_test_utils.h" +#include "quiche/web_transport/stream_helpers.h" #include "quiche/web_transport/web_transport.h" namespace webtransport::test { @@ -77,7 +77,7 @@ EXPECT_TRUE(result.all_data_received); std::string merged_result; - bool fin_reached = quiche::ProcessAllReadableRegions( + bool fin_reached = ProcessAllReadableRegions( stream, [&](absl::string_view chunk) { absl::StrAppend(&merged_result, chunk); }); EXPECT_EQ(merged_result, absl::StrCat(chunk_a, chunk_b)); @@ -89,8 +89,8 @@ EXPECT_TRUE(stream.CanWrite()); std::array write_vector = {quiche::QuicheMemSlice::Copy("test")}; - quiche::StreamWriteOptions options; EXPECT_CALL(stream, OnWrite("test")); + StreamWriteOptions options; QUICHE_EXPECT_OK(stream.Writev(absl::MakeSpan(write_vector), options)); EXPECT_FALSE(stream.fin_sent()); @@ -114,7 +114,7 @@ EXPECT_TRUE(stream.CanWrite()); std::array write_vector = {quiche::QuicheMemSlice::Copy("foo")}; - quiche::StreamWriteOptions options; + StreamWriteOptions options; QUICHE_EXPECT_OK(stream.Writev(absl::MakeSpan(write_vector), options)); EXPECT_FALSE(stream.fin_sent());
diff --git a/quiche/web_transport/test_tools/mock_web_transport.h b/quiche/web_transport/test_tools/mock_web_transport.h index 77b1595..c3aa295 100644 --- a/quiche/web_transport/test_tools/mock_web_transport.h +++ b/quiche/web_transport/test_tools/mock_web_transport.h
@@ -21,7 +21,6 @@ #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_callbacks.h" #include "quiche/common/quiche_mem_slice.h" -#include "quiche/common/quiche_stream.h" #include "quiche/web_transport/web_transport.h" namespace webtransport { @@ -42,12 +41,11 @@ MOCK_METHOD(ReadResult, Read, (std::string * output), (override)); MOCK_METHOD(absl::Status, Writev, (absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options), + const StreamWriteOptions& options), (override)); MOCK_METHOD(PeekResult, PeekNextReadableRegion, (), (const, override)); MOCK_METHOD(bool, SkipBytes, (size_t bytes), (override)); MOCK_METHOD(bool, CanWrite, (), (const, override)); - MOCK_METHOD(void, AbruptlyTerminate, (absl::Status), (override)); MOCK_METHOD(size_t, ReadableBytes, (), (const, override)); MOCK_METHOD(StreamId, GetStreamId, (), (const, override)); MOCK_METHOD(void, ResetWithUserCode, (StreamErrorCode error), (override));
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h index 2c60d72..1e84efb 100644 --- a/quiche/web_transport/web_transport.h +++ b/quiche/web_transport/web_transport.h
@@ -16,12 +16,13 @@ // The dependencies of this API should be kept minimal and independent of // specific transport implementations. +#include "absl/status/status.h" #include "absl/strings/string_view.h" #include "absl/time/time.h" #include "absl/types/span.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/quiche_callbacks.h" -#include "quiche/common/quiche_stream.h" +#include "quiche/common/quiche_mem_slice.h" namespace webtransport { @@ -117,11 +118,19 @@ // events related to a WebTransport stream. The visitor object is owned by the // stream itself, meaning that if the stream is ever fully closed, the visitor // will be garbage-collected. -class QUICHE_EXPORT StreamVisitor : public quiche::ReadStreamVisitor, - public quiche::WriteStreamVisitor { +class QUICHE_EXPORT StreamVisitor { public: virtual ~StreamVisitor() {} + // Called whenever the stream has new data available to read. Unless otherwise + // specified, QUICHE stream reads are level-triggered, which means that the + // callback will be called repeatedly as long as there is still data in the + // buffer. + virtual void OnCanRead() = 0; + + // Called whenever the stream is not write-blocked and can accept new data. + virtual void OnCanWrite() = 0; + // Called when RESET_STREAM is received for the stream. virtual void OnResetStreamReceived(StreamErrorCode error) = 0; // Called when STOP_SENDING is received for the stream. @@ -132,14 +141,119 @@ virtual void OnWriteSideInDataRecvdState() = 0; }; +// Options for writing data into a WriteStream. +class QUICHE_EXPORT StreamWriteOptions { + public: + StreamWriteOptions() = default; + + // If send_fin() is set to true, the write operation also sends a FIN on the + // stream. + bool send_fin() const { return send_fin_; } + void set_send_fin(bool send_fin) { send_fin_ = send_fin; } + + // If buffer_unconditionally() is set to true, the write operation will buffer + // data even if the internal buffer limit is exceeded. + bool buffer_unconditionally() const { return buffer_unconditionally_; } + void set_buffer_unconditionally(bool value) { + buffer_unconditionally_ = value; + } + + private: + bool send_fin_ = false; + bool buffer_unconditionally_ = false; +}; + +inline constexpr StreamWriteOptions kDefaultStreamWriteOptions = + StreamWriteOptions(); + // A stream (either bidirectional or unidirectional) that is contained within a // WebTransport session. -class QUICHE_EXPORT Stream : public quiche::ReadStream, - public quiche::WriteStream, - public quiche::TerminableStream { +// +// This interface is designed around the idea that a network stream stores all +// of the received data in a sequence of contiguous buffers. Because of that, +// there are two ways to read from a stream: +// - Read() will copy data into a user-provided buffer, reassembling it if it +// is split across multiple buffers internally. +// - PeekNextReadableRegion()/SkipBytes() let the caller access the underlying +// buffers directly, potentially avoiding the copying at the cost of the +// caller having to deal with discontinuities. +// +// The writes into a WebTransport stream are all-or-nothing. A Stream object +// has to either accept all data written into it by returning absl::OkStatus, or +// ask the caller to try again once via OnCanWrite() by returning +// absl::UnavailableError. +class QUICHE_EXPORT Stream { public: + struct QUICHE_EXPORT ReadResult { + // Number of bytes actually read. + size_t bytes_read = 0; + // Whether the FIN has been received; if true, no further data will arrive + // on the stream, and the stream object can be soon potentially garbage + // collected. + bool fin = false; + }; + + struct PeekResult { + // The next available chunk in the sequencer buffer. + absl::string_view peeked_data; + // True if all of the data up to the FIN has been read. + bool fin_next = false; + // True if all of the data up to the FIN has been received (but not + // necessarily read). + bool all_data_received = false; + + // Indicates that `SkipBytes()` will make progress if called. + bool has_data() const { return !peeked_data.empty() || fin_next; } + }; + virtual ~Stream() {} + // Reads at most `buffer.size()` bytes into `buffer`. + [[nodiscard]] virtual ReadResult Read(absl::Span<char> buffer) = 0; + + // Reads all available data and appends it to the end of `output`. + [[nodiscard]] virtual ReadResult Read(std::string* output) = 0; + + // Indicates the total number of bytes that can be read from the stream. + virtual size_t ReadableBytes() const = 0; + + // Returns a contiguous buffer to read (or an empty buffer, if there is no + // data to read). See `ProcessAllReadableRegions` below for an example of how + // to use this method while handling FIN correctly. + virtual PeekResult PeekNextReadableRegion() const = 0; + + // Equivalent to reading `bytes`, but does not perform any copying. `bytes` + // must be less than or equal to `ReadableBytes()`. The return value indicates + // if the FIN has been reached. `SkipBytes(0)` can be used to consume the FIN + // if it's the only thing remaining on the stream. + [[nodiscard]] virtual bool SkipBytes(size_t bytes) = 0; + + // Writes `data` into the stream. If the write succeeds, the ownership is + // transferred to the stream; if it does not, the behavior is undefined -- the + // users of this API should check `CanWrite()` before calling `Writev()`. + virtual absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data, + const StreamWriteOptions& options) = 0; + + // Indicates whether it is possible to write into stream right now. + virtual bool CanWrite() const = 0; + + // Legacy convenience method for writing a single string_view. New users + // should use quiche::SendFinOnStream instead, since this method does not + // return useful failure information. + [[nodiscard]] bool SendFin() { + StreamWriteOptions options; + options.set_send_fin(true); + return Writev(absl::Span<quiche::QuicheMemSlice>(), options).ok(); + } + + // Legacy convenience method for writing a single string_view. New users + // should use quiche::WriteIntoStream instead, since this method does not + // return useful failure information. + [[nodiscard]] bool Write(absl::string_view data) { + quiche::QuicheMemSlice slice = quiche::QuicheMemSlice::Copy(data); + return Writev(absl::MakeSpan(&slice, 1), kDefaultStreamWriteOptions).ok(); + } + // An ID that is unique within the session. Those are not exposed to the user // via the web API, but can be used internally for bookkeeping and // diagnostics.