Support unidirectional streams in WebTransport over HTTP/3.
This CL adds support for sending, receiving, buffering, tracking and closing unidirectional streams attached to a WebTransport over HTTP/3 session.
It also fills in some of the gaps in the API and testing infrastructure that were needed to make this work.
PiperOrigin-RevId: 366108288
Change-Id: I16b779f8905b597b5f948f02a7c37e656ae44659
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index 3ac82a4..8b67c47 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -61,6 +61,7 @@
#include "quic/test_tools/quic_test_client.h"
#include "quic/test_tools/quic_test_server.h"
#include "quic/test_tools/quic_test_utils.h"
+#include "quic/test_tools/quic_transport_test_tools.h"
#include "quic/test_tools/server_thread.h"
#include "quic/test_tools/simple_session_cache.h"
#include "quic/tools/quic_backend_response.h"
@@ -77,6 +78,7 @@
using spdy::SpdySerializedFrame;
using spdy::SpdySettingsIR;
using ::testing::_;
+using ::testing::Assign;
using ::testing::Invoke;
using ::testing::NiceMock;
@@ -709,10 +711,19 @@
if (wait_for_server_response) {
client_->WaitUntil(-1,
[stream]() { return stream->headers_decompressed(); });
+ EXPECT_TRUE(session->ready());
}
return session;
}
+ NiceMock<MockClientVisitor>& SetupWebTransportVisitor(
+ WebTransportHttp3* session) {
+ auto visitor_owned = std::make_unique<NiceMock<MockClientVisitor>>();
+ NiceMock<MockClientVisitor>& visitor = *visitor_owned;
+ session->SetVisitor(std::move(visitor_owned));
+ return visitor;
+ }
+
ScopedEnvironmentForThreads environment_;
bool initialized_;
// If true, the Initialize() function will create |client_| and starts to
@@ -5718,7 +5729,7 @@
ADD_FAILURE() << "Client should not have 10 resumption tickets.";
}
-TEST_P(EndToEndTest, WebTransportSession) {
+TEST_P(EndToEndTest, WebTransportSessionSetup) {
enable_web_transport_ = true;
ASSERT_TRUE(Initialize());
@@ -5757,6 +5768,71 @@
server_thread_->Resume();
}
+TEST_P(EndToEndTest, WebTransportSessionUnidirectionalStream) {
+ enable_web_transport_ = true;
+ ASSERT_TRUE(Initialize());
+
+ if (!version_.UsesHttp3()) {
+ return;
+ }
+
+ WebTransportHttp3* session =
+ CreateWebTransportSession("/echo", /*wait_for_server_response=*/true);
+ ASSERT_TRUE(session != nullptr);
+ NiceMock<MockClientVisitor>& visitor = SetupWebTransportVisitor(session);
+
+ WebTransportStream* outgoing_stream =
+ session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(outgoing_stream != nullptr);
+ EXPECT_TRUE(outgoing_stream->Write("test"));
+ EXPECT_TRUE(outgoing_stream->SendFin());
+
+ bool stream_received = false;
+ EXPECT_CALL(visitor, OnIncomingUnidirectionalStreamAvailable())
+ .WillOnce(Assign(&stream_received, true));
+ client_->WaitUntil(2000, [&stream_received]() { return stream_received; });
+ EXPECT_TRUE(stream_received);
+ WebTransportStream* received_stream =
+ session->AcceptIncomingUnidirectionalStream();
+ ASSERT_TRUE(received_stream != nullptr);
+ std::string received_data;
+ received_stream->Read(&received_data);
+ EXPECT_EQ(received_data, "test");
+}
+
+TEST_P(EndToEndTest, WebTransportSessionUnidirectionalStreamSentEarly) {
+ enable_web_transport_ = true;
+ SetPacketLossPercentage(30);
+ ASSERT_TRUE(Initialize());
+
+ if (!version_.UsesHttp3()) {
+ return;
+ }
+
+ WebTransportHttp3* session =
+ CreateWebTransportSession("/echo", /*wait_for_server_response=*/false);
+ ASSERT_TRUE(session != nullptr);
+ NiceMock<MockClientVisitor>& visitor = SetupWebTransportVisitor(session);
+
+ WebTransportStream* outgoing_stream =
+ session->OpenOutgoingUnidirectionalStream();
+ ASSERT_TRUE(outgoing_stream != nullptr);
+ EXPECT_TRUE(outgoing_stream->Write("test"));
+ EXPECT_TRUE(outgoing_stream->SendFin());
+
+ bool stream_received = false;
+ EXPECT_CALL(visitor, OnIncomingUnidirectionalStreamAvailable())
+ .WillOnce(Assign(&stream_received, true));
+ client_->WaitUntil(5000, [&stream_received]() { return stream_received; });
+ EXPECT_TRUE(stream_received);
+ WebTransportStream* received_stream =
+ session->AcceptIncomingUnidirectionalStream();
+ ASSERT_TRUE(received_stream != nullptr);
+ std::string received_data;
+ received_stream->Read(&received_data);
+ EXPECT_EQ(received_data, "test");
+}
+
} // namespace
} // namespace test
} // namespace quic
diff --git a/quic/core/http/http_constants.h b/quic/core/http/http_constants.h
index a5409b5..c13a1a8 100644
--- a/quic/core/http/http_constants.h
+++ b/quic/core/http/http_constants.h
@@ -22,6 +22,8 @@
// https://quicwg.org/base-drafts/draft-ietf-quic-qpack.html#enc-dec-stream-def
kQpackEncoderStream = 0x02,
kQpackDecoderStream = 0x03,
+ // https://ietf-wg-webtrans.github.io/draft-ietf-webtrans-http3/draft-ietf-webtrans-http3.html#name-unidirectional-streams
+ kWebTransportUnidirectionalStream = 0x54,
};
// This includes control stream, QPACK encoder stream, and QPACK decoder stream.
diff --git a/quic/core/http/quic_spdy_client_stream.cc b/quic/core/http/quic_spdy_client_stream.cc
index 931bfb3..d264ba6 100644
--- a/quic/core/http/quic_spdy_client_stream.cc
+++ b/quic/core/http/quic_spdy_client_stream.cc
@@ -10,6 +10,7 @@
#include "quic/core/http/quic_client_promised_info.h"
#include "quic/core/http/quic_spdy_client_session.h"
#include "quic/core/http/spdy_utils.h"
+#include "quic/core/http/web_transport_http3.h"
#include "quic/core/quic_alarm.h"
#include "quic/platform/api/quic_logging.h"
#include "spdy/core/spdy_protocol.h"
@@ -58,6 +59,15 @@
return;
}
+ if (web_transport() != nullptr) {
+ web_transport()->HeadersReceived(response_headers_);
+ if (!web_transport()->ready()) {
+ // Rejected due to status not being 200, or other reason.
+ WriteOrBufferData("", /*fin=*/true, nullptr);
+ return;
+ }
+ }
+
if (!ParseHeaderStatusCode(response_headers_, &response_code_)) {
QUIC_DLOG(ERROR) << "Received invalid response code: "
<< response_headers_[":status"].as_string()
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index 442e1eb..6d2006c 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -7,6 +7,7 @@
#include <algorithm>
#include <cstdint>
#include <limits>
+#include <memory>
#include <string>
#include <utility>
@@ -18,6 +19,7 @@
#include "quic/core/http/http_decoder.h"
#include "quic/core/http/http_frames.h"
#include "quic/core/http/quic_headers_stream.h"
+#include "quic/core/http/web_transport_http3.h"
#include "quic/core/quic_error_codes.h"
#include "quic/core/quic_types.h"
#include "quic/core/quic_utils.h"
@@ -55,6 +57,8 @@
namespace quic {
+ABSL_CONST_INIT const size_t kMaxUnassociatedWebTransportStreams = 24;
+
namespace {
#define ENDPOINT \
@@ -1438,18 +1442,37 @@
}
return qpack_decoder_receive_stream_;
}
- default:
- if (GetQuicReloadableFlag(quic_unify_stop_sending)) {
- QUIC_RELOADABLE_FLAG_COUNT(quic_unify_stop_sending);
- MaybeSendStopSendingFrame(pending->id(),
- QUIC_STREAM_STREAM_CREATION_ERROR);
- } else {
- // TODO(renjietang): deprecate SendStopSending() when the flag is
- // deprecated.
- SendStopSending(QUIC_STREAM_STREAM_CREATION_ERROR, pending->id());
+ case kWebTransportUnidirectionalStream: {
+ // Note that this checks whether WebTransport is enabled on the receiver
+ // side, as we may receive WebTransport streams before peer's SETTINGS are
+ // received.
+ // TODO(b/184156476): consider whether this means we should drop buffered
+ // streams if we don't receive indication of WebTransport support.
+ if (!WillNegotiateWebTransport()) {
+ // Treat as unknown stream type.
+ break;
}
- pending->StopReading();
+ QUIC_DVLOG(1) << ENDPOINT << "Created an incoming WebTransport stream "
+ << pending->id();
+ auto stream_owned =
+ std::make_unique<WebTransportHttp3UnidirectionalStream>(pending,
+ this);
+ WebTransportHttp3UnidirectionalStream* stream = stream_owned.get();
+ ActivateStream(std::move(stream_owned));
+ return stream;
+ }
+ default:
+ break;
}
+ if (GetQuicReloadableFlag(quic_unify_stop_sending)) {
+ QUIC_RELOADABLE_FLAG_COUNT(quic_unify_stop_sending);
+ MaybeSendStopSendingFrame(pending->id(), QUIC_STREAM_STREAM_CREATION_ERROR);
+ } else {
+ // TODO(renjietang): deprecate SendStopSending() when the flag is
+ // deprecated.
+ SendStopSending(QUIC_STREAM_STREAM_CREATION_ERROR, pending->id());
+ }
+ pending->StopReading();
return nullptr;
}
@@ -1790,6 +1813,77 @@
streams_waiting_for_settings_.insert(id);
}
+void QuicSpdySession::AssociateIncomingWebTransportStreamWithSession(
+ WebTransportSessionId session_id,
+ QuicStreamId stream_id) {
+ if (QuicUtils::IsOutgoingStreamId(version(), stream_id, perspective())) {
+ QUIC_BUG(AssociateIncomingWebTransportStreamWithSession got outgoing stream)
+ << ENDPOINT
+ << "AssociateIncomingWebTransportStreamWithSession() got an outgoing "
+ "stream ID: "
+ << stream_id;
+ return;
+ }
+ WebTransportHttp3* session = GetWebTransportSession(session_id);
+ if (session != nullptr) {
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Successfully associated incoming WebTransport stream "
+ << stream_id << " with session ID " << session_id;
+
+ session->AssociateStream(stream_id);
+ return;
+ }
+ // Evict the oldest streams until we are under the limit.
+ while (buffered_streams_.size() >= kMaxUnassociatedWebTransportStreams) {
+ QUIC_DVLOG(1) << ENDPOINT << "Removing stream "
+ << buffered_streams_.front().stream_id
+ << " from buffered streams as the queue is full.";
+ ResetStream(buffered_streams_.front().stream_id,
+ QUIC_STREAM_WEBTRANSPORT_BUFFERED_STREAMS_LIMIT_EXCEEDED);
+ buffered_streams_.pop_front();
+ }
+ QUIC_DVLOG(1) << ENDPOINT << "Received a WebTransport stream " << stream_id
+ << " for session ID " << session_id
+ << " but cannot associate it; buffering instead.";
+ buffered_streams_.push_back(
+ BufferedWebTransportStream{session_id, stream_id});
+}
+
+void QuicSpdySession::ProcessBufferedWebTransportStreamsForSession(
+ WebTransportHttp3* session) {
+ const WebTransportSessionId session_id = session->id();
+ QUIC_DVLOG(1) << "Processing buffered WebTransport streams for "
+ << session_id;
+ auto it = buffered_streams_.begin();
+ while (it != buffered_streams_.end()) {
+ if (it->session_id == session_id) {
+ QUIC_DVLOG(1) << "Unbuffered and associated WebTransport stream "
+ << it->stream_id << " with session " << it->session_id;
+ session->AssociateStream(it->stream_id);
+ it = buffered_streams_.erase(it);
+ } else {
+ it++;
+ }
+ }
+}
+
+WebTransportHttp3UnidirectionalStream*
+QuicSpdySession::CreateOutgoingUnidirectionalWebTransportStream(
+ WebTransportHttp3* session) {
+ if (!CanOpenNextOutgoingUnidirectionalStream()) {
+ return nullptr;
+ }
+
+ QuicStreamId stream_id = GetNextOutgoingUnidirectionalStreamId();
+ auto stream_owned = std::make_unique<WebTransportHttp3UnidirectionalStream>(
+ stream_id, this, session->id());
+ WebTransportHttp3UnidirectionalStream* stream = stream_owned.get();
+ ActivateStream(std::move(stream_owned));
+ stream->WritePreamble();
+ session->AssociateStream(stream_id);
+ return stream;
+}
+
#undef ENDPOINT // undef for jumbo builds
} // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index 3a9b60e..703c018 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -6,6 +6,7 @@
#define QUICHE_QUIC_CORE_HTTP_QUIC_SPDY_SESSION_H_
#include <cstddef>
+#include <list>
#include <memory>
#include <string>
@@ -25,10 +26,12 @@
#include "quic/core/qpack/qpack_encoder_stream_sender.h"
#include "quic/core/qpack/qpack_receive_stream.h"
#include "quic/core/qpack/qpack_send_stream.h"
+#include "quic/core/quic_circular_deque.h"
#include "quic/core/quic_session.h"
#include "quic/core/quic_time.h"
#include "quic/core/quic_types.h"
#include "quic/core/quic_versions.h"
+#include "quic/platform/api/quic_containers.h"
#include "quic/platform/api/quic_export.h"
#include "spdy/core/http2_frame_decoder_adapter.h"
@@ -38,6 +41,10 @@
class QuicSpdySessionPeer;
} // namespace test
+class WebTransportHttp3UnidirectionalStream;
+
+QUIC_EXPORT_PRIVATE extern const size_t kMaxUnassociatedWebTransportStreams;
+
class QUIC_EXPORT_PRIVATE Http3DebugVisitor {
public:
Http3DebugVisitor();
@@ -461,6 +468,24 @@
void OnStreamWaitingForClientSettings(QuicStreamId id);
+ // Links the specified stream with a WebTransport session. If the session is
+ // not present, the stream is buffered until that session becomes available.
+ void AssociateIncomingWebTransportStreamWithSession(
+ WebTransportSessionId session_id,
+ QuicStreamId stream_id);
+
+ void ProcessBufferedWebTransportStreamsForSession(WebTransportHttp3* session);
+
+ bool CanOpenOutgoingUnidirectionalWebTransportStream(
+ WebTransportSessionId /*id*/) {
+ return CanOpenNextOutgoingUnidirectionalStream();
+ }
+
+ // Creates an outgoing unidirectional WebTransport stream. Returns nullptr if
+ // the stream cannot be created due to flow control or some other reason.
+ WebTransportHttp3UnidirectionalStream*
+ CreateOutgoingUnidirectionalWebTransportStream(WebTransportHttp3* session);
+
protected:
// Override CreateIncomingStream(), CreateOutgoingBidirectionalStream() and
// CreateOutgoingUnidirectionalStream() with QuicSpdyStream return type to
@@ -536,6 +561,11 @@
class SpdyFramerVisitor;
+ struct QUIC_EXPORT_PRIVATE BufferedWebTransportStream {
+ WebTransportSessionId session_id;
+ QuicStreamId stream_id;
+ };
+
// The following methods are called by the SimpleVisitor.
// Called when a HEADERS frame has been received.
@@ -683,6 +713,11 @@
// If ShouldBufferRequestsUntilSettings() is true, all streams that are
// blocked by that are tracked here.
absl::flat_hash_set<QuicStreamId> streams_waiting_for_settings_;
+
+ // WebTransport streams that do not have a session associated with them.
+ // Limited to kMaxUnassociatedWebTransportStreams; when the list is full,
+ // oldest streams are evicted first.
+ std::list<BufferedWebTransportStream> buffered_streams_;
};
} // namespace quic
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index d434931..cf51fd0 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -20,6 +20,8 @@
#include "quic/core/frames/quic_streams_blocked_frame.h"
#include "quic/core/http/http_constants.h"
#include "quic/core/http/http_encoder.h"
+#include "quic/core/http/quic_header_list.h"
+#include "quic/core/http/web_transport_http3.h"
#include "quic/core/qpack/qpack_header_table.h"
#include "quic/core/quic_config.h"
#include "quic/core/quic_crypto_stream.h"
@@ -526,6 +528,53 @@
testing::Mock::VerifyAndClearExpectations(connection_);
}
+ void ReceiveWebTransportSettings() {
+ SettingsFrame settings;
+ settings.values[SETTINGS_H3_DATAGRAM] = 1;
+ settings.values[SETTINGS_WEBTRANS_DRAFT00] = 1;
+ std::string data =
+ std::string(1, kControlStream) + EncodeSettings(settings);
+ QuicStreamId control_stream_id =
+ session_.perspective() == Perspective::IS_SERVER
+ ? GetNthClientInitiatedUnidirectionalStreamId(transport_version(),
+ 3)
+ : GetNthServerInitiatedUnidirectionalStreamId(transport_version(),
+ 3);
+ QuicStreamFrame frame(control_stream_id, /*fin=*/false, /*offset=*/0, data);
+ session_.OnStreamFrame(frame);
+ }
+
+ void ReceiveWebTransportSession(WebTransportSessionId session_id) {
+ SetQuicReloadableFlag(quic_accept_empty_stream_frame_with_no_fin, true);
+ QuicStreamFrame frame(session_id, /*fin=*/false, /*offset=*/0,
+ absl::string_view());
+ session_.OnStreamFrame(frame);
+ QuicSpdyStream* stream =
+ static_cast<QuicSpdyStream*>(session_.GetOrCreateStream(session_id));
+ QuicHeaderList headers;
+ headers.OnHeaderBlockStart();
+ headers.OnHeader(":method", "CONNECT");
+ headers.OnHeader(":protocol", "webtransport");
+ stream->OnStreamHeaderList(/*fin=*/true, 0, headers);
+ WebTransportHttp3* web_transport =
+ session_.GetWebTransportSession(session_id);
+ ASSERT_TRUE(web_transport != nullptr);
+ spdy::SpdyHeaderBlock header_block;
+ web_transport->HeadersReceived(header_block);
+ }
+
+ void ReceiveWebTransportUnidirectionalStream(WebTransportSessionId session_id,
+ QuicStreamId stream_id) {
+ char buffer[256];
+ QuicDataWriter data_writer(sizeof(buffer), buffer);
+ ASSERT_TRUE(data_writer.WriteVarInt62(kWebTransportUnidirectionalStream));
+ ASSERT_TRUE(data_writer.WriteVarInt62(session_id));
+ ASSERT_TRUE(data_writer.WriteStringPiece("test data"));
+ std::string data(buffer, data_writer.length());
+ QuicStreamFrame frame(stream_id, /*fin=*/false, /*offset=*/0, data);
+ session_.OnStreamFrame(frame);
+ }
+
MockQuicConnectionHelper helper_;
MockAlarmFactory alarm_factory_;
StrictMock<MockQuicConnection>* connection_;
@@ -3566,19 +3615,120 @@
CompleteHandshake();
- SettingsFrame server_settings;
- server_settings.values[SETTINGS_H3_DATAGRAM] = 1;
- server_settings.values[SETTINGS_WEBTRANS_DRAFT00] = 1;
- std::string data =
- std::string(1, kControlStream) + EncodeSettings(server_settings);
- QuicStreamId stream_id =
- GetNthClientInitiatedUnidirectionalStreamId(transport_version(), 3);
- QuicStreamFrame frame(stream_id, /*fin=*/false, /*offset=*/0, data);
- session_.OnStreamFrame(frame);
+ ReceiveWebTransportSettings();
EXPECT_TRUE(session_.SupportsWebTransport());
EXPECT_TRUE(session_.ShouldProcessIncomingRequests());
}
+TEST_P(QuicSpdySessionTestServer, BufferingIncomingStreams) {
+ if (!version().UsesHttp3()) {
+ return;
+ }
+ SetQuicReloadableFlag(quic_h3_datagram, true);
+ session_.set_supports_webtransport(true);
+
+ CompleteHandshake();
+ QuicStreamId session_id =
+ GetNthClientInitiatedBidirectionalStreamId(transport_version(), 1);
+
+ QuicStreamId data_stream_id =
+ GetNthClientInitiatedUnidirectionalStreamId(transport_version(), 4);
+ ReceiveWebTransportUnidirectionalStream(session_id, data_stream_id);
+
+ ReceiveWebTransportSettings();
+
+ ReceiveWebTransportSession(session_id);
+ WebTransportHttp3* web_transport =
+ session_.GetWebTransportSession(session_id);
+ ASSERT_TRUE(web_transport != nullptr);
+
+ EXPECT_EQ(web_transport->NumberOfAssociatedStreams(), 1u);
+
+ EXPECT_CALL(*connection_, SendControlFrame(_))
+ .WillRepeatedly(Invoke(&ClearControlFrame));
+ EXPECT_CALL(*connection_, OnStreamReset(session_id, _));
+ EXPECT_CALL(
+ *connection_,
+ OnStreamReset(data_stream_id, QUIC_STREAM_WEBTRANSPORT_SESSION_GONE));
+ session_.ResetStream(session_id, QUIC_STREAM_INTERNAL_ERROR);
+}
+
+TEST_P(QuicSpdySessionTestServer, BufferingIncomingStreamsLimit) {
+ if (!version().UsesHttp3()) {
+ return;
+ }
+ SetQuicReloadableFlag(quic_h3_datagram, true);
+ session_.set_supports_webtransport(true);
+
+ CompleteHandshake();
+ QuicStreamId session_id =
+ GetNthClientInitiatedBidirectionalStreamId(transport_version(), 1);
+
+ const int streams_to_send = kMaxUnassociatedWebTransportStreams + 4;
+ EXPECT_CALL(*connection_, SendControlFrame(_))
+ .WillRepeatedly(Invoke(&ClearControlFrame));
+ EXPECT_CALL(*connection_,
+ OnStreamReset(
+ _, QUIC_STREAM_WEBTRANSPORT_BUFFERED_STREAMS_LIMIT_EXCEEDED))
+ .Times(4);
+ for (int i = 0; i < streams_to_send; i++) {
+ QuicStreamId data_stream_id =
+ GetNthClientInitiatedUnidirectionalStreamId(transport_version(), 4 + i);
+ ReceiveWebTransportUnidirectionalStream(session_id, data_stream_id);
+ }
+
+ ReceiveWebTransportSettings();
+
+ ReceiveWebTransportSession(session_id);
+ WebTransportHttp3* web_transport =
+ session_.GetWebTransportSession(session_id);
+ ASSERT_TRUE(web_transport != nullptr);
+
+ EXPECT_EQ(web_transport->NumberOfAssociatedStreams(),
+ kMaxUnassociatedWebTransportStreams);
+
+ EXPECT_CALL(*connection_, SendControlFrame(_))
+ .WillRepeatedly(Invoke(&ClearControlFrame));
+ EXPECT_CALL(*connection_, OnStreamReset(_, _))
+ .Times(kMaxUnassociatedWebTransportStreams + 1);
+ session_.ResetStream(session_id, QUIC_STREAM_INTERNAL_ERROR);
+}
+
+TEST_P(QuicSpdySessionTestServer, ResetOutgoingWebTransportStreams) {
+ if (!version().UsesHttp3()) {
+ return;
+ }
+ SetQuicReloadableFlag(quic_h3_datagram, true);
+ session_.set_supports_webtransport(true);
+
+ CompleteHandshake();
+ QuicStreamId session_id =
+ GetNthClientInitiatedBidirectionalStreamId(transport_version(), 1);
+
+ ReceiveWebTransportSettings();
+ ReceiveWebTransportSession(session_id);
+ WebTransportHttp3* web_transport =
+ session_.GetWebTransportSession(session_id);
+ ASSERT_TRUE(web_transport != nullptr);
+
+ session_.set_writev_consumes_all_data(true);
+ EXPECT_TRUE(web_transport->CanOpenNextOutgoingUnidirectionalStream());
+ EXPECT_EQ(web_transport->NumberOfAssociatedStreams(), 0u);
+ WebTransportStream* stream =
+ web_transport->OpenOutgoingUnidirectionalStream();
+ EXPECT_EQ(web_transport->NumberOfAssociatedStreams(), 1u);
+ ASSERT_TRUE(stream != nullptr);
+ QuicStreamId stream_id = stream->GetStreamId();
+
+ EXPECT_CALL(*connection_, SendControlFrame(_))
+ .WillRepeatedly(Invoke(&ClearControlFrame));
+ EXPECT_CALL(*connection_, OnStreamReset(session_id, _));
+ EXPECT_CALL(*connection_,
+ OnStreamReset(stream_id, QUIC_STREAM_WEBTRANSPORT_SESSION_GONE));
+ session_.ResetStream(session_id, QUIC_STREAM_INTERNAL_ERROR);
+ EXPECT_EQ(web_transport->NumberOfAssociatedStreams(), 0u);
+}
+
} // namespace
} // namespace test
} // namespace quic
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index 1302bf2..1a03885 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -857,6 +857,10 @@
visitor_ = nullptr;
visitor->OnClose(this);
}
+
+ if (web_transport_ != nullptr) {
+ web_transport_->CloseAllAssociatedStreams();
+ }
}
void QuicSpdyStream::OnCanWrite() {
diff --git a/quic/core/http/web_transport_http3.cc b/quic/core/http/web_transport_http3.cc
index 9523d99..dac35f7 100644
--- a/quic/core/http/web_transport_http3.cc
+++ b/quic/core/http/web_transport_http3.cc
@@ -6,11 +6,21 @@
#include <memory>
+#include "absl/strings/string_view.h"
#include "quic/core/http/quic_spdy_session.h"
#include "quic/core/http/quic_spdy_stream.h"
+#include "quic/core/quic_data_reader.h"
+#include "quic/core/quic_data_writer.h"
+#include "quic/core/quic_stream.h"
+#include "quic/core/quic_types.h"
#include "quic/core/quic_utils.h"
+#include "quic/core/quic_versions.h"
+#include "quic/platform/api/quic_bug_tracker.h"
#include "common/platform/api/quiche_logging.h"
+#define ENDPOINT \
+ (session_->perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
+
namespace quic {
namespace {
@@ -36,18 +46,68 @@
QUICHE_DCHECK_EQ(connect_stream_->id(), id);
}
-void WebTransportHttp3::HeadersReceived(
- const spdy::SpdyHeaderBlock& /*headers*/) {
+void WebTransportHttp3::AssociateStream(QuicStreamId stream_id) {
+ streams_.insert(stream_id);
+
+ ParsedQuicVersion version = session_->version();
+ if (QuicUtils::IsOutgoingStreamId(version, stream_id,
+ session_->perspective())) {
+ return;
+ }
+ if (QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
+ incoming_bidirectional_streams_.push_back(stream_id);
+ visitor_->OnIncomingBidirectionalStreamAvailable();
+ } else {
+ incoming_unidirectional_streams_.push_back(stream_id);
+ visitor_->OnIncomingUnidirectionalStreamAvailable();
+ }
+}
+
+void WebTransportHttp3::CloseAllAssociatedStreams() {
+ // Copy the stream list before iterating over it, as calls to ResetStream()
+ // can potentially mutate |streams_|.
+ std::vector<QuicStreamId> streams(streams_.begin(), streams_.end());
+ streams_.clear();
+ for (QuicStreamId id : streams) {
+ session_->ResetStream(id, QUIC_STREAM_WEBTRANSPORT_SESSION_GONE);
+ }
+}
+
+void WebTransportHttp3::HeadersReceived(const spdy::SpdyHeaderBlock& headers) {
+ if (session_->perspective() == Perspective::IS_CLIENT) {
+ auto it = headers.find(":status");
+ if (it == headers.end() || it->second != "200") {
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Received WebTransport headers from server without "
+ "status 200, rejecting.";
+ return;
+ }
+ }
+
+ QUIC_DVLOG(1) << ENDPOINT << "WebTransport session " << id_ << " ready.";
ready_ = true;
visitor_->OnSessionReady();
+ session_->ProcessBufferedWebTransportStreamsForSession(this);
}
WebTransportStream* WebTransportHttp3::AcceptIncomingBidirectionalStream() {
// TODO(vasilvv): implement this.
return nullptr;
}
+
WebTransportStream* WebTransportHttp3::AcceptIncomingUnidirectionalStream() {
- // TODO(vasilvv): implement this.
+ while (!incoming_unidirectional_streams_.empty()) {
+ QuicStreamId id = incoming_unidirectional_streams_.front();
+ incoming_unidirectional_streams_.pop_front();
+ QuicStream* stream = session_->GetOrCreateStream(id);
+ if (stream == nullptr) {
+ // Skip the streams that were reset in between the time they were
+ // received and the time the client has polled for them.
+ continue;
+ }
+ return static_cast<WebTransportHttp3UnidirectionalStream*>(stream)
+ ->interface();
+ }
return nullptr;
}
@@ -56,16 +116,21 @@
return false;
}
bool WebTransportHttp3::CanOpenNextOutgoingUnidirectionalStream() {
- // TODO(vasilvv): implement this.
- return false;
+ return session_->CanOpenOutgoingUnidirectionalWebTransportStream(id_);
}
WebTransportStream* WebTransportHttp3::OpenOutgoingBidirectionalStream() {
// TODO(vasilvv): implement this.
return nullptr;
}
+
WebTransportStream* WebTransportHttp3::OpenOutgoingUnidirectionalStream() {
- // TODO(vasilvv): implement this.
- return nullptr;
+ WebTransportHttp3UnidirectionalStream* stream =
+ session_->CreateOutgoingUnidirectionalWebTransportStream(this);
+ if (stream == nullptr) {
+ // If stream cannot be created due to flow control, return nullptr.
+ return nullptr;
+ }
+ return stream->interface();
}
MessageStatus WebTransportHttp3::SendOrQueueDatagram(
@@ -78,4 +143,104 @@
// TODO(vasilvv): implement this.
}
+WebTransportHttp3UnidirectionalStream::WebTransportHttp3UnidirectionalStream(
+ PendingStream* pending,
+ QuicSpdySession* session)
+ : QuicStream(pending, session, READ_UNIDIRECTIONAL, /*is_static=*/false),
+ session_(session),
+ adapter_(session, this, sequencer()),
+ needs_to_send_preamble_(false) {}
+
+WebTransportHttp3UnidirectionalStream::WebTransportHttp3UnidirectionalStream(
+ QuicStreamId id,
+ QuicSpdySession* session,
+ WebTransportSessionId session_id)
+ : QuicStream(id, session, /*is_static=*/false, WRITE_UNIDIRECTIONAL),
+ session_(session),
+ adapter_(session, this, sequencer()),
+ session_id_(session_id),
+ needs_to_send_preamble_(true) {}
+
+void WebTransportHttp3UnidirectionalStream::WritePreamble() {
+ if (!needs_to_send_preamble_ || !session_id_.has_value()) {
+ QUIC_BUG(WebTransportHttp3UnidirectionalStream duplicate preamble)
+ << ENDPOINT << "Sending preamble on stream ID " << id()
+ << " at the wrong time.";
+ OnUnrecoverableError(QUIC_INTERNAL_ERROR,
+ "Attempting to send a WebTransport unidirectional "
+ "stream preamble at the wrong time.");
+ return;
+ }
+
+ QuicConnection::ScopedPacketFlusher flusher(session_->connection());
+ char buffer[sizeof(uint64_t) * 2]; // varint62, varint62
+ QuicDataWriter writer(sizeof(buffer), buffer);
+ bool success = true;
+ success = success && writer.WriteVarInt62(kWebTransportUnidirectionalStream);
+ success = success && writer.WriteVarInt62(*session_id_);
+ QUICHE_DCHECK(success);
+ WriteOrBufferData(absl::string_view(buffer, writer.length()), /*fin=*/false,
+ /*ack_listener=*/nullptr);
+ QUIC_DVLOG(1) << ENDPOINT << "Sent stream type and session ID ("
+ << *session_id_ << ") on WebTransport stream " << id();
+ needs_to_send_preamble_ = false;
+}
+
+bool WebTransportHttp3UnidirectionalStream::ReadSessionId() {
+ iovec iov;
+ if (!sequencer()->GetReadableRegion(&iov)) {
+ return false;
+ }
+ QuicDataReader reader(static_cast<const char*>(iov.iov_base), iov.iov_len);
+ WebTransportSessionId session_id;
+ uint8_t session_id_length = reader.PeekVarInt62Length();
+ if (!reader.ReadVarInt62(&session_id)) {
+ // If all of the data has been received, and we still cannot associate the
+ // stream with a session, consume all of the data so that the stream can
+ // be closed.
+ if (sequencer()->NumBytesConsumed() + sequencer()->NumBytesBuffered() >=
+ sequencer()->close_offset()) {
+ QUIC_DLOG(WARNING)
+ << ENDPOINT << "Failed to associate WebTransport stream " << id()
+ << " with a session because the stream ended prematurely.";
+ sequencer()->MarkConsumed(sequencer()->NumBytesBuffered());
+ }
+ return false;
+ }
+ sequencer()->MarkConsumed(session_id_length);
+ session_id_ = session_id;
+ session_->AssociateIncomingWebTransportStreamWithSession(session_id, id());
+ return true;
+}
+
+void WebTransportHttp3UnidirectionalStream::OnDataAvailable() {
+ if (!session_id_.has_value()) {
+ if (!ReadSessionId()) {
+ return;
+ }
+ }
+
+ adapter_.OnDataAvailable();
+}
+
+void WebTransportHttp3UnidirectionalStream::OnCanWriteNewData() {
+ adapter_.OnCanWriteNewData();
+}
+
+void WebTransportHttp3UnidirectionalStream::OnClose() {
+ QuicStream::OnClose();
+
+ if (!session_id_.has_value()) {
+ return;
+ }
+ WebTransportHttp3* session = session_->GetWebTransportSession(*session_id_);
+ if (session == nullptr) {
+ QUIC_DLOG(WARNING) << ENDPOINT << "WebTransport stream " << id()
+ << " attempted to notify parent session " << *session_id_
+ << ", but the session could not be found.";
+ return;
+ }
+ session->OnStreamClosed(id());
+}
+
} // namespace quic
diff --git a/quic/core/http/web_transport_http3.h b/quic/core/http/web_transport_http3.h
index b2da0ed..cc622c2 100644
--- a/quic/core/http/web_transport_http3.h
+++ b/quic/core/http/web_transport_http3.h
@@ -7,8 +7,12 @@
#include <memory>
+#include "absl/container/flat_hash_set.h"
+#include "absl/types/optional.h"
+#include "quic/core/quic_stream.h"
#include "quic/core/quic_types.h"
#include "quic/core/web_transport_interface.h"
+#include "quic/core/web_transport_stream_adapter.h"
#include "spdy/core/spdy_header_block.h"
namespace quic {
@@ -33,6 +37,13 @@
}
WebTransportSessionId id() { return id_; }
+ bool ready() { return ready_; }
+
+ void AssociateStream(QuicStreamId stream_id);
+ void OnStreamClosed(QuicStreamId stream_id) { streams_.erase(stream_id); }
+ void CloseAllAssociatedStreams();
+
+ size_t NumberOfAssociatedStreams() { return streams_.size(); }
// Return the earliest incoming stream that has been received by the session
// but has not been accepted. Returns nullptr if there are no incoming
@@ -55,6 +66,42 @@
// |ready_| is set to true when the peer has seen both sets of headers.
bool ready_ = false;
std::unique_ptr<WebTransportVisitor> visitor_;
+ absl::flat_hash_set<QuicStreamId> streams_;
+ QuicCircularDeque<QuicStreamId> incoming_bidirectional_streams_;
+ QuicCircularDeque<QuicStreamId> incoming_unidirectional_streams_;
+};
+
+class QUIC_EXPORT_PRIVATE WebTransportHttp3UnidirectionalStream
+ : public QuicStream {
+ public:
+ // Incoming stream.
+ WebTransportHttp3UnidirectionalStream(PendingStream* pending,
+ QuicSpdySession* session);
+ // Outgoing stream.
+ WebTransportHttp3UnidirectionalStream(QuicStreamId id,
+ QuicSpdySession* session,
+ WebTransportSessionId session_id);
+
+ // Sends the stream type and the session ID on the stream.
+ void WritePreamble();
+
+ // Implementation of QuicStream.
+ void OnDataAvailable() override;
+ void OnCanWriteNewData() override;
+ void OnClose() override;
+
+ WebTransportStream* interface() { return &adapter_; }
+ void SetUnblocked() { sequencer()->SetUnblocked(); }
+
+ private:
+ QuicSpdySession* session_;
+ WebTransportStreamAdapter adapter_;
+ absl::optional<WebTransportSessionId> session_id_;
+ bool needs_to_send_preamble_;
+
+ bool ReadSessionId();
+ // Closes the stream if all of the data has been received.
+ void MaybeCloseIncompleteStream();
};
} // namespace quic