Implement HTTP/3 datagrams within WebTransport The version of HTTP/3 datagrams here uses the "Datagram-Flow-Id = sf-integer" header definition; however, since the HTTP/3 datagram is currently highly in flux, this is very likely to be not a final version of this. PiperOrigin-RevId: 367157595 Change-Id: I2f0a6c077c5cf91435809969d98b0fd0e418f0ec
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc index 5cb4db2..5e070dc 100644 --- a/quic/core/http/end_to_end_test.cc +++ b/quic/core/http/end_to_end_test.cc
@@ -5895,6 +5895,36 @@ EXPECT_EQ(received_data, "test"); } +TEST_P(EndToEndTest, WebTransportDatagrams) { + enable_web_transport_ = true; + ASSERT_TRUE(Initialize()); + + if (!version_.UsesHttp3()) { + return; + } + + WebTransportHttp3* session = + CreateWebTransportSession("/echo", /*wait_for_server_response=*/true); + ASSERT_TRUE(session != nullptr); + NiceMock<MockClientVisitor>& visitor = SetupWebTransportVisitor(session); + + SimpleBufferAllocator allocator; + for (int i = 0; i < 10; i++) { + absl::string_view datagram = "test"; + auto buffer = MakeUniqueBuffer(&allocator, datagram.size()); + memcpy(buffer.get(), datagram.data(), datagram.size()); + QuicMemSlice slice(std::move(buffer), datagram.size()); + session->SendOrQueueDatagram(std::move(slice)); + } + + int received = 0; + EXPECT_CALL(visitor, OnDatagramReceived(_)).WillRepeatedly([&received]() { + received++; + }); + client_->WaitUntil(5000, [&received]() { return received > 0; }); + EXPECT_GT(received, 0); +} + } // namespace } // namespace test } // namespace quic
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc index 86ddc94..fe75a05 100644 --- a/quic/core/http/quic_spdy_session.cc +++ b/quic/core/http/quic_spdy_session.cc
@@ -509,8 +509,6 @@ << "QuicSpdySession use after free. " << destruction_indicator_ << QuicStackTrace(); destruction_indicator_ = 987654321; - QUIC_BUG_IF(quic_bug_12477_3, !h3_datagram_registrations_.empty()) - << "HTTP/3 datagram flow ID was not unregistered"; } void QuicSpdySession::Initialize() { @@ -1687,6 +1685,14 @@ << "Attempted to unregister unknown HTTP/3 flow ID " << flow_id; } +void QuicSpdySession::SetMaxTimeInQueueForFlowId( + QuicDatagramFlowId /*flow_id*/, + QuicTime::Delta max_time_in_queue) { + // TODO(b/184598230): implement this in a way that works for multiple sessions + // on a same connection. + datagram_queue()->SetMaxTimeInQueue(max_time_in_queue); +} + void QuicSpdySession::OnMessageReceived(absl::string_view message) { QuicSession::OnMessageReceived(message); if (!h3_datagram_supported_) {
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h index f5903b3..e0bf096 100644 --- a/quic/core/http/quic_spdy_session.h +++ b/quic/core/http/quic_spdy_session.h
@@ -429,6 +429,10 @@ // Unregister a given HTTP/3 datagram flow ID. void UnregisterHttp3FlowId(QuicDatagramFlowId flow_id); + // Sets max time in queue for a specified datagram flow ID. + void SetMaxTimeInQueueForFlowId(QuicDatagramFlowId flow_id, + QuicTime::Delta max_time_in_queue); + // Override from QuicSession to support HTTP/3 datagrams. void OnMessageReceived(absl::string_view message) override;
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc index 3608946..7c22389 100644 --- a/quic/core/http/quic_spdy_session_test.cc +++ b/quic/core/http/quic_spdy_session_test.cc
@@ -569,6 +569,8 @@ headers.OnHeaderBlockStart(); headers.OnHeader(":method", "CONNECT"); headers.OnHeader(":protocol", "webtransport"); + headers.OnHeader("datagram-flow-id", + absl::StrCat(session_.GetNextDatagramFlowId())); stream->OnStreamHeaderList(/*fin=*/true, 0, headers); WebTransportHttp3* web_transport = session_.GetWebTransportSession(session_id);
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc index ac39f48..4d4cc95 100644 --- a/quic/core/http/quic_spdy_stream.cc +++ b/quic/core/http/quic_spdy_stream.cc
@@ -21,6 +21,7 @@ #include "quic/core/qpack/qpack_decoder.h" #include "quic/core/qpack/qpack_encoder.h" #include "quic/core/quic_error_codes.h" +#include "quic/core/quic_types.h" #include "quic/core/quic_utils.h" #include "quic/core/quic_versions.h" #include "quic/core/quic_write_blocked_list.h" @@ -1242,6 +1243,7 @@ std::string method; std::string protocol; + absl::optional<QuicDatagramFlowId> flow_id; for (const auto& header : header_list_) { const std::string& header_name = header.first; const std::string& header_value = header.second; @@ -1257,14 +1259,25 @@ } protocol = header_value; } + if (header_name == "datagram-flow-id") { + if (flow_id.has_value() || header_value.empty()) { + return; + } + QuicDatagramFlowId flow_id_out; + if (!absl::SimpleAtoi(header_value, &flow_id_out)) { + return; + } + flow_id = flow_id_out; + } } - if (method != "CONNECT" || protocol != "webtransport") { + if (method != "CONNECT" || protocol != "webtransport" || + !flow_id.has_value()) { return; } web_transport_ = - std::make_unique<WebTransportHttp3>(spdy_session_, this, id()); + std::make_unique<WebTransportHttp3>(spdy_session_, this, id(), *flow_id); } void QuicSpdyStream::MaybeProcessSentWebTransportHeaders( @@ -1286,8 +1299,11 @@ return; } + QuicDatagramFlowId flow_id = spdy_session_->GetNextDatagramFlowId(); + headers["datagram-flow-id"] = absl::StrCat(flow_id); + web_transport_ = - std::make_unique<WebTransportHttp3>(spdy_session_, this, id()); + std::make_unique<WebTransportHttp3>(spdy_session_, this, id(), flow_id); } void QuicSpdyStream::OnCanWriteNewData() {
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc index ea62753..bb464bd 100644 --- a/quic/core/http/quic_spdy_stream_test.cc +++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -3122,6 +3122,7 @@ spdy::SpdyHeaderBlock headers; headers[":method"] = "CONNECT"; headers[":protocol"] = "webtransport"; + headers["datagram-flow-id"] = absl::StrCat(session_->GetNextDatagramFlowId()); stream_->WriteHeaders(std::move(headers), /*fin=*/false, nullptr); ASSERT_TRUE(stream_->web_transport() != nullptr); EXPECT_EQ(stream_->id(), stream_->web_transport()->id()); @@ -3138,6 +3139,8 @@ headers_[":method"] = "CONNECT"; headers_[":protocol"] = "webtransport"; + headers_["datagram-flow-id"] = + absl::StrCat(session_->GetNextDatagramFlowId()); stream_->OnStreamHeadersPriority( spdy::SpdyStreamPrecedence(kV3HighestPriority));
diff --git a/quic/core/http/web_transport_http3.cc b/quic/core/http/web_transport_http3.cc index 3d1812e..2a105c2 100644 --- a/quic/core/http/web_transport_http3.cc +++ b/quic/core/http/web_transport_http3.cc
@@ -36,14 +36,17 @@ WebTransportHttp3::WebTransportHttp3(QuicSpdySession* session, QuicSpdyStream* connect_stream, - WebTransportSessionId id) + WebTransportSessionId id, + QuicDatagramFlowId flow_id) : session_(session), connect_stream_(connect_stream), id_(id), + flow_id_(flow_id), visitor_(std::make_unique<NoopWebTransportVisitor>()) { QUICHE_DCHECK(session_->SupportsWebTransport()); QUICHE_DCHECK(IsValidWebTransportSessionId(id, session_->version())); QUICHE_DCHECK_EQ(connect_stream_->id(), id); + session_->RegisterHttp3FlowId(flow_id, this); } void WebTransportHttp3::AssociateStream(QuicStreamId stream_id) { @@ -71,6 +74,7 @@ for (QuicStreamId id : streams) { session_->ResetStream(id, QUIC_STREAM_WEBTRANSPORT_SESSION_GONE); } + session_->UnregisterHttp3FlowId(flow_id_); } void WebTransportHttp3::HeadersReceived(const spdy::SpdyHeaderBlock& headers) { @@ -148,14 +152,20 @@ return stream->interface(); } -MessageStatus WebTransportHttp3::SendOrQueueDatagram( - QuicMemSlice /*datagram*/) { - // TODO(vasilvv): implement this. - return MessageStatus::MESSAGE_STATUS_UNSUPPORTED; +MessageStatus WebTransportHttp3::SendOrQueueDatagram(QuicMemSlice datagram) { + return session_->SendHttp3Datagram( + flow_id_, absl::string_view(datagram.data(), datagram.length())); } + void WebTransportHttp3::SetDatagramMaxTimeInQueue( - QuicTime::Delta /*max_time_in_queue*/) { - // TODO(vasilvv): implement this. + QuicTime::Delta max_time_in_queue) { + session_->SetMaxTimeInQueueForFlowId(flow_id_, max_time_in_queue); +} + +void WebTransportHttp3::OnHttp3Datagram(QuicDatagramFlowId flow_id, + absl::string_view payload) { + QUICHE_DCHECK_EQ(flow_id, flow_id_); + visitor_->OnDatagramReceived(payload); } WebTransportHttp3UnidirectionalStream::WebTransportHttp3UnidirectionalStream(
diff --git a/quic/core/http/web_transport_http3.h b/quic/core/http/web_transport_http3.h index cc622c2..0c91c2b 100644 --- a/quic/core/http/web_transport_http3.h +++ b/quic/core/http/web_transport_http3.h
@@ -9,6 +9,7 @@ #include "absl/container/flat_hash_set.h" #include "absl/types/optional.h" +#include "quic/core/http/quic_spdy_session.h" #include "quic/core/quic_stream.h" #include "quic/core/quic_types.h" #include "quic/core/web_transport_interface.h" @@ -25,11 +26,14 @@ // // WebTransport over HTTP/3 specification: // <https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3> -class QUIC_EXPORT_PRIVATE WebTransportHttp3 : public WebTransportSession { +class QUIC_EXPORT_PRIVATE WebTransportHttp3 + : public WebTransportSession, + public QuicSpdySession::Http3DatagramVisitor { public: WebTransportHttp3(QuicSpdySession* session, QuicSpdyStream* connect_stream, - WebTransportSessionId id); + WebTransportSessionId id, + QuicDatagramFlowId flow_id); void HeadersReceived(const spdy::SpdyHeaderBlock& headers); void SetVisitor(std::unique_ptr<WebTransportVisitor> visitor) { @@ -59,10 +63,14 @@ MessageStatus SendOrQueueDatagram(QuicMemSlice datagram) override; void SetDatagramMaxTimeInQueue(QuicTime::Delta max_time_in_queue) override; + void OnHttp3Datagram(QuicDatagramFlowId flow_id, + absl::string_view payload) override; + private: QuicSpdySession* const session_; // Unowned. QuicSpdyStream* const connect_stream_; // Unowned. const WebTransportSessionId id_; + const QuicDatagramFlowId flow_id_; // |ready_| is set to true when the peer has seen both sets of headers. bool ready_ = false; std::unique_ptr<WebTransportVisitor> visitor_;