In WebTransport over HTTP/2, implement basic stream support.
This code supports ~all stream logic that does not involve flow control.
PiperOrigin-RevId: 599530029
diff --git a/quiche/common/capsule.cc b/quiche/common/capsule.cc
index 421f11d..6cfcd4c 100644
--- a/quiche/common/capsule.cc
+++ b/quiche/common/capsule.cc
@@ -20,6 +20,7 @@
#include "absl/types/span.h"
#include "absl/types/variant.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_data_reader.h"
@@ -366,6 +367,21 @@
return *std::move(buffer);
}
+QUICHE_EXPORT QuicheBuffer SerializeWebTransportStreamCapsuleHeader(
+ webtransport::StreamId stream_id, bool fin, uint64_t write_size,
+ QuicheBufferAllocator* allocator) {
+ absl::StatusOr<QuicheBuffer> buffer = SerializeIntoBuffer(
+ allocator,
+ WireVarInt62(fin ? CapsuleType::WT_STREAM_WITH_FIN
+ : CapsuleType::WT_STREAM),
+ WireVarInt62(write_size + QuicheDataWriter::GetVarInt62Len(stream_id)),
+ WireVarInt62(stream_id));
+ if (!buffer.ok()) {
+ return QuicheBuffer();
+ }
+ return *std::move(buffer);
+}
+
QuicheBuffer SerializeCapsule(const Capsule& capsule,
quiche::QuicheBufferAllocator* allocator) {
absl::StatusOr<QuicheBuffer> serialized =
diff --git a/quiche/common/capsule.h b/quiche/common/capsule.h
index 3cadc4d..aa42530 100644
--- a/quiche/common/capsule.h
+++ b/quiche/common/capsule.h
@@ -395,6 +395,11 @@
QUICHE_EXPORT QuicheBuffer SerializeDatagramCapsuleHeader(
uint64_t datagram_size, QuicheBufferAllocator* allocator);
+// Serializes the header for a WT_STREAM or a WT_STREAM_WITH_FIN capsule.
+QUICHE_EXPORT QuicheBuffer SerializeWebTransportStreamCapsuleHeader(
+ webtransport::StreamId stream_id, bool fin, uint64_t write_size,
+ QuicheBufferAllocator* allocator);
+
} // namespace quiche
#endif // QUICHE_COMMON_CAPSULE_H_
diff --git a/quiche/common/capsule_test.cc b/quiche/common/capsule_test.cc
index 0aed714..fd07608 100644
--- a/quiche/common/capsule_test.cc
+++ b/quiche/common/capsule_test.cc
@@ -297,6 +297,19 @@
ValidateParserIsEmpty();
TestSerialization(expected_capsule, capsule_fragment);
}
+TEST_F(CapsuleTest, WebTransportStreamDataHeader) {
+ std::string capsule_fragment = absl::HexStringToBytes(
+ "990b4d3b" // WT_STREAM without FIN
+ "04" // capsule length
+ "17" // stream ID
+ // three bytes of stream payload implied below
+ );
+ QuicheBufferAllocator* allocator = SimpleBufferAllocator::Get();
+ QuicheBuffer capsule_header =
+ quiche::SerializeWebTransportStreamCapsuleHeader(0x17, /*fin=*/false, 3,
+ allocator);
+ EXPECT_EQ(capsule_header.AsStringView(), capsule_fragment);
+}
TEST_F(CapsuleTest, WebTransportStreamDataWithFin) {
std::string capsule_fragment = absl::HexStringToBytes(
"990b4d3c" // data with FIN
diff --git a/quiche/common/quiche_stream.h b/quiche/common/quiche_stream.h
index 9de876b..ca1ef7b 100644
--- a/quiche/common/quiche_stream.h
+++ b/quiche/common/quiche_stream.h
@@ -191,6 +191,15 @@
return stream.Writev(absl::Span<const absl::string_view>(), options);
}
+inline size_t TotalStringViewSpanSize(
+ absl::Span<const absl::string_view> span) {
+ size_t total = 0;
+ for (absl::string_view view : span) {
+ total += view.size();
+ }
+ return total;
+}
+
} // namespace quiche
#endif // QUICHE_COMMON_QUICHE_STREAM_H_
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
index 0aa3763..adbbb53 100644
--- a/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.cc
@@ -4,22 +4,36 @@
#include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
-#include <array>
-#include <cstdint>
-#include <memory>
-#include <string>
-#include <utility>
+#include <stdbool.h>
+#include <algorithm>
+#include <array>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <iterator>
+#include <memory>
+#include <optional>
+#include <string>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "absl/container/node_hash_map.h"
#include "absl/status/status.h"
+#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "quiche/common/capsule.h"
#include "quiche/common/http/http_header_block.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_callbacks.h"
+#include "quiche/common/quiche_circular_deque.h"
#include "quiche/common/quiche_status_utils.h"
#include "quiche/common/quiche_stream.h"
#include "quiche/web_transport/web_transport.h"
@@ -36,13 +50,20 @@
// over TCP.
constexpr uint64_t kEncapsulatedMaxDatagramSize = 9000;
+constexpr StreamPriority kDefaultPriority = StreamPriority{0, 0};
+
} // namespace
EncapsulatedSession::EncapsulatedSession(
Perspective perspective, FatalErrorCallback fatal_error_callback)
: perspective_(perspective),
fatal_error_callback_(std::move(fatal_error_callback)),
- capsule_parser_(this) {}
+ capsule_parser_(this),
+ next_outgoing_bidi_stream_(perspective == Perspective::kClient ? 0 : 1),
+ next_outgoing_unidi_stream_(perspective == Perspective::kClient ? 2 : 3) {
+ QUICHE_DCHECK(IsIdOpenedBy(next_outgoing_bidi_stream_, perspective));
+ QUICHE_DCHECK(IsIdOpenedBy(next_outgoing_unidi_stream_, perspective));
+}
void EncapsulatedSession::InitializeClient(
std::unique_ptr<SessionVisitor> visitor,
@@ -115,26 +136,66 @@
OnCanWrite();
}
-Stream* EncapsulatedSession::AcceptIncomingBidirectionalStream() {
- return nullptr;
-}
-Stream* EncapsulatedSession::AcceptIncomingUnidirectionalStream() {
- return nullptr;
-}
-bool EncapsulatedSession::CanOpenNextOutgoingBidirectionalStream() {
- return false;
-}
-bool EncapsulatedSession::CanOpenNextOutgoingUnidirectionalStream() {
- return false;
-}
-Stream* EncapsulatedSession::OpenOutgoingBidirectionalStream() {
- return nullptr;
-}
-Stream* EncapsulatedSession::OpenOutgoingUnidirectionalStream() {
+Stream* EncapsulatedSession::AcceptIncomingStream(
+ quiche::QuicheCircularDeque<StreamId>& queue) {
+ while (!queue.empty()) {
+ StreamId id = queue.front();
+ queue.pop_front();
+ Stream* stream = GetStreamById(id);
+ if (stream == nullptr) {
+ // Stream got reset and garbage collected before the peer ever had a
+ // chance to look at it.
+ continue;
+ }
+ return stream;
+ }
return nullptr;
}
-Stream* EncapsulatedSession::GetStreamById(StreamId /*id*/) { return nullptr; }
+Stream* EncapsulatedSession::AcceptIncomingBidirectionalStream() {
+ return AcceptIncomingStream(incoming_bidirectional_streams_);
+}
+Stream* EncapsulatedSession::AcceptIncomingUnidirectionalStream() {
+ return AcceptIncomingStream(incoming_unidirectional_streams_);
+}
+bool EncapsulatedSession::CanOpenNextOutgoingBidirectionalStream() {
+ // TODO: implement flow control.
+ return true;
+}
+bool EncapsulatedSession::CanOpenNextOutgoingUnidirectionalStream() {
+ // TODO: implement flow control.
+ return true;
+}
+Stream* EncapsulatedSession::OpenOutgoingStream(StreamId& counter) {
+ StreamId stream_id = counter;
+ counter += 4;
+ auto [it, inserted] = streams_.emplace(
+ std::piecewise_construct, std::forward_as_tuple(stream_id),
+ std::forward_as_tuple(this, stream_id));
+ QUICHE_DCHECK(inserted);
+ return &it->second;
+}
+Stream* EncapsulatedSession::OpenOutgoingBidirectionalStream() {
+ if (!CanOpenNextOutgoingBidirectionalStream()) {
+ return nullptr;
+ }
+ return OpenOutgoingStream(next_outgoing_bidi_stream_);
+}
+Stream* EncapsulatedSession::OpenOutgoingUnidirectionalStream() {
+ if (!CanOpenNextOutgoingUnidirectionalStream()) {
+ return nullptr;
+ }
+ return OpenOutgoingStream(next_outgoing_unidi_stream_);
+}
+
+Stream* EncapsulatedSession::GetStreamById(StreamId id) {
+ auto it = streams_.find(id);
+ if (it == streams_.end()) {
+ return nullptr;
+ }
+ return &it->second;
+}
+
DatagramStats EncapsulatedSession::GetDatagramStats() {
DatagramStats stats;
stats.expired_outgoing = 0;
@@ -149,8 +210,7 @@
}
void EncapsulatedSession::NotifySessionDraining() {
- control_capsule_queue_.push_back(quiche::SerializeCapsule(
- quiche::Capsule(quiche::DrainWebTransportSessionCapsule()), allocator_));
+ SendControlCapsule(quiche::DrainWebTransportSessionCapsule());
OnCanWrite();
}
void EncapsulatedSession::SetOnDraining(
@@ -256,7 +316,21 @@
control_capsule_queue_.pop_front();
}
- // TODO(b/264263113): send stream data.
+ while (writer_->CanWrite()) {
+ absl::StatusOr<StreamId> next_id = scheduler_.PopFront();
+ if (!next_id.ok()) {
+ QUICHE_DCHECK_EQ(next_id.status().code(), absl::StatusCode::kNotFound);
+ return;
+ }
+ auto it = streams_.find(*next_id);
+ if (it == streams_.end()) {
+ QUICHE_BUG(WT_H2_NextStreamNotInTheMap);
+ OnFatalError("Next scheduled stream is not in the map");
+ return;
+ }
+ QUICHE_DCHECK(it->second.HasPendingWrite());
+ it->second.FlushPendingWrite();
+ }
}
void EncapsulatedSession::OnCanRead() {
@@ -271,6 +345,9 @@
capsule_parser_.ErrorIfThereIsRemainingBufferedData();
OnSessionClosed(0, "");
}
+ if (state_ == kSessionOpen) {
+ GarbageCollectStreams();
+ }
}
bool EncapsulatedSession::OnCapsule(const quiche::Capsule& capsule) {
@@ -290,17 +367,114 @@
std::string(
capsule.close_web_transport_session_capsule().error_message));
break;
+ case CapsuleType::WT_STREAM:
+ case CapsuleType::WT_STREAM_WITH_FIN:
+ ProcessStreamCapsule(capsule,
+ capsule.web_transport_stream_data().stream_id);
+ break;
+ case CapsuleType::WT_RESET_STREAM:
+ ProcessStreamCapsule(capsule,
+ capsule.web_transport_reset_stream().stream_id);
+ break;
+ case CapsuleType::WT_STOP_SENDING:
+ ProcessStreamCapsule(capsule,
+ capsule.web_transport_stop_sending().stream_id);
+ break;
default:
break;
}
- return true;
+ return state_ != kSessionClosed;
}
void EncapsulatedSession::OnCapsuleParseFailure(
absl::string_view error_message) {
+ if (state_ == kSessionClosed) {
+ return;
+ }
OnFatalError(absl::StrCat("Stream parse error: ", error_message));
}
+void EncapsulatedSession::ProcessStreamCapsule(const quiche::Capsule& capsule,
+ StreamId stream_id) {
+ bool new_stream_created = false;
+ auto it = streams_.find(stream_id);
+ if (it == streams_.end()) {
+ if (IsOutgoing(stream_id)) {
+ // Ignore this frame, as it is possible that it refers to an outgoing
+ // stream that has been closed.
+ return;
+ }
+ // TODO: check flow control here.
+ it = streams_.emplace_hint(it, std::piecewise_construct,
+ std::forward_as_tuple(stream_id),
+ std::forward_as_tuple(this, stream_id));
+ new_stream_created = true;
+ }
+ InnerStream& stream = it->second;
+ stream.ProcessCapsule(capsule);
+ if (new_stream_created) {
+ if (IsBidirectionalId(stream_id)) {
+ incoming_bidirectional_streams_.push_back(stream_id);
+ visitor_->OnIncomingBidirectionalStreamAvailable();
+ } else {
+ incoming_unidirectional_streams_.push_back(stream_id);
+ visitor_->OnIncomingUnidirectionalStreamAvailable();
+ }
+ }
+}
+
+void EncapsulatedSession::InnerStream::ProcessCapsule(
+ const quiche::Capsule& capsule) {
+ switch (capsule.capsule_type()) {
+ case CapsuleType::WT_STREAM:
+ case CapsuleType::WT_STREAM_WITH_FIN: {
+ if (fin_received_) {
+ session_->OnFatalError(
+ "Received stream data for a stream that has already received a "
+ "FIN");
+ return;
+ }
+ if (read_side_closed_) {
+ // It is possible that we sent STOP_SENDING but it has not been received
+ // yet. Ignore.
+ return;
+ }
+ fin_received_ = capsule.capsule_type() == CapsuleType::WT_STREAM_WITH_FIN;
+ const quiche::WebTransportStreamDataCapsule& data =
+ capsule.web_transport_stream_data();
+ if (!data.data.empty()) {
+ incoming_reads_.push_back(IncomingRead{data.data, std::string()});
+ }
+ // Fast path: if the visitor consumes all of the incoming reads, we don't
+ // need to copy data from the capsule parser.
+ if (visitor_ != nullptr) {
+ visitor_->OnCanRead();
+ }
+ // Slow path: copy all data that the visitor have not consumed.
+ for (IncomingRead& read : incoming_reads_) {
+ QUICHE_DCHECK(!read.data.empty());
+ if (read.storage.empty()) {
+ read.storage = std::string(read.data);
+ read.data = read.storage;
+ }
+ }
+ return;
+ }
+ case CapsuleType::WT_RESET_STREAM:
+ CloseReadSide(capsule.web_transport_reset_stream().error_code);
+ return;
+ case CapsuleType::WT_STOP_SENDING:
+ CloseWriteSide(capsule.web_transport_stop_sending().error_code);
+ return;
+ default:
+ QUICHE_BUG(WT_H2_ProcessStreamCapsule_Unknown)
+ << "Unexpected capsule dispatched to InnerStream: " << capsule;
+ session_->OnFatalError(
+ "Internal error: Unexpected capsule dispatched to InnerStream");
+ return;
+ }
+}
+
void EncapsulatedSession::OpenSession() {
state_ = kSessionOpen;
visitor_->OnSessionReady();
@@ -344,6 +518,7 @@
state_ = kSessionClosed;
if (fatal_error_callback_) {
std::move(fatal_error_callback_)(error_message);
+ fatal_error_callback_ = nullptr;
}
}
@@ -352,4 +527,260 @@
error, " while trying to write encapsulated WebTransport data"));
}
+EncapsulatedSession::InnerStream::InnerStream(EncapsulatedSession* session,
+ StreamId id)
+ : session_(session),
+ id_(id),
+ read_side_closed_(IsUnidirectionalId(id) &&
+ IsIdOpenedBy(id, session->perspective_)),
+ write_side_closed_(IsUnidirectionalId(id) &&
+ !IsIdOpenedBy(id, session->perspective_)) {
+ if (!write_side_closed_) {
+ absl::Status status = session_->scheduler_.Register(id_, kDefaultPriority);
+ if (!status.ok()) {
+ QUICHE_BUG(WT_H2_FailedToRegisterNewStream) << status;
+ session_->OnFatalError(
+ "Failed to register new stream with the scheduler");
+ return;
+ }
+ }
+}
+
+quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read(
+ absl::Span<char> output) {
+ const size_t total_size = output.size();
+ for (const IncomingRead& read : incoming_reads_) {
+ size_t size_to_read = std::min(read.size(), output.size());
+ if (size_to_read == 0) {
+ break;
+ }
+ memcpy(output.data(), read.data.data(), size_to_read);
+ output = output.subspan(size_to_read);
+ }
+ bool fin_consumed = SkipBytes(total_size);
+ return ReadResult{total_size, fin_consumed};
+}
+quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read(
+ std::string* output) {
+ const size_t total_size = ReadableBytes();
+ const size_t initial_offset = output->size();
+ output->resize(initial_offset + total_size);
+ return Read(absl::Span<char>(&((*output)[initial_offset]), total_size));
+}
+size_t EncapsulatedSession::InnerStream::ReadableBytes() const {
+ size_t total_size = 0;
+ for (const IncomingRead& read : incoming_reads_) {
+ total_size += read.size();
+ }
+ return total_size;
+}
+quiche::ReadStream::PeekResult
+EncapsulatedSession::InnerStream::PeekNextReadableRegion() const {
+ if (incoming_reads_.empty()) {
+ return PeekResult{absl::string_view(), fin_received_, fin_received_};
+ }
+ return PeekResult{incoming_reads_.front().data,
+ fin_received_ && incoming_reads_.size() == 1,
+ fin_received_};
+}
+
+bool EncapsulatedSession::InnerStream::SkipBytes(size_t bytes) {
+ size_t remaining = bytes;
+ while (remaining > 0) {
+ if (incoming_reads_.empty()) {
+ QUICHE_BUG(WT_H2_SkipBytes_toomuch)
+ << "Requested to skip " << remaining
+ << " bytes that are not present in the read buffer.";
+ return false;
+ }
+ IncomingRead& current = incoming_reads_.front();
+ if (remaining < current.size()) {
+ current.data = current.data.substr(remaining);
+ return false;
+ }
+ remaining -= current.size();
+ incoming_reads_.pop_front();
+ }
+ if (incoming_reads_.empty() && fin_received_) {
+ fin_consumed_ = true;
+ CloseReadSide(std::nullopt);
+ return true;
+ }
+ return false;
+}
+
+absl::Status EncapsulatedSession::InnerStream::Writev(
+ const absl::Span<const absl::string_view> data,
+ const quiche::StreamWriteOptions& options) {
+ if (write_side_closed_) {
+ return absl::FailedPreconditionError(
+ "Trying to write into an already-closed stream");
+ }
+ if (fin_buffered_) {
+ return absl::FailedPreconditionError("FIN already buffered");
+ }
+ if (!CanWrite()) {
+ return absl::FailedPreconditionError(
+ "Trying to write into a stream when CanWrite() = false");
+ }
+
+ const absl::StatusOr<bool> should_yield =
+ session_->scheduler_.ShouldYield(id_);
+ if (!should_yield.ok()) {
+ QUICHE_BUG(WT_H2_Writev_NotRegistered) << should_yield.status();
+ session_->OnFatalError("Stream not registered with the scheduler");
+ return absl::InternalError("Stream not registered with the scheduler");
+ }
+ const bool write_blocked = !session_->writer_->CanWrite() || *should_yield ||
+ !pending_write_.empty();
+ if (write_blocked) {
+ fin_buffered_ = options.send_fin();
+ for (absl::string_view chunk : data) {
+ absl::StrAppend(&pending_write_, chunk);
+ }
+ absl::Status status = session_->scheduler_.Schedule(id_);
+ if (!status.ok()) {
+ QUICHE_BUG(WT_H2_Writev_CantSchedule) << status;
+ session_->OnFatalError("Could not schedule a write-blocked stream");
+ return absl::InternalError("Could not schedule a write-blocked stream");
+ }
+ return absl::OkStatus();
+ }
+
+ size_t bytes_written = WriteInner(data, options.send_fin());
+ // TODO: handle partial writes when flow control requires those.
+ QUICHE_DCHECK(bytes_written == 0 ||
+ bytes_written == quiche::TotalStringViewSpanSize(data));
+ if (bytes_written == 0) {
+ for (absl::string_view chunk : data) {
+ absl::StrAppend(&pending_write_, chunk);
+ }
+ }
+
+ if (options.send_fin()) {
+ CloseWriteSide(std::nullopt);
+ }
+ return absl::OkStatus();
+}
+
+bool EncapsulatedSession::InnerStream::CanWrite() const {
+ return session_->state_ != EncapsulatedSession::kSessionClosed &&
+ !write_side_closed_ &&
+ (pending_write_.size() <= session_->max_stream_data_buffered_);
+}
+
+void EncapsulatedSession::InnerStream::FlushPendingWrite() {
+ QUICHE_DCHECK(!write_side_closed_);
+ QUICHE_DCHECK(session_->writer_->CanWrite());
+ QUICHE_DCHECK(!pending_write_.empty());
+ absl::string_view to_write = pending_write_;
+ size_t bytes_written =
+ WriteInner(absl::MakeSpan(&to_write, 1), fin_buffered_);
+ if (bytes_written < to_write.size()) {
+ pending_write_ = pending_write_.substr(bytes_written);
+ return;
+ }
+ pending_write_.clear();
+ if (fin_buffered_) {
+ CloseWriteSide(std::nullopt);
+ }
+ if (!write_side_closed_ && visitor_ != nullptr) {
+ visitor_->OnCanWrite();
+ }
+}
+
+size_t EncapsulatedSession::InnerStream::WriteInner(
+ absl::Span<const absl::string_view> data, bool fin) {
+ size_t total_size = quiche::TotalStringViewSpanSize(data);
+ if (total_size == 0 && !fin) {
+ session_->OnFatalError("Attempted to make an empty write with fin=false");
+ return 0;
+ }
+ quiche::QuicheBuffer header =
+ quiche::SerializeWebTransportStreamCapsuleHeader(id_, fin, total_size,
+ session_->allocator_);
+ std::vector<absl::string_view> views_to_write;
+ views_to_write.reserve(data.size() + 1);
+ views_to_write.push_back(header.AsStringView());
+ absl::c_copy(data, std::back_inserter(views_to_write));
+ absl::Status write_status = session_->writer_->Writev(
+ views_to_write, quiche::kDefaultStreamWriteOptions);
+ if (!write_status.ok()) {
+ session_->OnWriteError(write_status);
+ return 0;
+ }
+ return total_size;
+}
+
+void EncapsulatedSession::InnerStream::AbruptlyTerminate(absl::Status error) {
+ QUICHE_DLOG(INFO) << "Abruptly terminating the stream due to error: "
+ << error;
+ ResetDueToInternalError();
+}
+
+void EncapsulatedSession::InnerStream::ResetWithUserCode(
+ StreamErrorCode error) {
+ if (reset_frame_sent_) {
+ return;
+ }
+ reset_frame_sent_ = true;
+
+ session_->SendControlCapsule(
+ quiche::WebTransportResetStreamCapsule{id_, error});
+ CloseWriteSide(std::nullopt);
+}
+
+void EncapsulatedSession::InnerStream::SendStopSending(StreamErrorCode error) {
+ if (stop_sending_sent_) {
+ return;
+ }
+ stop_sending_sent_ = true;
+
+ session_->SendControlCapsule(
+ quiche::WebTransportStopSendingCapsule{id_, error});
+ CloseReadSide(std::nullopt);
+}
+
+void EncapsulatedSession::InnerStream::CloseReadSide(
+ std::optional<StreamErrorCode> error) {
+ if (read_side_closed_) {
+ return;
+ }
+ read_side_closed_ = true;
+ incoming_reads_.clear();
+ if (error.has_value() && visitor_ != nullptr) {
+ visitor_->OnResetStreamReceived(*error);
+ }
+ if (CanBeGarbageCollected()) {
+ session_->streams_to_garbage_collect_.push_back(id_);
+ }
+}
+
+void EncapsulatedSession::InnerStream::CloseWriteSide(
+ std::optional<StreamErrorCode> error) {
+ if (write_side_closed_) {
+ return;
+ }
+ write_side_closed_ = true;
+ pending_write_.clear();
+ absl::Status status = session_->scheduler_.Unregister(id_);
+ if (!status.ok()) {
+ session_->OnFatalError("Failed to unregister closed stream");
+ return;
+ }
+ if (error.has_value() && visitor_ != nullptr) {
+ visitor_->OnStopSendingReceived(*error);
+ }
+ if (CanBeGarbageCollected()) {
+ session_->streams_to_garbage_collect_.push_back(id_);
+ }
+}
+
+void EncapsulatedSession::GarbageCollectStreams() {
+ for (StreamId id : streams_to_garbage_collect_) {
+ streams_.erase(id);
+ }
+ streams_to_garbage_collect_.clear();
+}
+
} // namespace webtransport
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport.h b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
index 85c14c9..3247267 100644
--- a/quiche/web_transport/encapsulated/encapsulated_web_transport.h
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport.h
@@ -5,13 +5,20 @@
#ifndef QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
#define QUICHE_WEB_TRANSPORT_ENCAPSULATED_ENCAPSULATED_WEB_TRANSPORT_H_
+#include <cstddef>
#include <cstdint>
#include <memory>
+#include <optional>
#include <string>
+#include <utility>
+#include <vector>
+#include "absl/base/attributes.h"
+#include "absl/container/node_hash_map.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
+#include "absl/types/span.h"
#include "quiche/common/capsule.h"
#include "quiche/common/http/http_header_block.h"
#include "quiche/common/platform/api/quiche_export.h"
@@ -21,9 +28,18 @@
#include "quiche/common/quiche_stream.h"
#include "quiche/common/simple_buffer_allocator.h"
#include "quiche/web_transport/web_transport.h"
+#include "quiche/web_transport/web_transport_priority_scheduler.h"
namespace webtransport {
+constexpr bool IsUnidirectionalId(StreamId id) { return id & 0b10; }
+constexpr bool IsBidirectionalId(StreamId id) {
+ return !IsUnidirectionalId(id);
+}
+constexpr bool IsIdOpenedBy(StreamId id, Perspective perspective) {
+ return (id & 0b01) ^ (perspective == Perspective::kClient);
+}
+
using FatalErrorCallback = quiche::SingleUseCallback<void(absl::string_view)>;
// Implementation of the WebTransport over HTTP/2 protocol; works over any
@@ -97,7 +113,103 @@
State state() const { return state_; }
+ // Cleans up the state for all of the streams that have been closed. QUIC
+ // uses timers to safely delete closed streams while minimizing the risk that
+ // something on stack holds an active pointer to them; WebTransport over
+ // HTTP/2 does not have any timers in it, making that approach inapplicable
+ // here. This class does automatically run garbage collection at the end of
+ // every OnCanRead() call (since it's a top-level entrypoint that is likely to
+ // come directly from I/O handler), but if the application does not happen to
+ // read data frequently, manual calls to this function may be requried.
+ void GarbageCollectStreams();
+
private:
+ // If the amount of data buffered in the socket exceeds the amount specified
+ // here, CanWrite() will start returning false.
+ static constexpr size_t kDefaultMaxBufferedStreamData = 16 * 1024;
+
+ class InnerStream : public Stream {
+ public:
+ InnerStream(EncapsulatedSession* session, StreamId id);
+ InnerStream(const InnerStream&) = delete;
+ InnerStream(InnerStream&&) = delete;
+ InnerStream& operator=(const InnerStream&) = delete;
+ InnerStream& operator=(InnerStream&&) = delete;
+
+ // ReadStream implementation.
+ ABSL_MUST_USE_RESULT ReadResult Read(absl::Span<char> output) override;
+ ABSL_MUST_USE_RESULT ReadResult Read(std::string* output) override;
+ size_t ReadableBytes() const override;
+ PeekResult PeekNextReadableRegion() const override;
+ bool SkipBytes(size_t bytes) override;
+
+ // WriteStream implementation.
+ absl::Status Writev(absl::Span<const absl::string_view> data,
+ const quiche::StreamWriteOptions& options) override;
+ bool CanWrite() const override;
+
+ // TerminableStream implementation.
+ void AbruptlyTerminate(absl::Status error) override;
+
+ // Stream implementation.
+ StreamId GetStreamId() const override { return id_; }
+ StreamVisitor* visitor() override { return visitor_.get(); }
+ void SetVisitor(std::unique_ptr<StreamVisitor> visitor) override {
+ visitor_ = std::move(visitor);
+ }
+
+ void ResetWithUserCode(StreamErrorCode error) override;
+ void SendStopSending(StreamErrorCode error) override;
+
+ void ResetDueToInternalError() override { ResetWithUserCode(0); }
+ void MaybeResetDueToStreamObjectGone() override { ResetWithUserCode(0); }
+
+ void CloseReadSide(std::optional<StreamErrorCode> error);
+ void CloseWriteSide(std::optional<StreamErrorCode> error);
+ bool CanBeGarbageCollected() const {
+ return read_side_closed_ && write_side_closed_;
+ }
+
+ bool HasPendingWrite() const { return !pending_write_.empty(); }
+ void FlushPendingWrite();
+
+ void ProcessCapsule(const quiche::Capsule& capsule);
+
+ private:
+ // Struct for storing data that can potentially either stored inside the
+ // object or inside some other object on the stack. Here is roughly how this
+ // works:
+ // 1. A read is enqueued with `data` pointing to a temporary buffer, and
+ // `storage` being empty.
+ // 2. Visitor::OnCanRead() is called, potentially causing the user to
+ // consume the data from the temporary buffer directly.
+ // 3. If user does not consume data immediately, it's copied to `storage`
+ // (and the pointer to `data` is updated) so that it can be read later.
+ struct IncomingRead {
+ absl::string_view data;
+ std::string storage;
+
+ size_t size() const { return data.size(); }
+ };
+
+ // Tries to send `data`; may send less if limited by flow control.
+ [[nodiscard]] size_t WriteInner(absl::Span<const absl::string_view> data,
+ bool fin);
+
+ EncapsulatedSession* session_;
+ StreamId id_;
+ std::unique_ptr<StreamVisitor> visitor_;
+ quiche::QuicheCircularDeque<IncomingRead> incoming_reads_;
+ std::string pending_write_;
+ bool read_side_closed_;
+ bool write_side_closed_;
+ bool reset_frame_sent_ = false;
+ bool stop_sending_sent_ = false;
+ bool fin_received_ = false;
+ bool fin_consumed_ = false;
+ bool fin_buffered_ = false;
+ };
+
struct BufferedClose {
SessionErrorCode error_code = 0;
std::string error_message;
@@ -114,6 +226,18 @@
quiche::SimpleBufferAllocator::Get();
quiche::CapsuleParser capsule_parser_;
+ size_t max_stream_data_buffered_ = kDefaultMaxBufferedStreamData;
+
+ PriorityScheduler scheduler_;
+ absl::node_hash_map<StreamId, InnerStream>
+ streams_; // Streams unregister themselves with scheduler on deletion,
+ // and thus have to be above it.
+ quiche::QuicheCircularDeque<StreamId> incoming_bidirectional_streams_;
+ quiche::QuicheCircularDeque<StreamId> incoming_unidirectional_streams_;
+ std::vector<StreamId> streams_to_garbage_collect_;
+ StreamId next_outgoing_bidi_stream_;
+ StreamId next_outgoing_unidi_stream_;
+
bool session_close_notified_ = false;
bool fin_sent_ = false;
@@ -126,6 +250,20 @@
const std::string& error_message);
void OnFatalError(absl::string_view error_message);
void OnWriteError(absl::Status error);
+
+ bool IsOutgoing(StreamId id) { return IsIdOpenedBy(id, perspective_); }
+ bool IsIncoming(StreamId id) { return !IsOutgoing(id); }
+
+ template <typename CapsuleType>
+ void SendControlCapsule(CapsuleType capsule) {
+ control_capsule_queue_.push_back(quiche::SerializeCapsule(
+ quiche::Capsule(std::move(capsule)), allocator_));
+ OnCanWrite();
+ }
+
+ Stream* AcceptIncomingStream(quiche::QuicheCircularDeque<StreamId>& queue);
+ Stream* OpenOutgoingStream(StreamId& counter);
+ void ProcessStreamCapsule(const quiche::Capsule& capsule, StreamId stream_id);
};
} // namespace webtransport
diff --git a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
index 15728e4..8c0c79f 100644
--- a/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
+++ b/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc
@@ -4,9 +4,11 @@
#include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
+#include <array>
#include <memory>
#include <string>
#include <utility>
+#include <vector>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
@@ -18,6 +20,7 @@
#include "quiche/common/quiche_stream.h"
#include "quiche/common/simple_buffer_allocator.h"
#include "quiche/common/test_tools/mock_streams.h"
+#include "quiche/common/test_tools/quiche_test_utils.h"
#include "quiche/web_transport/test_tools/mock_web_transport.h"
#include "quiche/web_transport/web_transport.h"
@@ -26,7 +29,9 @@
using ::quiche::Capsule;
using ::quiche::CapsuleType;
+using ::quiche::test::StatusIs;
using ::testing::_;
+using ::testing::ElementsAre;
using ::testing::HasSubstr;
using ::testing::IsEmpty;
using ::testing::Return;
@@ -78,6 +83,14 @@
session_->OnCanRead();
}
+ template <typename CapsuleType>
+ void ProcessIncomingCapsule(const CapsuleType& capsule) {
+ quiche::QuicheBuffer buffer = quiche::SerializeCapsule(
+ quiche::Capsule(capsule), quiche::SimpleBufferAllocator::Get());
+ read_buffer_.append(buffer.data(), buffer.size());
+ session_->OnCanRead();
+ }
+
void DefaultHandshakeForClient(EncapsulatedSession& session) {
quiche::HttpHeaderBlock outgoing_headers, incoming_headers;
session.InitializeClient(CreateAndStoreVisitor(), outgoing_headers,
@@ -96,6 +109,18 @@
testing::MockFunction<void(absl::string_view)> fatal_error_callback_;
};
+TEST_F(EncapsulatedWebTransportTest, IsOpenedBy) {
+ EXPECT_EQ(IsIdOpenedBy(0x00, Perspective::kClient), true);
+ EXPECT_EQ(IsIdOpenedBy(0x01, Perspective::kClient), false);
+ EXPECT_EQ(IsIdOpenedBy(0x02, Perspective::kClient), true);
+ EXPECT_EQ(IsIdOpenedBy(0x03, Perspective::kClient), false);
+
+ EXPECT_EQ(IsIdOpenedBy(0x00, Perspective::kServer), false);
+ EXPECT_EQ(IsIdOpenedBy(0x01, Perspective::kServer), true);
+ EXPECT_EQ(IsIdOpenedBy(0x02, Perspective::kServer), false);
+ EXPECT_EQ(IsIdOpenedBy(0x03, Perspective::kServer), true);
+}
+
TEST_F(EncapsulatedWebTransportTest, SetupClientSession) {
std::unique_ptr<EncapsulatedSession> session =
CreateTransport(Perspective::kClient);
@@ -314,5 +339,448 @@
session->NotifySessionDraining();
}
+TEST_F(EncapsulatedWebTransportTest, SimpleRead) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ bool stream_received = false;
+ EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable())
+ .WillOnce([&] { stream_received = true; });
+ std::string data = "test";
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, data, false});
+ // Make sure data gets copied.
+ data[0] = 'q';
+ EXPECT_TRUE(stream_received);
+ Stream* stream = session->AcceptIncomingBidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_EQ(stream->GetStreamId(), 1u);
+ EXPECT_EQ(stream->visitor(), nullptr);
+ EXPECT_EQ(stream->ReadableBytes(), 4u);
+
+ quiche::ReadStream::PeekResult peek = stream->PeekNextReadableRegion();
+ EXPECT_EQ(peek.peeked_data, "test");
+ EXPECT_FALSE(peek.fin_next);
+ EXPECT_FALSE(peek.all_data_received);
+
+ std::string buffer;
+ quiche::ReadStream::ReadResult read = stream->Read(&buffer);
+ EXPECT_EQ(read.bytes_read, 4);
+ EXPECT_FALSE(read.fin);
+ EXPECT_EQ(buffer, "test");
+ EXPECT_EQ(stream->ReadableBytes(), 0u);
+}
+
+class MockStreamVisitorWithDestructor : public MockStreamVisitor {
+ public:
+ ~MockStreamVisitorWithDestructor() { OnDelete(); }
+
+ MOCK_METHOD(void, OnDelete, (), ());
+};
+
+MockStreamVisitorWithDestructor* SetupVisitor(Stream& stream) {
+ auto visitor = std::make_unique<MockStreamVisitorWithDestructor>();
+ MockStreamVisitorWithDestructor* result = visitor.get();
+ stream.SetVisitor(std::move(visitor));
+ return result;
+}
+
+TEST_F(EncapsulatedWebTransportTest, ImmediateRead) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
+ ProcessIncomingCapsule(
+ quiche::WebTransportStreamDataCapsule{1, "abcd", false});
+ Stream* stream = session->AcceptIncomingBidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_EQ(stream->ReadableBytes(), 4u);
+
+ MockStreamVisitor* visitor = SetupVisitor(*stream);
+ EXPECT_CALL(*visitor, OnCanRead()).WillOnce([&] {
+ std::string output;
+ (void)stream->Read(&output);
+ EXPECT_EQ(output, "abcdef");
+ });
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "ef", false});
+}
+
+TEST_F(EncapsulatedWebTransportTest, FinPeek) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
+ ProcessIncomingCapsule(
+ quiche::WebTransportStreamDataCapsule{1, "abcd", false});
+ Stream* stream = session->AcceptIncomingBidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_EQ(stream->ReadableBytes(), 4u);
+
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "ef", true});
+
+ quiche::ReadStream::PeekResult peek = stream->PeekNextReadableRegion();
+ EXPECT_EQ(peek.peeked_data, "abcd");
+ EXPECT_FALSE(peek.fin_next);
+ EXPECT_TRUE(peek.all_data_received);
+
+ EXPECT_FALSE(stream->SkipBytes(2));
+ peek = stream->PeekNextReadableRegion();
+ EXPECT_FALSE(peek.fin_next);
+ EXPECT_TRUE(peek.all_data_received);
+
+ EXPECT_FALSE(stream->SkipBytes(2));
+ peek = stream->PeekNextReadableRegion();
+ EXPECT_EQ(peek.peeked_data, "ef");
+ EXPECT_TRUE(peek.fin_next);
+ EXPECT_TRUE(peek.all_data_received);
+
+ EXPECT_TRUE(stream->SkipBytes(2));
+}
+
+TEST_F(EncapsulatedWebTransportTest, FinRead) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
+ ProcessIncomingCapsule(
+ quiche::WebTransportStreamDataCapsule{1, "abcdef", true});
+ Stream* stream = session->AcceptIncomingBidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_EQ(stream->ReadableBytes(), 6u);
+
+ std::array<char, 3> buffer;
+ quiche::ReadStream::ReadResult read = stream->Read(absl::MakeSpan(buffer));
+ EXPECT_THAT(buffer, ElementsAre('a', 'b', 'c'));
+ EXPECT_EQ(read.bytes_read, 3);
+ EXPECT_FALSE(read.fin);
+
+ read = stream->Read(absl::MakeSpan(buffer));
+ EXPECT_THAT(buffer, ElementsAre('d', 'e', 'f'));
+ EXPECT_EQ(read.bytes_read, 3);
+ EXPECT_TRUE(read.fin);
+}
+
+TEST_F(EncapsulatedWebTransportTest, LargeRead) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{
+ 1, std::string(64 * 1024, 'a'), true});
+ Stream* stream = session->AcceptIncomingBidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_EQ(stream->ReadableBytes(), 65536u);
+
+ for (int i = 0; i < 64; i++) {
+ std::array<char, 1024> buffer;
+ quiche::ReadStream::ReadResult read = stream->Read(absl::MakeSpan(buffer));
+ EXPECT_EQ(read.bytes_read, 1024);
+ EXPECT_EQ(read.fin, i == 63);
+ }
+}
+
+TEST_F(EncapsulatedWebTransportTest, DoubleFinReceived) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "abc", true});
+ Stream* stream = session->AcceptIncomingBidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ EXPECT_CALL(fatal_error_callback_, Call(_))
+ .WillOnce([](absl::string_view error) {
+ EXPECT_THAT(error, HasSubstr("has already received a FIN"));
+ });
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "def", true});
+}
+
+TEST_F(EncapsulatedWebTransportTest, CanWriteUnidiBidi) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
+ EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "abc", true});
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "abc", true});
+
+ Stream* stream = session->AcceptIncomingBidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_TRUE(stream->CanWrite());
+
+ stream = session->AcceptIncomingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_FALSE(stream->CanWrite());
+
+ stream = session->OpenOutgoingBidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_TRUE(stream->CanWrite());
+
+ stream = session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_TRUE(stream->CanWrite());
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReadOnlyGarbageCollection) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "abc", true});
+
+ Stream* stream = session->AcceptIncomingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+ EXPECT_TRUE(stream->SkipBytes(3));
+
+ MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
+ bool deleted = false;
+ EXPECT_CALL(*visitor, OnDelete()).WillOnce([&] { deleted = true; });
+ session->GarbageCollectStreams();
+ EXPECT_TRUE(deleted);
+}
+
+TEST_F(EncapsulatedWebTransportTest, WriteOnlyGarbageCollection) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+
+ Stream* stream = session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
+ bool deleted = false;
+ EXPECT_CALL(*visitor, OnDelete()).WillOnce([&] { deleted = true; });
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce(Return(true));
+
+ quiche::StreamWriteOptions options;
+ options.set_send_fin(true);
+ EXPECT_THAT(stream->Writev(absl::Span<const absl::string_view>(), options),
+ StatusIs(absl::StatusCode::kOk));
+ session->GarbageCollectStreams();
+ EXPECT_TRUE(deleted);
+}
+
+TEST_F(EncapsulatedWebTransportTest, SimpleWrite) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnIncomingBidirectionalStreamAvailable());
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{1, "", true});
+ Stream* stream = session->AcceptIncomingBidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
+ EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 1u);
+ EXPECT_EQ(capsule.web_transport_stream_data().fin, false);
+ EXPECT_EQ(capsule.web_transport_stream_data().data, "test");
+ return true;
+ });
+ absl::Status status = quiche::WriteIntoStream(*stream, "test");
+ EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
+}
+
+TEST_F(EncapsulatedWebTransportTest, WriteWithFin) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ Stream* stream = session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM_WITH_FIN);
+ EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 2u);
+ EXPECT_EQ(capsule.web_transport_stream_data().fin, true);
+ EXPECT_EQ(capsule.web_transport_stream_data().data, "test");
+ return true;
+ });
+ quiche::StreamWriteOptions options;
+ options.set_send_fin(true);
+ EXPECT_TRUE(stream->CanWrite());
+ absl::Status status = quiche::WriteIntoStream(*stream, "test", options);
+ EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
+ EXPECT_FALSE(stream->CanWrite());
+}
+
+TEST_F(EncapsulatedWebTransportTest, FinOnlyWrite) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ Stream* stream = session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM_WITH_FIN);
+ EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 2u);
+ EXPECT_EQ(capsule.web_transport_stream_data().fin, true);
+ EXPECT_EQ(capsule.web_transport_stream_data().data, "");
+ return true;
+ });
+ quiche::StreamWriteOptions options;
+ options.set_send_fin(true);
+ EXPECT_TRUE(stream->CanWrite());
+ absl::Status status =
+ stream->Writev(absl::Span<const absl::string_view>(), options);
+ EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
+ EXPECT_FALSE(stream->CanWrite());
+}
+
+TEST_F(EncapsulatedWebTransportTest, BufferedWriteThenUnbuffer) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ Stream* stream = session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
+ absl::Status status = quiche::WriteIntoStream(*stream, "abc");
+ EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
+
+ // While the stream cannot be written right now, we should be still able to
+ // buffer data into it.
+ EXPECT_TRUE(stream->CanWrite());
+ EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
+ status = quiche::WriteIntoStream(*stream, "def");
+ EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
+
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
+ EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 2u);
+ EXPECT_EQ(capsule.web_transport_stream_data().data, "abcdef");
+ return true;
+ });
+ session_->OnCanWrite();
+}
+
+TEST_F(EncapsulatedWebTransportTest, BufferedWriteThenFlush) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ Stream* stream = session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ EXPECT_CALL(writer_, CanWrite()).Times(2).WillRepeatedly(Return(false));
+ absl::Status status = quiche::WriteIntoStream(*stream, "abc");
+ EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
+ status = quiche::WriteIntoStream(*stream, "def");
+ EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
+
+ EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
+ EXPECT_EQ(capsule.web_transport_stream_data().stream_id, 2u);
+ EXPECT_EQ(capsule.web_transport_stream_data().data, "abcdef");
+ return true;
+ });
+ session_->OnCanWrite();
+}
+
+TEST_F(EncapsulatedWebTransportTest, BufferedStreamBlocksAnother) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ Stream* stream1 = session->OpenOutgoingUnidirectionalStream();
+ Stream* stream2 = session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(stream1 != nullptr);
+ ASSERT_TRUE(stream2 != nullptr);
+
+ EXPECT_CALL(*this, OnCapsule(_)).Times(0);
+ EXPECT_CALL(writer_, CanWrite()).WillOnce(Return(false));
+ absl::Status status = quiche::WriteIntoStream(*stream1, "abc");
+ EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
+ // ShouldYield will return false here, causing the write to get buffered.
+ EXPECT_CALL(writer_, CanWrite()).WillRepeatedly(Return(true));
+ status = quiche::WriteIntoStream(*stream2, "abc");
+ EXPECT_THAT(status, StatusIs(absl::StatusCode::kOk));
+
+ std::vector<StreamId> writes;
+ EXPECT_CALL(*this, OnCapsule(_)).WillRepeatedly([&](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STREAM);
+ writes.push_back(capsule.web_transport_stream_data().stream_id);
+ return true;
+ });
+ session_->OnCanWrite();
+ EXPECT_THAT(writes, ElementsAre(2, 6));
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendReset) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ Stream* stream = session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([&](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_RESET_STREAM);
+ EXPECT_EQ(capsule.web_transport_reset_stream().stream_id, 2u);
+ EXPECT_EQ(capsule.web_transport_reset_stream().error_code, 1234u);
+ return true;
+ });
+ stream->ResetWithUserCode(1234u);
+
+ bool deleted = false;
+ EXPECT_CALL(*visitor, OnDelete()).WillOnce([&] { deleted = true; });
+ session->GarbageCollectStreams();
+ EXPECT_TRUE(deleted);
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveReset) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "", true});
+ Stream* stream = session->AcceptIncomingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
+ EXPECT_CALL(*visitor, OnResetStreamReceived(1234u));
+ EXPECT_TRUE(session->GetStreamById(3) != nullptr);
+ ProcessIncomingCapsule(quiche::WebTransportResetStreamCapsule{3u, 1234u});
+ // Reading from the underlying transport automatically triggers garbage
+ // collection.
+ EXPECT_TRUE(session->GetStreamById(3) == nullptr);
+}
+
+TEST_F(EncapsulatedWebTransportTest, SendStopSending) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ EXPECT_CALL(*visitor_, OnIncomingUnidirectionalStreamAvailable());
+ ProcessIncomingCapsule(quiche::WebTransportStreamDataCapsule{3, "", true});
+ Stream* stream = session->AcceptIncomingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
+ EXPECT_CALL(*this, OnCapsule(_)).WillOnce([&](const Capsule& capsule) {
+ EXPECT_EQ(capsule.capsule_type(), CapsuleType::WT_STOP_SENDING);
+ EXPECT_EQ(capsule.web_transport_stop_sending().stream_id, 3u);
+ EXPECT_EQ(capsule.web_transport_stop_sending().error_code, 1234u);
+ return true;
+ });
+ stream->SendStopSending(1234u);
+
+ bool deleted = false;
+ EXPECT_CALL(*visitor, OnDelete()).WillOnce([&] { deleted = true; });
+ session->GarbageCollectStreams();
+ EXPECT_TRUE(deleted);
+}
+
+TEST_F(EncapsulatedWebTransportTest, ReceiveStopSending) {
+ std::unique_ptr<EncapsulatedSession> session =
+ CreateTransport(Perspective::kClient);
+ DefaultHandshakeForClient(*session);
+ Stream* stream = session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(stream != nullptr);
+
+ MockStreamVisitorWithDestructor* visitor = SetupVisitor(*stream);
+ EXPECT_CALL(*visitor, OnStopSendingReceived(1234u));
+ EXPECT_TRUE(session->GetStreamById(2) != nullptr);
+ ProcessIncomingCapsule(quiche::WebTransportStopSendingCapsule{2u, 1234u});
+ // Reading from the underlying transport automatically triggers garbage
+ // collection.
+ EXPECT_TRUE(session->GetStreamById(2) == nullptr);
+}
+
} // namespace
} // namespace webtransport::test