blob: adbbb5364a432e16b1414597a4b63c2da91b26a0 [file] [log] [blame]
// Copyright (c) 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/web_transport/encapsulated/encapsulated_web_transport.h"
#include <stdbool.h>
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iterator>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
#include "absl/algorithm/container.h"
#include "absl/container/node_hash_map.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "quiche/common/capsule.h"
#include "quiche/common/http/http_header_block.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/quiche_circular_deque.h"
#include "quiche/common/quiche_status_utils.h"
#include "quiche/common/quiche_stream.h"
#include "quiche/web_transport/web_transport.h"
namespace webtransport {
namespace {

using ::quiche::Capsule;
using ::quiche::CapsuleType;
using ::quiche::CloseWebTransportSessionCapsule;

// Maximum datagram size reported by GetMaxDatagramSize(). The value is
// arbitrary: there is no real MTU restriction when running over TCP.
constexpr uint64_t kEncapsulatedMaxDatagramSize = 9000;

// Priority used when registering a newly created stream with the scheduler.
constexpr StreamPriority kDefaultPriority{0, 0};

}  // namespace
// Creates a session for the given perspective. The outgoing stream ID counters
// are seeded with the first IDs this endpoint may open: 0 (bidirectional) and
// 2 (unidirectional) for a client, 1 and 3 for a server.
EncapsulatedSession::EncapsulatedSession(
    Perspective perspective, FatalErrorCallback fatal_error_callback)
    : perspective_(perspective),
      fatal_error_callback_(std::move(fatal_error_callback)),
      capsule_parser_(this),
      next_outgoing_bidi_stream_(perspective != Perspective::kClient ? 1 : 0),
      next_outgoing_unidi_stream_(perspective != Perspective::kClient ? 3
                                                                      : 2) {
  // Sanity-check that the seeded IDs really belong to this endpoint.
  QUICHE_DCHECK(IsIdOpenedBy(next_outgoing_bidi_stream_, perspective));
  QUICHE_DCHECK(IsIdOpenedBy(next_outgoing_unidi_stream_, perspective));
}
// Attaches the visitor and the underlying byte streams to a client session and
// moves it into the "waiting for headers" state. Reports a fatal error if the
// session was already initialized or is not a client session.
void EncapsulatedSession::InitializeClient(
    std::unique_ptr<SessionVisitor> visitor,
    quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
    quiche::ReadStream* reader) {
  const bool already_initialized = state_ != kUninitialized;
  if (already_initialized) {
    OnFatalError("Called InitializeClient() in an invalid state");
    return;
  }
  const bool is_client = perspective_ == Perspective::kClient;
  if (!is_client) {
    OnFatalError("Called InitializeClient() on a server session");
    return;
  }

  reader_ = reader;
  writer_ = writer;
  visitor_ = std::move(visitor);
  state_ = kWaitingForHeaders;
}
// Attaches the visitor and the underlying byte streams to a server session and
// opens the session right away. Reports a fatal error if the session was
// already initialized or is not a server session.
void EncapsulatedSession::InitializeServer(
    std::unique_ptr<SessionVisitor> visitor,
    const quiche::HttpHeaderBlock& /*incoming_headers*/,
    quiche::HttpHeaderBlock& /*outgoing_headers*/, quiche::WriteStream* writer,
    quiche::ReadStream* reader) {
  const bool already_initialized = state_ != kUninitialized;
  if (already_initialized) {
    OnFatalError("Called InitializeServer() in an invalid state");
    return;
  }
  const bool is_server = perspective_ == Perspective::kServer;
  if (!is_server) {
    OnFatalError("Called InitializeServer() on a client session");
    return;
  }

  reader_ = reader;
  writer_ = writer;
  visitor_ = std::move(visitor);
  OpenSession();
}
// Opens the session once the server headers have arrived. Only valid while the
// session is waiting for headers; any other state is a fatal error.
void EncapsulatedSession::ProcessIncomingServerHeaders(
    const quiche::HttpHeaderBlock& /*headers*/) {
  if (state_ == kWaitingForHeaders) {
    OpenSession();
    return;
  }
  OnFatalError("Called ProcessIncomingServerHeaders() in an invalid state");
}
// Starts a graceful close of an open session: the error code and message are
// buffered, the state becomes "closing", and a write is attempted immediately.
// Closing a session that is not yet open, or that is already closing or
// closed, is reported as a fatal error.
void EncapsulatedSession::CloseSession(SessionErrorCode error_code,
                                       absl::string_view error_message) {
  if (state_ == kUninitialized || state_ == kWaitingForHeaders) {
    OnFatalError(absl::StrCat(
        "Attempted to close a session before it opened with error 0x",
        absl::Hex(error_code), ": ", error_message));
    return;
  }
  if (state_ == kSessionClosing || state_ == kSessionClosed) {
    OnFatalError(absl::StrCat(
        "Attempted to close a session that is already closed with error 0x",
        absl::Hex(error_code), ": ", error_message));
    return;
  }

  // The session is open: remember what to send and let OnCanWrite() emit it.
  state_ = kSessionClosing;
  buffered_session_close_ =
      BufferedClose{error_code, std::string(error_message)};
  OnCanWrite();
}
// Pops stream IDs off `queue` until one refers to a stream that still exists
// and returns that stream. IDs whose stream is no longer in the stream map
// (for example because it was reset and garbage collected before being
// accepted) are discarded. Returns nullptr once the queue is exhausted.
Stream* EncapsulatedSession::AcceptIncomingStream(
    quiche::QuicheCircularDeque<StreamId>& queue) {
  Stream* accepted = nullptr;
  while (accepted == nullptr && !queue.empty()) {
    const StreamId candidate = queue.front();
    queue.pop_front();
    accepted = GetStreamById(candidate);
  }
  return accepted;
}
// Returns the next pending incoming bidirectional stream, or nullptr if there
// is none.
Stream* EncapsulatedSession::AcceptIncomingBidirectionalStream() {
  Stream* const next_stream =
      AcceptIncomingStream(incoming_bidirectional_streams_);
  return next_stream;
}
// Returns the next pending incoming unidirectional stream, or nullptr if there
// is none.
Stream* EncapsulatedSession::AcceptIncomingUnidirectionalStream() {
  Stream* const next_stream =
      AcceptIncomingStream(incoming_unidirectional_streams_);
  return next_stream;
}
// Reports whether another outgoing bidirectional stream may be opened.
// Currently always true.
bool EncapsulatedSession::CanOpenNextOutgoingBidirectionalStream() {
  // TODO: implement flow control.
  return true;
}
// Reports whether another outgoing unidirectional stream may be opened.
// Currently always true.
bool EncapsulatedSession::CanOpenNextOutgoingUnidirectionalStream() {
  // TODO: implement flow control.
  return true;
}
// Creates a new outgoing stream using the ID currently held in `counter`, then
// advances `counter` by 4 so that it holds the next ID of the same kind.
// Returns a pointer to the stream stored in the stream map.
Stream* EncapsulatedSession::OpenOutgoingStream(StreamId& counter) {
  const StreamId new_id = counter;
  counter += 4;
  auto emplace_result = streams_.emplace(
      std::piecewise_construct, std::forward_as_tuple(new_id),
      std::forward_as_tuple(this, new_id));
  // A freshly allocated ID must not already be present in the map.
  QUICHE_DCHECK(emplace_result.second);
  return &emplace_result.first->second;
}
// Opens a new outgoing bidirectional stream, or returns nullptr if one cannot
// be opened right now.
Stream* EncapsulatedSession::OpenOutgoingBidirectionalStream() {
  if (CanOpenNextOutgoingBidirectionalStream()) {
    return OpenOutgoingStream(next_outgoing_bidi_stream_);
  }
  return nullptr;
}
// Opens a new outgoing unidirectional stream, or returns nullptr if one cannot
// be opened right now.
Stream* EncapsulatedSession::OpenOutgoingUnidirectionalStream() {
  if (CanOpenNextOutgoingUnidirectionalStream()) {
    return OpenOutgoingStream(next_outgoing_unidi_stream_);
  }
  return nullptr;
}
// Looks up a stream by ID. Returns nullptr when no stream with that ID is
// present in the stream map.
Stream* EncapsulatedSession::GetStreamById(StreamId id) {
  const auto found = streams_.find(id);
  return found != streams_.end() ? &found->second : nullptr;
}
// Returns datagram statistics. Both the lost and the expired outgoing counters
// are always reported as zero.
DatagramStats EncapsulatedSession::GetDatagramStats() {
  DatagramStats datagram_stats;
  datagram_stats.lost_outgoing = 0;
  datagram_stats.expired_outgoing = 0;
  return datagram_stats;
}
// Returns value-initialized session statistics.
SessionStats EncapsulatedSession::GetSessionStats() {
  // Stats could potentially be obtained via tcp_info and similar mechanisms,
  // but that would require knowing what the underlying socket is.
  SessionStats empty_stats = SessionStats();
  return empty_stats;
}
void EncapsulatedSession::NotifySessionDraining() {
SendControlCapsule(quiche::DrainWebTransportSessionCapsule());
OnCanWrite();
}
// Stores the callback that OnCapsule() invokes when a
// DRAIN_WEBTRANSPORT_SESSION capsule is received. Replaces any callback set
// earlier.
void EncapsulatedSession::SetOnDraining(
    quiche::SingleUseCallback<void()> callback) {
  draining_callback_ = std::move(callback);
}
// Sends `datagram` as a DATAGRAM capsule, or queues it if the session cannot
// write right now.
//
// Returns:
//   - kTooBig if the datagram exceeds GetMaxDatagramSize();
//   - kInternalError if the session is closing/closed or the write failed;
//   - kSuccess if the datagram was written or queued.
DatagramStatus EncapsulatedSession::SendOrQueueDatagram(
    absl::string_view datagram) {
  const uint64_t max_size = GetMaxDatagramSize();
  if (datagram.size() > max_size) {
    return DatagramStatus{
        DatagramStatusCode::kTooBig,
        absl::StrCat("Datagram is ", datagram.size(),
                     " bytes long, while the specified maximum size is ",
                     max_size)};
  }

  if (state_ == kSessionClosing || state_ == kSessionClosed) {
    return DatagramStatus{DatagramStatusCode::kInternalError,
                          "Writing into an already closed session"};
  }

  // An uninitialized session is always treated as blocked (the writer is not
  // consulted). While waiting for headers or open, datagrams may be written as
  // soon as the underlying writer accepts data.
  const bool write_blocked = state_ == kUninitialized || !writer_->CanWrite();
  if (write_blocked) {
    // TODO: this *may* be useful to split into a separate queue.
    control_capsule_queue_.push_back(
        quiche::SerializeCapsule(Capsule::Datagram(datagram), allocator_));
    return DatagramStatus{DatagramStatusCode::kSuccess, ""};
  }

  // Optimistic path: write the capsule header and the payload as two separate
  // spans, which avoids copying the payload into a serialized capsule.
  quiche::QuicheBuffer header =
      quiche::SerializeDatagramCapsuleHeader(datagram.size(), allocator_);
  const std::array<absl::string_view, 2> pieces = {header.AsStringView(),
                                                   datagram};
  const absl::Status write_status = writer_->Writev(
      absl::MakeConstSpan(pieces), quiche::StreamWriteOptions());
  if (write_status.ok()) {
    return DatagramStatus{DatagramStatusCode::kSuccess, ""};
  }
  OnWriteError(write_status);
  return DatagramStatus{
      DatagramStatusCode::kInternalError,
      absl::StrCat("Write error for datagram: ", write_status.ToString())};
}
// Returns the largest datagram accepted by SendOrQueueDatagram(); this is the
// fixed constant kEncapsulatedMaxDatagramSize.
uint64_t EncapsulatedSession::GetMaxDatagramSize() const {
  return kEncapsulatedMaxDatagramSize;
}
// Currently a no-op: the supplied duration is ignored.
void EncapsulatedSession::SetDatagramMaxTimeInQueue(
    absl::Duration /*max_time_in_queue*/) {
  // TODO(b/264263113): implement this (requires having a mockable clock).
}
// Flushes as much buffered output as the underlying writer accepts.
//
// Behavior by state:
//   - Uninitialized (or no writer) and closed sessions: fatal error.
//   - Closing: if the writer can write, the buffered WT_CLOSE_SESSION capsule
//     is sent together with the FIN and the session is marked closed. Nothing
//     else is written in this state.
//   - Otherwise: queued control capsules (this queue also holds datagrams
//     queued by SendOrQueueDatagram()) are written first in FIFO order, then
//     streams with pending writes are flushed in the order produced by the
//     scheduler.
void EncapsulatedSession::OnCanWrite() {
  if (state_ == kUninitialized || !writer_) {
    OnFatalError("Trying to write before the session is initialized");
    return;
  }
  if (state_ == kSessionClosed) {
    // The session is already closed here, so the write attempt happens after
    // the close (the message previously said "before", which was misleading).
    OnFatalError("Trying to write after the session is closed");
    return;
  }
  if (state_ == kSessionClosing) {
    if (writer_->CanWrite()) {
      CloseWebTransportSessionCapsule capsule{
          buffered_session_close_.error_code,
          buffered_session_close_.error_message};
      quiche::QuicheBuffer buffer =
          quiche::SerializeCapsule(Capsule(std::move(capsule)), allocator_);
      absl::Status write_status = SendFin(buffer.AsStringView());
      if (!write_status.ok()) {
        OnWriteError(quiche::AppendToStatus(write_status,
                                            " while writing WT_CLOSE_SESSION"));
        return;
      }
      OnSessionClosed(buffered_session_close_.error_code,
                      buffered_session_close_.error_message);
    }
    // If the writer is blocked, the close stays buffered until the next call.
    return;
  }

  // Control capsules take precedence over stream data. A capsule is only
  // removed from the queue after it has been written successfully.
  while (writer_->CanWrite() && !control_capsule_queue_.empty()) {
    absl::Status write_status = quiche::WriteIntoStream(
        *writer_, control_capsule_queue_.front().AsStringView());
    if (!write_status.ok()) {
      OnWriteError(write_status);
      return;
    }
    control_capsule_queue_.pop_front();
  }

  // Flush write-blocked streams in scheduler order.
  while (writer_->CanWrite()) {
    absl::StatusOr<StreamId> next_id = scheduler_.PopFront();
    if (!next_id.ok()) {
      // A non-OK result is expected to mean that no more streams are
      // scheduled.
      QUICHE_DCHECK_EQ(next_id.status().code(), absl::StatusCode::kNotFound);
      return;
    }
    auto it = streams_.find(*next_id);
    if (it == streams_.end()) {
      QUICHE_BUG(WT_H2_NextStreamNotInTheMap);
      OnFatalError("Next scheduled stream is not in the map");
      return;
    }
    QUICHE_DCHECK(it->second.HasPendingWrite());
    it->second.FlushPendingWrite();
  }
}
void EncapsulatedSession::OnCanRead() {
if (state_ == kSessionClosed || state_ == kSessionClosing) {
return;
}
bool has_fin = quiche::ProcessAllReadableRegions(
*reader_, [&](absl::string_view fragment) {
capsule_parser_.IngestCapsuleFragment(fragment);
});
if (has_fin) {
capsule_parser_.ErrorIfThereIsRemainingBufferedData();
OnSessionClosed(0, "");
}
if (state_ == kSessionOpen) {
GarbageCollectStreams();
}
}
// Capsule parser callback: dispatches a fully parsed capsule.
//   - DATAGRAM: payload is handed to the session visitor.
//   - DRAIN_WEBTRANSPORT_SESSION: runs the draining callback, if one is set.
//   - CLOSE_WEBTRANSPORT_SESSION: closes the session with the peer's error.
//   - WT_STREAM / WT_STREAM_WITH_FIN / WT_RESET_STREAM / WT_STOP_SENDING:
//     forwarded to the stream identified in the capsule.
//   - Any other capsule type is ignored.
// Returns false once the session is closed, true otherwise.
bool EncapsulatedSession::OnCapsule(const quiche::Capsule& capsule) {
  switch (capsule.capsule_type()) {
    case CapsuleType::DATAGRAM: {
      visitor_->OnDatagramReceived(
          capsule.datagram_capsule().http_datagram_payload);
      break;
    }
    case CapsuleType::DRAIN_WEBTRANSPORT_SESSION: {
      if (draining_callback_) {
        std::move(draining_callback_)();
      }
      break;
    }
    case CapsuleType::CLOSE_WEBTRANSPORT_SESSION: {
      const auto& close_capsule = capsule.close_web_transport_session_capsule();
      OnSessionClosed(close_capsule.error_code,
                      std::string(close_capsule.error_message));
      break;
    }
    case CapsuleType::WT_STREAM:
    case CapsuleType::WT_STREAM_WITH_FIN: {
      const StreamId target_id = capsule.web_transport_stream_data().stream_id;
      ProcessStreamCapsule(capsule, target_id);
      break;
    }
    case CapsuleType::WT_RESET_STREAM: {
      const StreamId target_id = capsule.web_transport_reset_stream().stream_id;
      ProcessStreamCapsule(capsule, target_id);
      break;
    }
    case CapsuleType::WT_STOP_SENDING: {
      const StreamId target_id = capsule.web_transport_stop_sending().stream_id;
      ProcessStreamCapsule(capsule, target_id);
      break;
    }
    default:
      break;
  }
  const bool session_still_alive = state_ != kSessionClosed;
  return session_still_alive;
}
// Capsule parser callback: turns a parse failure into a fatal session error,
// unless the session is already closed, in which case it is ignored.
void EncapsulatedSession::OnCapsuleParseFailure(
    absl::string_view error_message) {
  if (state_ != kSessionClosed) {
    OnFatalError(absl::StrCat("Stream parse error: ", error_message));
  }
}
// Routes a stream-related capsule to the stream with ID `stream_id`.
//
// If the stream is unknown and the ID denotes an outgoing stream, the capsule
// is dropped, since it may refer to an outgoing stream that has already been
// closed. If the stream is unknown and incoming, it is created, the capsule is
// processed, and the stream is then queued for acceptance and announced to the
// session visitor.
void EncapsulatedSession::ProcessStreamCapsule(const quiche::Capsule& capsule,
                                               StreamId stream_id) {
  auto it = streams_.find(stream_id);
  const bool is_new_stream = it == streams_.end();
  if (is_new_stream) {
    if (IsOutgoing(stream_id)) {
      return;
    }
    // TODO: check flow control here.
    it = streams_.emplace_hint(it, std::piecewise_construct,
                               std::forward_as_tuple(stream_id),
                               std::forward_as_tuple(this, stream_id));
  }

  it->second.ProcessCapsule(capsule);
  if (!is_new_stream) {
    return;
  }

  // Announce the newly created stream only after its first capsule has been
  // processed.
  if (!IsBidirectionalId(stream_id)) {
    incoming_unidirectional_streams_.push_back(stream_id);
    visitor_->OnIncomingUnidirectionalStreamAvailable();
    return;
  }
  incoming_bidirectional_streams_.push_back(stream_id);
  visitor_->OnIncomingBidirectionalStreamAvailable();
}
// Applies a stream-related capsule to this stream.
//   - WT_RESET_STREAM closes the read side with the peer's error code.
//   - WT_STOP_SENDING closes the write side with the peer's error code.
//   - WT_STREAM / WT_STREAM_WITH_FIN appends the payload to the read queue and
//     notifies the stream visitor. Data arriving after a FIN is a fatal error;
//     data arriving after the read side was closed locally is ignored.
//   - Any other capsule type is an internal error.
void EncapsulatedSession::InnerStream::ProcessCapsule(
    const quiche::Capsule& capsule) {
  const CapsuleType type = capsule.capsule_type();

  if (type == CapsuleType::WT_RESET_STREAM) {
    CloseReadSide(capsule.web_transport_reset_stream().error_code);
    return;
  }
  if (type == CapsuleType::WT_STOP_SENDING) {
    CloseWriteSide(capsule.web_transport_stop_sending().error_code);
    return;
  }
  const bool is_data_capsule = type == CapsuleType::WT_STREAM ||
                               type == CapsuleType::WT_STREAM_WITH_FIN;
  if (!is_data_capsule) {
    QUICHE_BUG(WT_H2_ProcessStreamCapsule_Unknown)
        << "Unexpected capsule dispatched to InnerStream: " << capsule;
    session_->OnFatalError(
        "Internal error: Unexpected capsule dispatched to InnerStream");
    return;
  }

  if (fin_received_) {
    session_->OnFatalError(
        "Received stream data for a stream that has already received a "
        "FIN");
    return;
  }
  if (read_side_closed_) {
    // A STOP_SENDING we sent may not have reached the peer yet; drop the data.
    return;
  }

  fin_received_ = type == CapsuleType::WT_STREAM_WITH_FIN;
  const quiche::WebTransportStreamDataCapsule& stream_data =
      capsule.web_transport_stream_data();
  if (!stream_data.data.empty()) {
    // Initially the entry only references the capsule's payload; no copy yet.
    incoming_reads_.push_back(IncomingRead{stream_data.data, std::string()});
  }

  // Fast path: a visitor that consumes everything from within OnCanRead()
  // spares us from copying the payload.
  if (visitor_ != nullptr) {
    visitor_->OnCanRead();
  }

  // Slow path: whatever is still queued and has no storage of its own gets
  // copied into owned storage, and its view is re-pointed at the copy.
  for (IncomingRead& pending : incoming_reads_) {
    QUICHE_DCHECK(!pending.data.empty());
    if (!pending.storage.empty()) {
      continue;
    }
    pending.storage = std::string(pending.data);
    pending.data = pending.storage;
  }
}
// Marks the session as open, tells the visitor it is ready, and then
// processes any output and input that is already pending.
void EncapsulatedSession::OpenSession() {
  state_ = kSessionOpen;
  visitor_->OnSessionReady();

  // Writes are attempted before reads, in that order.
  OnCanWrite();
  OnCanRead();
}
// Writes `data` to the underlying stream with the FIN flag set and records
// that the FIN has been sent. Must be called at most once (checked in debug
// builds). Returns the status of the write.
absl::Status EncapsulatedSession::SendFin(absl::string_view data) {
  QUICHE_DCHECK(!fin_sent_);
  quiche::StreamWriteOptions fin_options;
  fin_options.set_send_fin(true);
  // The flag is set regardless of whether the write below succeeds.
  fin_sent_ = true;
  return quiche::WriteIntoStream(*writer_, data, fin_options);
}
// Finalizes session closure: sends an empty FIN if none was sent yet, moves
// the session to the closed state, and notifies the visitor exactly once. If
// sending the FIN fails, the failure is reported as a write error and the
// visitor is not notified.
void EncapsulatedSession::OnSessionClosed(SessionErrorCode error_code,
                                          const std::string& error_message) {
  if (!fin_sent_) {
    if (absl::Status fin_status = SendFin(""); !fin_status.ok()) {
      OnWriteError(fin_status);
      return;
    }
  }

  if (session_close_notified_) {
    QUICHE_DCHECK_EQ(state_, kSessionClosed);
    return;
  }

  session_close_notified_ = true;
  state_ = kSessionClosed;
  if (visitor_ == nullptr) {
    return;
  }
  visitor_->OnSessionClosed(error_code, error_message);
}
// Handles an unrecoverable error: logs it, marks the session closed, and
// invokes the fatal error callback if it is still set. The callback is cleared
// after it has run, so it is invoked at most once.
void EncapsulatedSession::OnFatalError(absl::string_view error_message) {
  QUICHE_DLOG(ERROR) << "Fatal error in encapsulated WebTransport: "
                     << error_message;
  state_ = kSessionClosed;

  if (!fatal_error_callback_) {
    return;
  }
  std::move(fatal_error_callback_)(error_message);
  fatal_error_callback_ = nullptr;
}
// Reports a failed write on the underlying stream as a fatal error, appending
// context to the status text.
void EncapsulatedSession::OnWriteError(absl::Status error) {
  const std::string description = absl::StrCat(
      error, " while trying to write encapsulated WebTransport data");
  OnFatalError(description);
}
// Creates the stream object for `id`. For unidirectional streams, the side
// that this endpoint does not use starts out closed: the read side when this
// endpoint opened the stream, the write side when the peer did. Streams with
// an open write side are registered with the session's scheduler at the
// default priority; a registration failure is a fatal session error.
EncapsulatedSession::InnerStream::InnerStream(EncapsulatedSession* session,
                                              StreamId id)
    : session_(session),
      id_(id),
      read_side_closed_(IsUnidirectionalId(id) &&
                        IsIdOpenedBy(id, session->perspective_)),
      write_side_closed_(IsUnidirectionalId(id) &&
                         !IsIdOpenedBy(id, session->perspective_)) {
  if (write_side_closed_) {
    // Nothing will ever be written, so there is nothing to schedule.
    return;
  }
  absl::Status register_status =
      session_->scheduler_.Register(id_, kDefaultPriority);
  if (register_status.ok()) {
    return;
  }
  QUICHE_BUG(WT_H2_FailedToRegisterNewStream) << register_status;
  session_->OnFatalError("Failed to register new stream with the scheduler");
}
// Copies buffered incoming data into `output` and removes the copied bytes
// from the read queue.
//
// At most `output.size()` bytes are copied; fewer are copied when less data is
// buffered. The returned ReadResult holds the number of bytes actually copied
// and whether the FIN was consumed by this call.
//
// Fix: the amount skipped and reported used to be `output.size()` regardless
// of how much data was available. With a buffer larger than the buffered data
// this asked SkipBytes() to drop bytes that do not exist (tripping its
// QUICHE_BUG), over-reported the number of bytes read, and lost the FIN
// indication.
quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read(
    absl::Span<char> output) {
  const size_t capacity = output.size();
  for (const IncomingRead& read : incoming_reads_) {
    const size_t size_to_read = std::min(read.size(), output.size());
    if (size_to_read == 0) {
      // The output buffer is full.
      break;
    }
    memcpy(output.data(), read.data.data(), size_to_read);
    // Shrink the view so that it covers only the unwritten tail.
    output = output.subspan(size_to_read);
  }
  // `output` now spans the unused remainder of the caller's buffer.
  const size_t bytes_read = capacity - output.size();
  const bool fin_consumed = SkipBytes(bytes_read);
  return ReadResult{bytes_read, fin_consumed};
}
// Appends all currently readable bytes to `*output`. The string is grown by
// ReadableBytes() and the span-based Read() fills the newly added tail.
quiche::ReadStream::ReadResult EncapsulatedSession::InnerStream::Read(
    std::string* output) {
  const size_t previous_size = output->size();
  const size_t readable = ReadableBytes();
  output->resize(previous_size + readable);
  char* const destination = &(*output)[previous_size];
  return Read(absl::Span<char>(destination, readable));
}
size_t EncapsulatedSession::InnerStream::ReadableBytes() const {
size_t total_size = 0;
for (const IncomingRead& read : incoming_reads_) {
total_size += read.size();
}
return total_size;
}
// Returns the first buffered region without consuming it (an empty view if
// nothing is buffered). The second field is true when a FIN has been received
// and at most one region is buffered; the third field is true whenever a FIN
// has been received.
quiche::ReadStream::PeekResult
EncapsulatedSession::InnerStream::PeekNextReadableRegion() const {
  const absl::string_view next_region = incoming_reads_.empty()
                                            ? absl::string_view()
                                            : incoming_reads_.front().data;
  const bool fin_follows_region =
      fin_received_ && incoming_reads_.size() <= 1;
  return PeekResult{next_region, fin_follows_region, fin_received_};
}
bool EncapsulatedSession::InnerStream::SkipBytes(size_t bytes) {
size_t remaining = bytes;
while (remaining > 0) {
if (incoming_reads_.empty()) {
QUICHE_BUG(WT_H2_SkipBytes_toomuch)
<< "Requested to skip " << remaining
<< " bytes that are not present in the read buffer.";
return false;
}
IncomingRead& current = incoming_reads_.front();
if (remaining < current.size()) {
current.data = current.data.substr(remaining);
return false;
}
remaining -= current.size();
incoming_reads_.pop_front();
}
if (incoming_reads_.empty() && fin_received_) {
fin_consumed_ = true;
CloseReadSide(std::nullopt);
return true;
}
return false;
}
// Writes `data` (optionally followed by a FIN) to the stream.
//
// The data is written immediately when the underlying writer can accept it,
// the scheduler does not ask this stream to yield, and nothing is already
// buffered. Otherwise the data (and the FIN flag) is buffered and the stream
// is scheduled to be flushed later.
//
// Returns FailedPrecondition if the write side is closed, a FIN is already
// buffered, or CanWrite() is false; Internal if the scheduler rejects the
// stream; OK otherwise.
absl::Status EncapsulatedSession::InnerStream::Writev(
    const absl::Span<const absl::string_view> data,
    const quiche::StreamWriteOptions& options) {
  if (write_side_closed_) {
    return absl::FailedPreconditionError(
        "Trying to write into an already-closed stream");
  }
  if (fin_buffered_) {
    return absl::FailedPreconditionError("FIN already buffered");
  }
  if (!CanWrite()) {
    return absl::FailedPreconditionError(
        "Trying to write into a stream when CanWrite() = false");
  }

  // Appends every chunk of `data` to the pending write buffer.
  const auto buffer_data = [this, data]() {
    for (absl::string_view chunk : data) {
      absl::StrAppend(&pending_write_, chunk);
    }
  };

  const absl::StatusOr<bool> should_yield =
      session_->scheduler_.ShouldYield(id_);
  if (!should_yield.ok()) {
    QUICHE_BUG(WT_H2_Writev_NotRegistered) << should_yield.status();
    session_->OnFatalError("Stream not registered with the scheduler");
    return absl::InternalError("Stream not registered with the scheduler");
  }

  const bool can_write_now = session_->writer_->CanWrite() && !*should_yield &&
                             pending_write_.empty();
  if (!can_write_now) {
    fin_buffered_ = options.send_fin();
    buffer_data();
    const absl::Status schedule_status = session_->scheduler_.Schedule(id_);
    if (!schedule_status.ok()) {
      QUICHE_BUG(WT_H2_Writev_CantSchedule) << schedule_status;
      session_->OnFatalError("Could not schedule a write-blocked stream");
      return absl::InternalError("Could not schedule a write-blocked stream");
    }
    return absl::OkStatus();
  }

  const size_t bytes_written = WriteInner(data, options.send_fin());
  // TODO: handle partial writes when flow control requires those.
  QUICHE_DCHECK(bytes_written == 0 ||
                bytes_written == quiche::TotalStringViewSpanSize(data));
  if (bytes_written == 0) {
    buffer_data();
  }
  if (options.send_fin()) {
    CloseWriteSide(std::nullopt);
  }
  return absl::OkStatus();
}
// Returns true if the stream accepts new writes: the session is not closed,
// the write side is open, and the pending write buffer does not exceed the
// session's per-stream buffering limit.
bool EncapsulatedSession::InnerStream::CanWrite() const {
  if (session_->state_ == EncapsulatedSession::kSessionClosed) {
    return false;
  }
  if (write_side_closed_) {
    return false;
  }
  return pending_write_.size() <= session_->max_stream_data_buffered_;
}
void EncapsulatedSession::InnerStream::FlushPendingWrite() {
QUICHE_DCHECK(!write_side_closed_);
QUICHE_DCHECK(session_->writer_->CanWrite());
QUICHE_DCHECK(!pending_write_.empty());
absl::string_view to_write = pending_write_;
size_t bytes_written =
WriteInner(absl::MakeSpan(&to_write, 1), fin_buffered_);
if (bytes_written < to_write.size()) {
pending_write_ = pending_write_.substr(bytes_written);
return;
}
pending_write_.clear();
if (fin_buffered_) {
CloseWriteSide(std::nullopt);
}
if (!write_side_closed_ && visitor_ != nullptr) {
visitor_->OnCanWrite();
}
}
// Serializes a WT_STREAM capsule header for `data` (with `fin` encoded in the
// header) and writes header plus payload to the session's writer in a single
// vectored write. The writer itself is always called with the default write
// options. Returns the payload size on success and 0 on failure; an empty
// write without a FIN is a fatal error.
size_t EncapsulatedSession::InnerStream::WriteInner(
    absl::Span<const absl::string_view> data, bool fin) {
  const size_t payload_size = quiche::TotalStringViewSpanSize(data);
  if (!fin && payload_size == 0) {
    session_->OnFatalError("Attempted to make an empty write with fin=false");
    return 0;
  }

  quiche::QuicheBuffer capsule_header =
      quiche::SerializeWebTransportStreamCapsuleHeader(id_, fin, payload_size,
                                                       session_->allocator_);
  // Header first, then every payload chunk in its original order.
  std::vector<absl::string_view> pieces;
  pieces.reserve(1 + data.size());
  pieces.push_back(capsule_header.AsStringView());
  pieces.insert(pieces.end(), data.begin(), data.end());

  const absl::Status write_status =
      session_->writer_->Writev(pieces, quiche::kDefaultStreamWriteOptions);
  if (write_status.ok()) {
    return payload_size;
  }
  session_->OnWriteError(write_status);
  return 0;
}
// Logs `error` and resets the stream via ResetDueToInternalError().
void EncapsulatedSession::InnerStream::AbruptlyTerminate(absl::Status error) {
  QUICHE_DLOG(INFO) << "Abruptly terminating the stream due to error: "
                    << error;
  ResetDueToInternalError();
}
// Sends a WT_RESET_STREAM capsule carrying `error` and closes the write side.
// Only the first call has any effect; later calls are ignored.
void EncapsulatedSession::InnerStream::ResetWithUserCode(
    StreamErrorCode error) {
  if (!reset_frame_sent_) {
    reset_frame_sent_ = true;
    session_->SendControlCapsule(
        quiche::WebTransportResetStreamCapsule{id_, error});
    CloseWriteSide(std::nullopt);
  }
}
// Sends a WT_STOP_SENDING capsule carrying `error` and closes the read side.
// Only the first call has any effect; later calls are ignored.
void EncapsulatedSession::InnerStream::SendStopSending(StreamErrorCode error) {
  if (!stop_sending_sent_) {
    stop_sending_sent_ = true;
    session_->SendControlCapsule(
        quiche::WebTransportStopSendingCapsule{id_, error});
    CloseReadSide(std::nullopt);
  }
}
void EncapsulatedSession::InnerStream::CloseReadSide(
std::optional<StreamErrorCode> error) {
if (read_side_closed_) {
return;
}
read_side_closed_ = true;
incoming_reads_.clear();
if (error.has_value() && visitor_ != nullptr) {
visitor_->OnResetStreamReceived(*error);
}
if (CanBeGarbageCollected()) {
session_->streams_to_garbage_collect_.push_back(id_);
}
}
void EncapsulatedSession::InnerStream::CloseWriteSide(
std::optional<StreamErrorCode> error) {
if (write_side_closed_) {
return;
}
write_side_closed_ = true;
pending_write_.clear();
absl::Status status = session_->scheduler_.Unregister(id_);
if (!status.ok()) {
session_->OnFatalError("Failed to unregister closed stream");
return;
}
if (error.has_value() && visitor_ != nullptr) {
visitor_->OnStopSendingReceived(*error);
}
if (CanBeGarbageCollected()) {
session_->streams_to_garbage_collect_.push_back(id_);
}
}
void EncapsulatedSession::GarbageCollectStreams() {
for (StreamId id : streams_to_garbage_collect_) {
streams_.erase(id);
}
streams_to_garbage_collect_.clear();
}
} // namespace webtransport