Support zero-copy writes in WebTransport. This adds support for passing a span of MemSlices into a WebTransport stream instead of string_views; the semantics are the same as the QUIC stream writes. PiperOrigin-RevId: 777921033
diff --git a/quiche/common/quiche_stream.h b/quiche/common/quiche_stream.h index 5a8a1f7..c8718bd 100644 --- a/quiche/common/quiche_stream.h +++ b/quiche/common/quiche_stream.h
@@ -15,6 +15,7 @@ #include "absl/types/span.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/quiche_callbacks.h" +#include "quiche/common/quiche_mem_slice.h" namespace quiche { @@ -161,42 +162,51 @@ public: virtual ~WriteStream() {} - // Writes |data| into the stream. - virtual absl::Status Writev(absl::Span<const absl::string_view> data, + // Writes `data` into the stream. If the write succeeds, the ownership is + // transferred to the stream; if it does not, the behavior is undefined -- the + // users of this API should check `CanWrite()` before calling `Writev()`. + virtual absl::Status Writev(absl::Span<QuicheMemSlice> data, const StreamWriteOptions& options) = 0; // Indicates whether it is possible to write into stream right now. virtual bool CanWrite() const = 0; // Legacy convenience method for writing a single string_view. New users - // should use quiche::WriteIntoStream instead, since this method does not + // should use quiche::SendFinOnStream instead, since this method does not // return useful failure information. [[nodiscard]] bool SendFin() { StreamWriteOptions options; options.set_send_fin(true); - return Writev(absl::Span<const absl::string_view>(), options).ok(); + return Writev(absl::Span<QuicheMemSlice>(), options).ok(); } // Legacy convenience method for writing a single string_view. New users // should use quiche::WriteIntoStream instead, since this method does not // return useful failure information. [[nodiscard]] bool Write(absl::string_view data) { - return Writev(absl::MakeSpan(&data, 1), kDefaultStreamWriteOptions).ok(); + QuicheMemSlice slice = QuicheMemSlice::Copy(data); + return Writev(absl::MakeSpan(&slice, 1), kDefaultStreamWriteOptions).ok(); } }; // Convenience methods to write a single chunk of data into the stream. inline absl::Status WriteIntoStream( + WriteStream& stream, QuicheMemSlice slice, + const StreamWriteOptions& options = kDefaultStreamWriteOptions) { + return stream.Writev(absl::MakeSpan(&slice, 1), options); +} +inline absl::Status WriteIntoStream( WriteStream& stream, absl::string_view data, const StreamWriteOptions& options = kDefaultStreamWriteOptions) { - return stream.Writev(absl::MakeSpan(&data, 1), options); + QuicheMemSlice slice = QuicheMemSlice::Copy(data); + return stream.Writev(absl::MakeSpan(&slice, 1), options); } // Convenience methods to send a FIN on the stream. inline absl::Status SendFinOnStream(WriteStream& stream) { StreamWriteOptions options; options.set_send_fin(true); - return stream.Writev(absl::Span<const absl::string_view>(), options); + return stream.Writev(absl::Span<QuicheMemSlice>(), options); } } // namespace quiche
diff --git a/quiche/common/test_tools/mock_streams.h b/quiche/common/test_tools/mock_streams.h index 577a324..7290490 100644 --- a/quiche/common/test_tools/mock_streams.h +++ b/quiche/common/test_tools/mock_streams.h
@@ -14,6 +14,7 @@ #include "absl/strings/string_view.h" #include "absl/types/span.h" #include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" namespace quiche::test { @@ -24,22 +25,22 @@ MockWriteStream() { ON_CALL(*this, CanWrite()).WillByDefault(testing::Return(true)); ON_CALL(*this, Writev(testing::_, testing::_)) - .WillByDefault([&](absl::Span<const absl::string_view> data, + .WillByDefault([&](absl::Span<quiche::QuicheMemSlice> data, const StreamWriteOptions& options) { return AppendToData(data, options); }); } MOCK_METHOD(absl::Status, Writev, - (absl::Span<const absl::string_view> data, + (absl::Span<quiche::QuicheMemSlice> data, const StreamWriteOptions& options), (override)); MOCK_METHOD(bool, CanWrite, (), (const, override)); - absl::Status AppendToData(absl::Span<const absl::string_view> data, + absl::Status AppendToData(absl::Span<quiche::QuicheMemSlice> data, const StreamWriteOptions& options) { - for (absl::string_view fragment : data) { - data_.append(fragment.data(), fragment.size()); + for (const quiche::QuicheMemSlice& fragment : data) { + data_.append(fragment.data(), fragment.length()); } ProcessOptions(options); return absl::OkStatus();
diff --git a/quiche/quic/core/http/end_to_end_test.cc b/quiche/quic/core/http/end_to_end_test.cc index 87420ca..ac7c036 100644 --- a/quiche/quic/core/http/end_to_end_test.cc +++ b/quiche/quic/core/http/end_to_end_test.cc
@@ -116,6 +116,7 @@ #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/platform/api/quiche_reference_counted.h" #include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" #include "quiche/common/simple_buffer_allocator.h" #include "quiche/common/test_tools/quiche_test_utils.h" @@ -7651,11 +7652,13 @@ ASSERT_TRUE(stream != nullptr); // Test the full Writev() API. const std::string kLongString = std::string(16 * 1024, 'a'); - std::vector<absl::string_view> write_vector = {"foo", "bar", "test", - kLongString}; + std::array write_vector = {quiche::QuicheMemSlice::Copy("foo"), + quiche::QuicheMemSlice::Copy("bar"), + quiche::QuicheMemSlice::Copy("test"), + quiche::QuicheMemSlice::Copy(kLongString)}; quiche::StreamWriteOptions options; options.set_send_fin(true); - QUICHE_EXPECT_OK(stream->Writev(absl::MakeConstSpan(write_vector), options)); + QUICHE_EXPECT_OK(stream->Writev(absl::MakeSpan(write_vector), options)); std::string received_data = ReadDataFromWebTransportStreamUntilFin(stream); EXPECT_EQ(received_data, absl::StrCat("foobartest", kLongString));
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.cc b/quiche/quic/core/http/web_transport_stream_adapter.cc index 4c26f37..fe68dfa 100644 --- a/quiche/quic/core/http/web_transport_stream_adapter.cc +++ b/quiche/quic/core/http/web_transport_stream_adapter.cc
@@ -8,10 +8,8 @@ #include <limits> #include <optional> #include <string> -#include <utility> #include "absl/status/status.h" -#include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "absl/types/span.h" #include "quiche/quic/core/http/web_transport_http3.h" @@ -21,15 +19,13 @@ #include "quiche/quic/core/quic_stream_priority.h" #include "quiche/quic/core/quic_stream_sequencer.h" #include "quiche/quic/core/quic_types.h" +#include "quiche/quic/core/quic_utils.h" #include "quiche/quic/core/web_transport_interface.h" #include "quiche/quic/platform/api/quic_bug_tracker.h" -#include "quiche/quic/platform/api/quic_flags.h" #include "quiche/quic/platform/api/quic_logging.h" #include "quiche/common/platform/api/quiche_logging.h" -#include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" -#include "quiche/common/vectorized_io_utils.h" #include "quiche/web_transport/web_transport.h" namespace quic { @@ -69,7 +65,7 @@ } absl::Status WebTransportStreamAdapter::Writev( - absl::Span<const absl::string_view> data, + absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { if (data.empty() && !options.send_fin()) { return absl::InvalidArgumentError( @@ -82,21 +78,10 @@ return initial_check_status; } - size_t total_size = quiche::TotalStringViewSpanSize(data); - quiche::QuicheMemSlice slice; - if (total_size > 0) { - quiche::QuicheBuffer buffer( - session_->connection()->helper()->GetStreamSendBufferAllocator(), - total_size); - size_t bytes_copied = quiche::GatherStringViewSpan(data, buffer.AsSpan()); - QUICHE_DCHECK_EQ(total_size, bytes_copied); - slice = quiche::QuicheMemSlice(std::move(buffer)); - } + size_t total_size = MemSliceSpanTotalSize(data); QuicConsumedData consumed = stream_->WriteMemSlices( - slice.empty() ? absl::Span<quiche::QuicheMemSlice>() - : absl::MakeSpan(&slice, 1), - /*fin=*/options.send_fin(), - /*buffer_uncondtionally=*/options.buffer_unconditionally()); + data, /*fin=*/options.send_fin(), + /*buffer_unconditionally=*/options.buffer_unconditionally()); if (consumed.bytes_consumed == total_size) { return absl::OkStatus();
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.h b/quiche/quic/core/http/web_transport_stream_adapter.h index 94f3ba4..7107dd9 100644 --- a/quiche/quic/core/http/web_transport_stream_adapter.h +++ b/quiche/quic/core/http/web_transport_stream_adapter.h
@@ -23,6 +23,7 @@ #include "quiche/quic/core/quic_types.h" #include "quiche/quic/core/web_transport_interface.h" #include "quiche/common/platform/api/quiche_export.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" #include "quiche/web_transport/web_transport.h" @@ -39,7 +40,7 @@ // WebTransportStream implementation. ABSL_MUST_USE_RESULT ReadResult Read(absl::Span<char> output) override; ABSL_MUST_USE_RESULT ReadResult Read(std::string* output) override; - absl::Status Writev(absl::Span<const absl::string_view> data, + absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) override; bool CanWrite() const override; void AbruptlyTerminate(absl::Status error) override;
diff --git a/quiche/quic/moqt/moqt_probe_manager.cc b/quiche/quic/moqt/moqt_probe_manager.cc index c3745b3..c3d02ff 100644 --- a/quiche/quic/moqt/moqt_probe_manager.cc +++ b/quiche/quic/moqt/moqt_probe_manager.cc
@@ -18,6 +18,7 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" #include "quiche/common/wire_serialization.h" #include "quiche/web_transport/web_transport.h" @@ -80,7 +81,8 @@ while (stream_->CanWrite() && data_remaining_ > 0) { quic::QuicByteCount chunk_size = std::min(kWriteChunkSize, data_remaining_); - absl::string_view chunk(kZeroes, chunk_size); + quiche::QuicheMemSlice chunk( + kZeroes, chunk_size, +[](absl::string_view) {}); quiche::StreamWriteOptions options; options.set_send_fin(chunk_size == data_remaining_); absl::Status status = stream_->Writev(absl::MakeSpan(&chunk, 1), options);
diff --git a/quiche/quic/moqt/moqt_probe_manager_test.cc b/quiche/quic/moqt/moqt_probe_manager_test.cc index a60cfa2..9e41f9b 100644 --- a/quiche/quic/moqt/moqt_probe_manager_test.cc +++ b/quiche/quic/moqt/moqt_probe_manager_test.cc
@@ -19,6 +19,7 @@ #include "quiche/quic/test_tools/quic_test_utils.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" #include "quiche/web_transport/web_transport.h" @@ -47,11 +48,11 @@ MockStream(webtransport::StreamId id) : id_(id) {} webtransport::StreamId GetStreamId() const override { return id_; } - absl::Status Writev(absl::Span<const absl::string_view> data, + absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) override { QUICHE_CHECK(!fin_) << "FIN written twice."; - for (absl::string_view chunk : data) { - data_.append(chunk); + for (const quiche::QuicheMemSlice& slice : data) { + data_.append(slice.AsStringView()); } fin_ = options.send_fin(); return absl::OkStatus();
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 8f9f585..ef1450b 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -598,7 +598,8 @@ continue; } if (fetch->session_->WriteObjectToStream( - stream_, fetch->fetch_id_, object, + stream_, fetch->fetch_id_, object.metadata, + std::move(object.payload), MoqtDataStreamType::kStreamHeaderFetch, !stream_header_written_, /*fin=*/false)) { stream_header_written_ = true; @@ -1566,7 +1567,7 @@ // down the connection if we've buffered too many control messages; otherwise, // there is potential for memory exhaustion attacks. options.set_buffer_unconditionally(true); - std::array<absl::string_view, 1> write_vector = {message.AsStringView()}; + std::array write_vector = {quiche::QuicheMemSlice(std::move(message))}; absl::Status success = stream_->Writev(absl::MakeSpan(write_vector), options); if (!success.ok()) { session_->Error(MoqtError::kInternalError, @@ -2237,7 +2238,8 @@ } if (!session_->WriteObjectToStream( - stream_, subscription.track_alias(), *object, + stream_, subscription.track_alias(), object->metadata, + std::move(object->payload), MoqtDataStreamType::kStreamHeaderSubgroup, !stream_header_written_, object->fin_after_this)) { // WriteObjectToStream() closes the connection on error, meaning that @@ -2277,28 +2279,30 @@ } bool MoqtSession::WriteObjectToStream(webtransport::Stream* stream, uint64_t id, - const PublishedObject& object, + const PublishedObjectMetadata& metadata, + quiche::QuicheMemSlice payload, MoqtDataStreamType type, bool is_first_on_stream, bool fin) { QUICHE_DCHECK(stream->CanWrite()); MoqtObject header; header.track_alias = id; - header.group_id = object.metadata.location.group; - header.subgroup_id = object.metadata.subgroup; - header.object_id = object.metadata.location.object; - header.publisher_priority = object.metadata.publisher_priority; - header.object_status = object.metadata.status; - header.payload_length = object.payload.length(); + header.group_id = metadata.location.group; + header.subgroup_id = metadata.subgroup; + header.object_id = metadata.location.object; + header.publisher_priority = metadata.publisher_priority; + header.object_status = metadata.status; + header.payload_length = payload.length(); quiche::QuicheBuffer serialized_header = framer_.SerializeObjectHeader(header, type, is_first_on_stream); // TODO(vasilvv): add a version of WebTransport write API that accepts // memslices so that we can avoid a copy here. - std::array<absl::string_view, 2> write_vector = { - serialized_header.AsStringView(), object.payload.AsStringView()}; + std::array write_vector = { + quiche::QuicheMemSlice(std::move(serialized_header)), std::move(payload)}; quiche::StreamWriteOptions options; options.set_send_fin(fin); - absl::Status write_status = stream->Writev(write_vector, options); + absl::Status write_status = + stream->Writev(absl::MakeSpan(write_vector), options); if (!write_status.ok()) { QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed) << "Writing into MoQT stream failed despite CanWrite() being true " @@ -2309,7 +2313,7 @@ } QUIC_DVLOG(1) << "Stream " << stream->GetStreamId() << " successfully wrote " - << object.metadata.location << ", fin = " << fin; + << metadata.location << ", fin = " << fin; return true; }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 63cbab8..356a4eb 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -36,6 +36,7 @@ #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_callbacks.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_weak_ptr.h" #include "quiche/web_transport/web_transport.h" @@ -670,7 +671,8 @@ // and metadata in |object|. Not for use with datagrams. Returns |true| if // the write was successful. bool WriteObjectToStream(webtransport::Stream* stream, uint64_t id, - const PublishedObject& object, + const PublishedObjectMetadata& metadata, + quiche::QuicheMemSlice payload, MoqtDataStreamType type, bool is_first_on_stream, bool fin);
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 8131aa2..80b3520 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -1272,9 +1272,10 @@ bool correct_message = false; const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x00, 0x7f}; EXPECT_CALL(mock_stream_, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { - correct_message = absl::StartsWith(data[0], kExpectedMessage); + correct_message = + absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin |= options.send_fin(); return absl::OkStatus(); }); @@ -1324,9 +1325,10 @@ bool correct_message = false; const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x00, 0x7f}; EXPECT_CALL(mock_stream_, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { - correct_message = absl::StartsWith(data[0], kExpectedMessage); + correct_message = + absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin = options.send_fin(); return absl::OkStatus(); }); @@ -1374,9 +1376,10 @@ bool correct_message = false; const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x00, 0x7f}; EXPECT_CALL(mock_stream_, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { - correct_message = absl::StartsWith(data[0], kExpectedMessage); + correct_message = + absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin |= options.send_fin(); return absl::OkStatus(); }); @@ -1427,9 +1430,10 @@ bool correct_message = false; const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x00, 0x7f}; EXPECT_CALL(mock_stream_, Writev) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { - correct_message = absl::StartsWith(data[0], kExpectedMessage); + correct_message = + absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin = options.send_fin(); return absl::OkStatus(); }); @@ -1447,7 +1451,7 @@ EXPECT_FALSE(fin); fin = false; EXPECT_CALL(mock_stream_, Writev) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { EXPECT_TRUE(data.empty()); fin = options.send_fin(); @@ -1486,9 +1490,10 @@ bool correct_message = false; const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00}; EXPECT_CALL(mock_stream_, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { - correct_message = absl::StartsWith(data[0], kExpectedMessage); + correct_message = + absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin = options.send_fin(); return absl::OkStatus(); }); @@ -1527,9 +1532,10 @@ return std::optional<PublishedObject>(); }); EXPECT_CALL(mock_stream_, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { - correct_message = absl::StartsWith(data[0], kExpectedMessage2); + correct_message = + absl::StartsWith(data[0].AsStringView(), kExpectedMessage2); fin = options.send_fin(); return absl::OkStatus(); }); @@ -1568,9 +1574,10 @@ bool correct_message = false; const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00}; EXPECT_CALL(mock_stream_, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { - correct_message = absl::StartsWith(data[0], kExpectedMessage); + correct_message = + absl::StartsWith(data[0].AsStringView(), kExpectedMessage); fin = options.send_fin(); return absl::OkStatus(); }); @@ -2012,24 +2019,24 @@ EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream0, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { // The Group ID is the 3rd byte of the stream. - EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 0); + EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0); return absl::OkStatus(); }); EXPECT_CALL(mock_stream1, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { // The Group ID is the 3rd byte of the stream. - EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 1); + EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 1); return absl::OkStatus(); }); EXPECT_CALL(mock_stream2, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { // The Group ID is the 3rd byte of the stream. - EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 2); + EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 2); return absl::OkStatus(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -2114,12 +2121,12 @@ EXPECT_CALL(*track1, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream0, Writev) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { // Check track alias is 14. - EXPECT_EQ(static_cast<const uint8_t>(data[0][1]), 14); + EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[1]), 14); // Check Group ID is 0 - EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 0); + EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0); return absl::OkStatus(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -2150,12 +2157,12 @@ EXPECT_CALL(*track2, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream1, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { // Check track alias is 15. - EXPECT_EQ(static_cast<const uint8_t>(data[0][1]), 15); + EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[1]), 15); // Check Group ID is 0 - EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 0); + EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0); return absl::OkStatus(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -2207,9 +2214,9 @@ Invoke([=](PublishedObject& /*output*/) { return second_result; })); if (second_result == MoqtFetchTask::GetNextObjectResult::kEof) { EXPECT_CALL(data_stream, Writev) - .WillOnce(Invoke([](absl::Span<const absl::string_view> data, + .WillOnce(Invoke([](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { - quic::QuicDataReader reader(data[0]); + quic::QuicDataReader reader(data[0].AsStringView()); uint64_t type; EXPECT_TRUE(reader.ReadVarInt62(&type)); EXPECT_EQ(type, static_cast<uint64_t>( @@ -2217,7 +2224,7 @@ EXPECT_FALSE(options.send_fin()); // fin_after_this is ignored. return absl::OkStatus(); })) - .WillOnce(Invoke([](absl::Span<const absl::string_view> data, + .WillOnce(Invoke([](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { EXPECT_TRUE(data.empty()); EXPECT_TRUE(options.send_fin()); @@ -2226,9 +2233,9 @@ return; } EXPECT_CALL(data_stream, Writev) - .WillOnce(Invoke([](absl::Span<const absl::string_view> data, + .WillOnce(Invoke([](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { - quic::QuicDataReader reader(data[0]); + quic::QuicDataReader reader(data[0].AsStringView()); uint64_t type; EXPECT_TRUE(reader.ReadVarInt62(&type)); EXPECT_EQ(type, static_cast<uint64_t>( @@ -3460,10 +3467,10 @@ EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream_)); bool correct_message = false; EXPECT_CALL(mock_stream_, Writev(_, _)) - .WillOnce([&](absl::Span<const absl::string_view> data, + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { correct_message = true; - EXPECT_EQ(*ExtractMessageType(data[0]), + EXPECT_EQ(*ExtractMessageType(data[0].AsStringView()), MoqtMessageType::kSubscribeDone); return absl::OkStatus(); });
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.h b/quiche/quic/moqt/test_tools/moqt_framer_utils.h index 8e704f0..b008f2e 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.h +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
@@ -18,6 +18,7 @@ #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_data_reader.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" namespace moqt::test { @@ -45,13 +46,23 @@ MATCHER_P(SerializedControlMessage, message, "Matches against a specific expected MoQT message") { - std::string merged_message = absl::StrJoin(arg, ""); + std::vector<absl::string_view> data_written; + data_written.reserve(arg.size()); + for (const quiche::QuicheMemSlice& slice : arg) { + data_written.push_back(slice.AsStringView()); + } + std::string merged_message = absl::StrJoin(data_written, ""); return merged_message == SerializeGenericMessage(message); } MATCHER_P(ControlMessageOfType, expected_type, "Matches against an MoQT message of a specific type") { - std::string merged_message = absl::StrJoin(arg, ""); + std::vector<absl::string_view> data_written; + data_written.reserve(arg.size()); + for (const quiche::QuicheMemSlice& slice : arg) { + data_written.push_back(slice.AsStringView()); + } + std::string merged_message = absl::StrJoin(data_written, ""); quiche::QuicheDataReader reader(merged_message); uint64_t type_raw; if (!reader.ReadVarInt62(&type_raw)) {
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc index 7371945..5a00803 100644 --- a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc +++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
@@ -34,6 +34,7 @@ #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_callbacks.h" #include "quiche/common/quiche_circular_deque.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_status_utils.h" #include "quiche/common/quiche_stream.h" #include "quiche/common/vectorized_io_utils.h" @@ -257,9 +258,10 @@ // allows us to avoid a copy. quiche::QuicheBuffer buffer = quiche::SerializeDatagramCapsuleHeader(datagram.size(), allocator_); - std::array spans = {buffer.AsStringView(), datagram}; + std::array spans = {quiche::QuicheMemSlice(std::move(buffer)), + quiche::QuicheMemSlice::Copy(datagram)}; absl::Status write_status = - writer_->Writev(absl::MakeConstSpan(spans), quiche::StreamWriteOptions()); + writer_->Writev(absl::MakeSpan(spans), quiche::StreamWriteOptions()); if (!write_status.ok()) { OnWriteError(write_status); return DatagramStatus{ @@ -611,8 +613,15 @@ } absl::Status EncapsulatedSession::InnerStream::Writev( - const absl::Span<const absl::string_view> data, + const absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { + // TODO: support zero copy. + std::vector<absl::string_view> views; + views.reserve(data.size()); + for (const quiche::QuicheMemSlice& slice : data) { + views.push_back(slice.AsStringView()); + } + if (write_side_closed_) { return absl::FailedPreconditionError( "Trying to write into an already-closed stream"); @@ -636,7 +645,7 @@ !pending_write_.empty(); if (write_blocked) { fin_buffered_ = options.send_fin(); - for (absl::string_view chunk : data) { + for (absl::string_view chunk : views) { absl::StrAppend(&pending_write_, chunk); } absl::Status status = session_->scheduler_.Schedule(id_); @@ -648,12 +657,12 @@ return absl::OkStatus(); } - size_t bytes_written = WriteInner(data, options.send_fin()); + size_t bytes_written = WriteInner(views, options.send_fin()); // TODO: handle partial writes when flow control requires those. QUICHE_DCHECK(bytes_written == 0 || - bytes_written == quiche::TotalStringViewSpanSize(data)); + bytes_written == quiche::TotalStringViewSpanSize(views)); if (bytes_written == 0) { - for (absl::string_view chunk : data) { + for (absl::string_view chunk : views) { absl::StrAppend(&pending_write_, chunk); } } @@ -700,12 +709,15 @@ quiche::QuicheBuffer header = quiche::SerializeWebTransportStreamCapsuleHeader(id_, fin, total_size, session_->allocator_); - std::vector<absl::string_view> views_to_write; + std::vector<quiche::QuicheMemSlice> views_to_write; views_to_write.reserve(data.size() + 1); - views_to_write.push_back(header.AsStringView()); - absl::c_copy(data, std::back_inserter(views_to_write)); + views_to_write.push_back(quiche::QuicheMemSlice(std::move(header))); + for (absl::string_view view : data) { + // TODO: support zero copy. + views_to_write.push_back(quiche::QuicheMemSlice::Copy(view)); + } absl::Status write_status = session_->writer_->Writev( - views_to_write, quiche::kDefaultStreamWriteOptions); + absl::MakeSpan(views_to_write), quiche::kDefaultStreamWriteOptions); if (!write_status.ok()) { session_->OnWriteError(write_status); return 0;
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.h b/quiche/web_transport/encapsulated/encapsulated_web_transport.h index e4e7f7a..44f489a 100644 --- a/quiche/web_transport/encapsulated/encapsulated_web_transport.h +++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
@@ -25,6 +25,7 @@ #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_callbacks.h" #include "quiche/common/quiche_circular_deque.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" #include "quiche/common/simple_buffer_allocator.h" #include "quiche/web_transport/web_transport.h" @@ -145,7 +146,7 @@ bool SkipBytes(size_t bytes) override; // WriteStream implementation. - absl::Status Writev(absl::Span<const absl::string_view> data, + absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) override; bool CanWrite() const override;
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc index 8c0c79f..6f27bc6 100644 --- a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc +++ b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
@@ -17,6 +17,7 @@ #include "quiche/common/http/http_header_block.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_buffer_allocator.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" #include "quiche/common/simple_buffer_allocator.h" #include "quiche/common/test_tools/mock_streams.h" @@ -46,10 +47,10 @@ ADD_FAILURE() << "Fatal session error: " << error; }); ON_CALL(writer_, Writev(_, _)) - .WillByDefault([&](absl::Span<const absl::string_view> data, + .WillByDefault([&](absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) { - for (absl::string_view fragment : data) { - parser_.IngestCapsuleFragment(fragment); + for (const quiche::QuicheMemSlice& fragment : data) { + parser_.IngestCapsuleFragment(fragment.AsStringView()); } writer_.ProcessOptions(options); return absl::OkStatus(); @@ -553,7 +554,7 @@ quiche::StreamWriteOptions options; options.set_send_fin(true); - EXPECT_THAT(stream->Writev(absl::Span<const absl::string_view>(), options), + EXPECT_THAT(stream->Writev(absl::Span<quiche::QuicheMemSlice>(), options), StatusIs(absl::StatusCode::kOk)); session->GarbageCollectStreams(); EXPECT_TRUE(deleted); @@ -619,7 +620,7 @@ options.set_send_fin(true); EXPECT_TRUE(stream->CanWrite()); absl::Status status = - stream->Writev(absl::Span<const absl::string_view>(), options); + stream->Writev(absl::Span<quiche::QuicheMemSlice>(), options); EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk)); EXPECT_FALSE(stream->CanWrite()); }
diff --git a/quiche/web_transport/test_tools/in_memory_stream.h b/quiche/web_transport/test_tools/in_memory_stream.h index 48da99a..078ca94 100644 --- a/quiche/web_transport/test_tools/in_memory_stream.h +++ b/quiche/web_transport/test_tools/in_memory_stream.h
@@ -15,6 +15,7 @@ #include "absl/strings/string_view.h" #include "absl/types/span.h" #include "quiche/common/platform/api/quiche_logging.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" #include "quiche/web_transport/web_transport.h" @@ -34,7 +35,7 @@ bool SkipBytes(size_t bytes) override; // quiche::WriteStream implementation. - absl::Status Writev(absl::Span<const absl::string_view> data, + absl::Status Writev(absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options) override { QUICHE_NOTREACHED() << "Writev called on a read-only stream"; return absl::UnimplementedError("Writev called on a read-only stream");
diff --git a/quiche/web_transport/test_tools/mock_web_transport.h b/quiche/web_transport/test_tools/mock_web_transport.h index 656c25c..77b1595 100644 --- a/quiche/web_transport/test_tools/mock_web_transport.h +++ b/quiche/web_transport/test_tools/mock_web_transport.h
@@ -20,6 +20,7 @@ #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_callbacks.h" +#include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_stream.h" #include "quiche/web_transport/web_transport.h" @@ -40,7 +41,7 @@ MOCK_METHOD(ReadResult, Read, (absl::Span<char> buffer), (override)); MOCK_METHOD(ReadResult, Read, (std::string * output), (override)); MOCK_METHOD(absl::Status, Writev, - (absl::Span<const absl::string_view> data, + (absl::Span<quiche::QuicheMemSlice> data, const quiche::StreamWriteOptions& options), (override)); MOCK_METHOD(PeekResult, PeekNextReadableRegion, (), (const, override));