Add an option to buffer data unconditionally for WebTransport. This is useful for applications that would buffer data regardless on its own layer (e.g. control messages in HTTP/3 or MoQT). PiperOrigin-RevId: 602729685
diff --git a/quiche/common/quiche_stream.h b/quiche/common/quiche_stream.h index ca1ef7b..187e362 100644 --- a/quiche/common/quiche_stream.h +++ b/quiche/common/quiche_stream.h
@@ -131,13 +131,21 @@ public: StreamWriteOptions() = default; - // If send_fin() is sent to true, the write operation also sends a FIN on the + // If send_fin() is set to true, the write operation also sends a FIN on the // stream. bool send_fin() const { return send_fin_; } void set_send_fin(bool send_fin) { send_fin_ = send_fin; } + // If buffer_unconditionally() is set to true, the write operation will buffer + // data even if the internal buffer limit is exceeded. + bool buffer_unconditionally() const { return buffer_unconditionally_; } + void set_buffer_unconditionally(bool value) { + buffer_unconditionally_ = value; + } + private: bool send_fin_ = false; + bool buffer_unconditionally_ = false; }; inline constexpr StreamWriteOptions kDefaultStreamWriteOptions =
diff --git a/quiche/quic/core/http/web_transport_stream_adapter.cc b/quiche/quic/core/http/web_transport_stream_adapter.cc index ff2faba..de21723 100644 --- a/quiche/quic/core/http/web_transport_stream_adapter.cc +++ b/quiche/quic/core/http/web_transport_stream_adapter.cc
@@ -65,7 +65,9 @@ "Writev() called without any data or a FIN"); } const absl::Status initial_check_status = CheckBeforeStreamWrite(); - if (!initial_check_status.ok()) { + if (!initial_check_status.ok() && + !(initial_check_status.code() == absl::StatusCode::kUnavailable && + options.buffer_unconditionally())) { return initial_check_status; } @@ -82,8 +84,9 @@ iovecs.data(), iovecs.size(), session_->connection()->helper()->GetStreamSendBufferAllocator(), GetQuicFlag(quic_send_buffer_max_data_slice_size)); - QuicConsumedData consumed = - stream_->WriteMemSlices(storage.ToSpan(), /*fin=*/options.send_fin()); + QuicConsumedData consumed = stream_->WriteMemSlices( + storage.ToSpan(), /*fin=*/options.send_fin(), + /*buffer_uncondtionally=*/options.buffer_unconditionally()); if (consumed.bytes_consumed == total_size) { return absl::OkStatus();
diff --git a/quiche/quic/core/quic_generic_session_test.cc b/quiche/quic/core/quic_generic_session_test.cc index 52812e9..ff32c4d 100644 --- a/quiche/quic/core/quic_generic_session_test.cc +++ b/quiche/quic/core/quic_generic_session_test.cc
@@ -23,6 +23,7 @@ #include "quiche/quic/core/quic_constants.h" #include "quiche/quic/core/quic_datagram_queue.h" #include "quiche/quic/core/quic_error_codes.h" +#include "quiche/quic/core/quic_stream.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/core/web_transport_interface.h" #include "quiche/quic/platform/api/quic_test.h" @@ -42,6 +43,7 @@ enum ServerType { kDiscardServer, kEchoServer }; +using quiche::test::StatusIs; using simulator::Simulator; using testing::_; using testing::Assign; @@ -467,5 +469,44 @@ EXPECT_EQ(received + client_lost + server_lost, 1000u); } +TEST_F(QuicGenericSessionTest, WriteWhenBufferFull) { + CreateDefaultEndpoints(kEchoServer); + WireUpEndpoints(); + RunHandshake(); + + const std::string buffer(64 * 1024 + 1, 'q'); + webtransport::Stream* stream = + client_->session()->OpenOutgoingBidirectionalStream(); + ASSERT_TRUE(stream != nullptr); + + ASSERT_TRUE(stream->CanWrite()); + absl::Status status = quiche::WriteIntoStream(*stream, buffer); + QUICHE_EXPECT_OK(status); + EXPECT_FALSE(stream->CanWrite()); + + status = quiche::WriteIntoStream(*stream, buffer); + EXPECT_THAT(status, StatusIs(absl::StatusCode::kUnavailable)); + + quiche::StreamWriteOptions options; + options.set_buffer_unconditionally(true); + options.set_send_fin(true); + status = quiche::WriteIntoStream(*stream, buffer, options); + QUICHE_EXPECT_OK(status); + EXPECT_FALSE(stream->CanWrite()); + + QuicByteCount total_received = 0; + for (;;) { + test_harness_.RunUntilWithDefaultTimeout( + [&] { return stream->PeekNextReadableRegion().has_data(); }); + quiche::ReadStream::PeekResult result = stream->PeekNextReadableRegion(); + total_received += result.peeked_data.size(); + bool fin_consumed = stream->SkipBytes(result.peeked_data.size()); + if (fin_consumed) { + break; + } + } + EXPECT_EQ(total_received, 128u * 1024u + 2); +} + } // namespace } // namespace quic::test
diff --git a/quiche/quic/core/quic_stream.cc b/quiche/quic/core/quic_stream.cc index 1db3cd3..4de14d7 100644 --- a/quiche/quic/core/quic_stream.cc +++ b/quiche/quic/core/quic_stream.cc
@@ -763,7 +763,8 @@ } QuicConsumedData QuicStream::WriteMemSlices( - absl::Span<quiche::QuicheMemSlice> span, bool fin) { + absl::Span<quiche::QuicheMemSlice> span, bool fin, + bool buffer_unconditionally) { QuicConsumedData consumed_data(0, false); if (span.empty() && !fin) { QUIC_BUG(quic_bug_10586_6) << "span.empty() && !fin"; @@ -786,7 +787,7 @@ } bool had_buffered_data = HasBufferedData(); - if (CanWriteNewData() || span.empty()) { + if (CanWriteNewData() || span.empty() || buffer_unconditionally) { consumed_data.fin_consumed = fin; if (!span.empty()) { // Buffer all data if buffered data size is below limit.
diff --git a/quiche/quic/core/quic_stream.h b/quiche/quic/core/quic_stream.h index b7ea9e7..c846fe0 100644 --- a/quiche/quic/core/quic_stream.h +++ b/quiche/quic/core/quic_stream.h
@@ -353,9 +353,10 @@ // Commits data into the stream write buffer, and potentially sends it over // the wire. This method has all-or-nothing semantics: if the write buffer is // not full, all of the memslices in |span| are moved into it; otherwise, - // nothing happens. + // nothing happens. If `buffer_unconditionally` is set to true, behaves + // similar to `WriteOrBufferData()` in terms of buffering. QuicConsumedData WriteMemSlices(absl::Span<quiche::QuicheMemSlice> span, - bool fin); + bool fin, bool buffer_uncondtionally = false); QuicConsumedData WriteMemSlice(quiche::QuicheMemSlice span, bool fin); // Returns true if any stream data is lost (including fin) and needs to be