blob: 820950b31bd5d69ddf758edb1662cff4cbc9c51b [file] [log] [blame]
// Copyright 2021 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quic/core/http/web_transport_http3.h"
#include <memory>
#include "absl/strings/string_view.h"
#include "quic/core/http/quic_spdy_session.h"
#include "quic/core/http/quic_spdy_stream.h"
#include "quic/core/quic_data_reader.h"
#include "quic/core/quic_data_writer.h"
#include "quic/core/quic_stream.h"
#include "quic/core/quic_types.h"
#include "quic/core/quic_utils.h"
#include "quic/core/quic_versions.h"
#include "quic/platform/api/quic_bug_tracker.h"
#include "common/platform/api/quiche_logging.h"
#define ENDPOINT \
(session_->perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
namespace quic {
namespace {
class QUIC_NO_EXPORT NoopWebTransportVisitor : public WebTransportVisitor {
void OnSessionReady() override {}
void OnIncomingBidirectionalStreamAvailable() override {}
void OnIncomingUnidirectionalStreamAvailable() override {}
void OnDatagramReceived(absl::string_view /*datagram*/) override {}
void OnCanCreateNewOutgoingBidirectionalStream() override {}
void OnCanCreateNewOutgoingUnidirectionalStream() override {}
};
} // namespace
WebTransportHttp3::WebTransportHttp3(QuicSpdySession* session,
QuicSpdyStream* connect_stream,
WebTransportSessionId id)
: session_(session),
connect_stream_(connect_stream),
id_(id),
visitor_(std::make_unique<NoopWebTransportVisitor>()) {
QUICHE_DCHECK(session_->SupportsWebTransport());
QUICHE_DCHECK(IsValidWebTransportSessionId(id, session_->version()));
QUICHE_DCHECK_EQ(connect_stream_->id(), id);
connect_stream_->RegisterHttp3DatagramRegistrationVisitor(this);
if (session_->perspective() == Perspective::IS_CLIENT) {
context_is_known_ = true;
context_currently_registered_ = true;
}
}
void WebTransportHttp3::AssociateStream(QuicStreamId stream_id) {
streams_.insert(stream_id);
ParsedQuicVersion version = session_->version();
if (QuicUtils::IsOutgoingStreamId(version, stream_id,
session_->perspective())) {
return;
}
if (QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
incoming_bidirectional_streams_.push_back(stream_id);
visitor_->OnIncomingBidirectionalStreamAvailable();
} else {
incoming_unidirectional_streams_.push_back(stream_id);
visitor_->OnIncomingUnidirectionalStreamAvailable();
}
}
void WebTransportHttp3::CloseAllAssociatedStreams() {
// Copy the stream list before iterating over it, as calls to ResetStream()
// can potentially mutate the |session_| list.
std::vector<QuicStreamId> streams(streams_.begin(), streams_.end());
streams_.clear();
for (QuicStreamId id : streams) {
session_->ResetStream(id, QUIC_STREAM_WEBTRANSPORT_SESSION_GONE);
}
if (context_currently_registered_) {
context_currently_registered_ = false;
connect_stream_->UnregisterHttp3DatagramContextId(context_id_);
}
connect_stream_->UnregisterHttp3DatagramRegistrationVisitor();
}
void WebTransportHttp3::HeadersReceived(const spdy::SpdyHeaderBlock& headers) {
if (session_->perspective() == Perspective::IS_CLIENT) {
auto it = headers.find(":status");
if (it == headers.end() || it->second != "200") {
QUIC_DVLOG(1) << ENDPOINT
<< "Received WebTransport headers from server without "
"status 200, rejecting.";
return;
}
}
QUIC_DVLOG(1) << ENDPOINT << "WebTransport session " << id_ << " ready.";
ready_ = true;
visitor_->OnSessionReady();
session_->ProcessBufferedWebTransportStreamsForSession(this);
}
WebTransportStream* WebTransportHttp3::AcceptIncomingBidirectionalStream() {
while (!incoming_bidirectional_streams_.empty()) {
QuicStreamId id = incoming_bidirectional_streams_.front();
incoming_bidirectional_streams_.pop_front();
QuicSpdyStream* stream = session_->GetOrCreateSpdyDataStream(id);
if (stream == nullptr) {
// Skip the streams that were reset in between the time they were
// receieved and the time the client has polled for them.
continue;
}
return stream->web_transport_stream();
}
return nullptr;
}
WebTransportStream* WebTransportHttp3::AcceptIncomingUnidirectionalStream() {
while (!incoming_unidirectional_streams_.empty()) {
QuicStreamId id = incoming_unidirectional_streams_.front();
incoming_unidirectional_streams_.pop_front();
QuicStream* stream = session_->GetOrCreateStream(id);
if (stream == nullptr) {
// Skip the streams that were reset in between the time they were
// receieved and the time the client has polled for them.
continue;
}
return static_cast<WebTransportHttp3UnidirectionalStream*>(stream)
->interface();
}
return nullptr;
}
bool WebTransportHttp3::CanOpenNextOutgoingBidirectionalStream() {
return session_->CanOpenOutgoingBidirectionalWebTransportStream(id_);
}
bool WebTransportHttp3::CanOpenNextOutgoingUnidirectionalStream() {
return session_->CanOpenOutgoingUnidirectionalWebTransportStream(id_);
}
WebTransportStream* WebTransportHttp3::OpenOutgoingBidirectionalStream() {
QuicSpdyStream* stream =
session_->CreateOutgoingBidirectionalWebTransportStream(this);
if (stream == nullptr) {
// If stream cannot be created due to flow control or other errors, return
// nullptr.
return nullptr;
}
return stream->web_transport_stream();
}
WebTransportStream* WebTransportHttp3::OpenOutgoingUnidirectionalStream() {
WebTransportHttp3UnidirectionalStream* stream =
session_->CreateOutgoingUnidirectionalWebTransportStream(this);
if (stream == nullptr) {
// If stream cannot be created due to flow control, return nullptr.
return nullptr;
}
return stream->interface();
}
MessageStatus WebTransportHttp3::SendOrQueueDatagram(QuicMemSlice datagram) {
return connect_stream_->SendHttp3Datagram(
context_id_, absl::string_view(datagram.data(), datagram.length()));
}
void WebTransportHttp3::SetDatagramMaxTimeInQueue(
QuicTime::Delta max_time_in_queue) {
connect_stream_->SetMaxDatagramTimeInQueue(max_time_in_queue);
}
void WebTransportHttp3::OnHttp3Datagram(
QuicStreamId stream_id, absl::optional<QuicDatagramContextId> context_id,
absl::string_view payload) {
QUICHE_DCHECK_EQ(stream_id, connect_stream_->id());
QUICHE_DCHECK(context_id == context_id_);
visitor_->OnDatagramReceived(payload);
}
void WebTransportHttp3::OnContextReceived(
QuicStreamId stream_id, absl::optional<QuicDatagramContextId> context_id,
const Http3DatagramContextExtensions& /*extensions*/) {
if (stream_id != connect_stream_->id()) {
QUIC_BUG(WT3 bad datagram context registration)
<< ENDPOINT << "Registered stream ID " << stream_id << ", expected "
<< connect_stream_->id();
return;
}
if (!context_is_known_) {
context_is_known_ = true;
context_id_ = context_id;
}
if (context_id != context_id_) {
QUIC_DLOG(INFO) << ENDPOINT << "Ignoring unexpected context ID "
<< (context_id.has_value() ? context_id.value() : 0)
<< " instead of "
<< (context_id_.has_value() ? context_id_.value() : 0)
<< " on stream ID " << connect_stream_->id();
return;
}
if (session_->perspective() == Perspective::IS_SERVER) {
if (context_currently_registered_) {
QUIC_DLOG(ERROR) << ENDPOINT << "Received duplicate context ID "
<< (context_id_.has_value() ? context_id_.value() : 0)
<< " on stream ID " << connect_stream_->id();
session_->ResetStream(connect_stream_->id(), QUIC_STREAM_CANCELLED);
return;
}
context_currently_registered_ = true;
Http3DatagramContextExtensions reply_extensions;
connect_stream_->RegisterHttp3DatagramContextId(context_id_,
reply_extensions, this);
}
}
void WebTransportHttp3::OnContextClosed(
QuicStreamId stream_id, absl::optional<QuicDatagramContextId> context_id,
const Http3DatagramContextExtensions& /*extensions*/) {
if (stream_id != connect_stream_->id()) {
QUIC_BUG(WT3 bad datagram context registration)
<< ENDPOINT << "Closed context on stream ID " << stream_id
<< ", expected " << connect_stream_->id();
return;
}
if (context_id != context_id_) {
QUIC_DLOG(INFO) << ENDPOINT << "Ignoring unexpected close of context ID "
<< (context_id.has_value() ? context_id.value() : 0)
<< " instead of "
<< (context_id_.has_value() ? context_id_.value() : 0)
<< " on stream ID " << connect_stream_->id();
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Received datagram context close on stream ID "
<< connect_stream_->id() << ", resetting stream";
session_->ResetStream(connect_stream_->id(), QUIC_STREAM_CANCELLED);
}
WebTransportHttp3UnidirectionalStream::WebTransportHttp3UnidirectionalStream(
PendingStream* pending,
QuicSpdySession* session)
: QuicStream(pending, session, READ_UNIDIRECTIONAL, /*is_static=*/false),
session_(session),
adapter_(session, this, sequencer()),
needs_to_send_preamble_(false) {}
WebTransportHttp3UnidirectionalStream::WebTransportHttp3UnidirectionalStream(
QuicStreamId id,
QuicSpdySession* session,
WebTransportSessionId session_id)
: QuicStream(id, session, /*is_static=*/false, WRITE_UNIDIRECTIONAL),
session_(session),
adapter_(session, this, sequencer()),
session_id_(session_id),
needs_to_send_preamble_(true) {}
void WebTransportHttp3UnidirectionalStream::WritePreamble() {
if (!needs_to_send_preamble_ || !session_id_.has_value()) {
QUIC_BUG(WebTransportHttp3UnidirectionalStream duplicate preamble)
<< ENDPOINT << "Sending preamble on stream ID " << id()
<< " at the wrong time.";
OnUnrecoverableError(QUIC_INTERNAL_ERROR,
"Attempting to send a WebTransport unidirectional "
"stream preamble at the wrong time.");
return;
}
QuicConnection::ScopedPacketFlusher flusher(session_->connection());
char buffer[sizeof(uint64_t) * 2]; // varint62, varint62
QuicDataWriter writer(sizeof(buffer), buffer);
bool success = true;
success = success && writer.WriteVarInt62(kWebTransportUnidirectionalStream);
success = success && writer.WriteVarInt62(*session_id_);
QUICHE_DCHECK(success);
WriteOrBufferData(absl::string_view(buffer, writer.length()), /*fin=*/false,
/*ack_listener=*/nullptr);
QUIC_DVLOG(1) << ENDPOINT << "Sent stream type and session ID ("
<< *session_id_ << ") on WebTransport stream " << id();
needs_to_send_preamble_ = false;
}
bool WebTransportHttp3UnidirectionalStream::ReadSessionId() {
iovec iov;
if (!sequencer()->GetReadableRegion(&iov)) {
return false;
}
QuicDataReader reader(static_cast<const char*>(iov.iov_base), iov.iov_len);
WebTransportSessionId session_id;
uint8_t session_id_length = reader.PeekVarInt62Length();
if (!reader.ReadVarInt62(&session_id)) {
// If all of the data has been received, and we still cannot associate the
// stream with a session, consume all of the data so that the stream can
// be closed.
if (sequencer()->NumBytesConsumed() + sequencer()->NumBytesBuffered() >=
sequencer()->close_offset()) {
QUIC_DLOG(WARNING)
<< ENDPOINT << "Failed to associate WebTransport stream " << id()
<< " with a session because the stream ended prematurely.";
sequencer()->MarkConsumed(sequencer()->NumBytesBuffered());
}
return false;
}
sequencer()->MarkConsumed(session_id_length);
session_id_ = session_id;
session_->AssociateIncomingWebTransportStreamWithSession(session_id, id());
return true;
}
void WebTransportHttp3UnidirectionalStream::OnDataAvailable() {
if (!session_id_.has_value()) {
if (!ReadSessionId()) {
return;
}
}
adapter_.OnDataAvailable();
}
void WebTransportHttp3UnidirectionalStream::OnCanWriteNewData() {
adapter_.OnCanWriteNewData();
}
void WebTransportHttp3UnidirectionalStream::OnClose() {
QuicStream::OnClose();
if (!session_id_.has_value()) {
return;
}
WebTransportHttp3* session = session_->GetWebTransportSession(*session_id_);
if (session == nullptr) {
QUIC_DLOG(WARNING) << ENDPOINT << "WebTransport stream " << id()
<< " attempted to notify parent session " << *session_id_
<< ", but the session could not be found.";
return;
}
session->OnStreamClosed(id());
}
} // namespace quic