Move HTTP/3 Datagram parsing to QuicSpdySession The flag is marked as enabling_blocked_by until we fully support draft-ietf-masque-h3-datagram. Protected by FLAGS_quic_reloadable_flag_quic_h3_datagram. PiperOrigin-RevId: 359877243 Change-Id: I3b76e3e703b14b5311c091a215237b34f2020bf1
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc index 81c60b2..b6750fe 100644 --- a/quic/core/http/quic_spdy_session.cc +++ b/quic/core/http/quic_spdy_session.cc
@@ -518,9 +518,11 @@ QuicSpdySession::~QuicSpdySession() { QUIC_BUG_IF(destruction_indicator_ != 123456789) - << "QuicSpdyStream use after free. " << destruction_indicator_ + << "QuicSpdySession use after free. " << destruction_indicator_ << QuicStackTrace(); destruction_indicator_ = 987654321; + QUIC_BUG_IF(!h3_datagram_registrations_.empty()) + << "HTTP/3 datagram flow ID was not unregistered"; } void QuicSpdySession::Initialize() { @@ -1684,6 +1686,64 @@ return result; } +MessageStatus QuicSpdySession::SendHttp3Datagram(QuicDatagramFlowId flow_id, + absl::string_view payload) { + size_t slice_length = + QuicDataWriter::GetVarInt62Len(flow_id) + payload.length(); + QuicUniqueBufferPtr buffer = MakeUniqueBuffer( + connection()->helper()->GetStreamSendBufferAllocator(), slice_length); + QuicDataWriter writer(slice_length, buffer.get()); + if (!writer.WriteVarInt62(flow_id)) { + QUIC_BUG << "Failed to write HTTP/3 datagram flow ID"; + return MESSAGE_STATUS_INTERNAL_ERROR; + } + if (!writer.WriteBytes(payload.data(), payload.length())) { + QUIC_BUG << "Failed to write HTTP/3 datagram payload"; + return MESSAGE_STATUS_INTERNAL_ERROR; + } + + QuicMemSlice slice(std::move(buffer), slice_length); + return datagram_queue()->SendOrQueueDatagram(std::move(slice)); +} + +void QuicSpdySession::RegisterHttp3FlowId( + QuicDatagramFlowId flow_id, + QuicSpdySession::Http3DatagramVisitor* visitor) { + QUICHE_DCHECK_NE(visitor, nullptr); + auto insertion_result = h3_datagram_registrations_.insert({flow_id, visitor}); + QUIC_BUG_IF(!insertion_result.second) + << "Attempted to doubly register HTTP/3 flow ID " << flow_id; +} + +void QuicSpdySession::UnregisterHttp3FlowId(QuicDatagramFlowId flow_id) { + size_t num_erased = h3_datagram_registrations_.erase(flow_id); + QUIC_BUG_IF(num_erased != 1) + << "Attempted to unregister unknown HTTP/3 flow ID " << flow_id; +} + +void QuicSpdySession::OnMessageReceived(absl::string_view message) { + QuicSession::OnMessageReceived(message); + if (!h3_datagram_supported_) { + QUIC_DLOG(ERROR) << "Ignoring unexpected received HTTP/3 datagram"; + return; + } + QuicDataReader reader(message); + QuicDatagramFlowId flow_id; + if (!reader.ReadVarInt62(&flow_id)) { + QUIC_DLOG(ERROR) << "Failed to parse flow ID in received HTTP/3 datagram"; + return; + } + auto it = h3_datagram_registrations_.find(flow_id); + if (it == h3_datagram_registrations_.end()) { + // TODO(dschinazi) buffer unknown HTTP/3 datagram flow IDs for a short + // period of time in case they were reordered. + QUIC_DLOG(ERROR) << "Received unknown HTTP/3 datagram flow ID " << flow_id; + return; + } + absl::string_view payload = reader.ReadRemainingPayload(); + it->second->OnHttp3Datagram(flow_id, payload); +} + #undef ENDPOINT // undef for jumbo builds } // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h index cff9e63..6a47b5d 100644 --- a/quic/core/http/quic_spdy_session.h +++ b/quic/core/http/quic_spdy_session.h
@@ -420,6 +420,34 @@ // SETTINGS. bool h3_datagram_supported() const { return h3_datagram_supported_; } + // Sends an HTTP/3 datagram. The flow ID is not part of |payload|. + MessageStatus SendHttp3Datagram(QuicDatagramFlowId flow_id, + absl::string_view payload); + + class QUIC_EXPORT_PRIVATE Http3DatagramVisitor { + public: + virtual ~Http3DatagramVisitor() {} + + // Called when an HTTP/3 datagram is received. |payload| does not contain + // the flow ID. + virtual void OnHttp3Datagram(QuicDatagramFlowId flow_id, + absl::string_view payload) = 0; + }; + + // Registers |visitor| to receive HTTP/3 datagrams for flow ID |flow_id|. This + // must not be called on a previously register flow ID without first calling + // UnregisterHttp3FlowId. |visitor| must be valid until a corresponding call + // to UnregisterHttp3FlowId. The flow ID must be unregistered before the + // QuicSpdySession is destroyed. + void RegisterHttp3FlowId(QuicDatagramFlowId flow_id, + Http3DatagramVisitor* visitor); + + // Unregister a given HTTP/3 datagram flow ID. + void UnregisterHttp3FlowId(QuicDatagramFlowId flow_id); + + // Override from QuicSession to support HTTP/3 datagrams. + void OnMessageReceived(absl::string_view message) override; + protected: // Override CreateIncomingStream(), CreateOutgoingBidirectionalStream() and // CreateOutgoingUnidirectionalStream() with QuicSpdyStream return type to @@ -629,6 +657,9 @@ // Whether both this endpoint and our peer support HTTP/3 datagrams. bool h3_datagram_supported_ = false; + + absl::flat_hash_map<QuicDatagramFlowId, Http3DatagramVisitor*> + h3_datagram_registrations_; }; } // namespace quic
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc index 3985769..d231f78 100644 --- a/quic/core/http/quic_spdy_session_test.cc +++ b/quic/core/http/quic_spdy_session_test.cc
@@ -48,6 +48,7 @@ #include "quic/test_tools/quic_test_utils.h" #include "common/platform/api/quiche_text_utils.h" #include "common/quiche_endian.h" +#include "common/test_tools/quiche_test_utils.h" #include "spdy/core/spdy_framer.h" using spdy::kV3HighestPriority; @@ -60,6 +61,7 @@ using ::testing::_; using ::testing::AnyNumber; using ::testing::AtLeast; +using ::testing::ElementsAre; using ::testing::InSequence; using ::testing::Invoke; using ::testing::Return; @@ -3347,6 +3349,47 @@ EXPECT_TRUE(session_.h3_datagram_supported()); } +TEST_P(QuicSpdySessionTestClient, H3DatagramRegistration) { + if (!version().UsesHttp3()) { + return; + } + CompleteHandshake(); + SetQuicReloadableFlag(quic_h3_datagram, true); + QuicSpdySessionPeer::SetH3DatagramSupported(&session_, true); + SavingHttp3DatagramVisitor h3_datagram_visitor; + QuicDatagramFlowId flow_id = session_.GetNextDatagramFlowId(); + ASSERT_EQ(QuicDataWriter::GetVarInt62Len(flow_id), 1); + uint8_t datagram[256]; + datagram[0] = flow_id; + for (size_t i = 1; i < ABSL_ARRAYSIZE(datagram); i++) { + datagram[i] = i; + } + session_.RegisterHttp3FlowId(flow_id, &h3_datagram_visitor); + session_.OnMessageReceived(absl::string_view( + reinterpret_cast<const char*>(datagram), sizeof(datagram))); + EXPECT_THAT( + h3_datagram_visitor.received_h3_datagrams(), + ElementsAre(SavingHttp3DatagramVisitor::SavedHttp3Datagram{ + flow_id, std::string(reinterpret_cast<const char*>(datagram + 1), + sizeof(datagram) - 1)})); + session_.UnregisterHttp3FlowId(flow_id); +} + +TEST_P(QuicSpdySessionTestClient, SendHttp3Datagram) { + if (!version().UsesHttp3()) { + return; + } + CompleteHandshake(); + SetQuicReloadableFlag(quic_h3_datagram, true); + QuicSpdySessionPeer::SetH3DatagramSupported(&session_, true); + QuicDatagramFlowId flow_id = session_.GetNextDatagramFlowId(); + std::string h3_datagram_payload = {1, 2, 3, 4, 5, 6}; + EXPECT_CALL(*connection_, SendMessage(1, _, false)) + .WillOnce(Return(MESSAGE_STATUS_SUCCESS)); + EXPECT_EQ(session_.SendHttp3Datagram(flow_id, h3_datagram_payload), + MESSAGE_STATUS_SUCCESS); +} + } // namespace } // namespace test } // namespace quic
diff --git a/quic/masque/masque_client_session.cc b/quic/masque/masque_client_session.cc index 6c7859e..e158125 100644 --- a/quic/masque/masque_client_session.cc +++ b/quic/masque/masque_client_session.cc
@@ -29,8 +29,8 @@ compression_engine_(this) {} void MasqueClientSession::OnMessageReceived(absl::string_view message) { - QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length(); if (masque_mode_ == MasqueMode::kLegacy) { + QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length(); QuicConnectionId client_connection_id, server_connection_id; QuicSocketAddress target_server_address; std::vector<char> packet; @@ -58,31 +58,8 @@ << client_connection_id; return; } - QuicDataReader reader(message); - QuicDatagramFlowId flow_id; - if (!reader.ReadVarInt62(&flow_id)) { - QUIC_DLOG(ERROR) << "Failed to parse flow_id"; - return; - } - auto it = - absl::c_find_if(connect_udp_client_states_, - [flow_id](const ConnectUdpClientState& connect_udp) { - return connect_udp.flow_id() == flow_id; - }); - if (it == connect_udp_client_states_.end()) { - QUIC_DLOG(ERROR) << "Received unknown flow_id " << flow_id; - return; - } - EncapsulatedClientSession* encapsulated_client_session = - it->encapsulated_client_session(); - QuicSocketAddress target_server_address = it->target_server_address(); - QUICHE_DCHECK_NE(encapsulated_client_session, nullptr); - QUICHE_DCHECK(target_server_address.IsInitialized()); - absl::string_view packet = reader.ReadRemainingPayload(); - encapsulated_client_session->ProcessPacket(packet, target_server_address); - - QUIC_DVLOG(1) << "Sent " << packet.size() - << " bytes to connection for flow_id " << flow_id; + QUICHE_DCHECK_EQ(masque_mode_, MasqueMode::kOpen); + QuicSpdySession::OnMessageReceived(message); } void MasqueClientSession::OnMessageAcked(QuicMessageId message_id, @@ -130,8 +107,9 @@ return nullptr; } - connect_udp_client_states_.push_back(ConnectUdpClientState( - stream, encapsulated_client_session, flow_id, target_server_address)); + connect_udp_client_states_.push_back( + ConnectUdpClientState(stream, encapsulated_client_session, this, flow_id, + target_server_address)); return &connect_udp_client_states_.back(); } @@ -155,26 +133,12 @@ } QuicDatagramFlowId flow_id = connect_udp->flow_id(); - size_t slice_length = - QuicDataWriter::GetVarInt62Len(flow_id) + packet.length(); - QuicUniqueBufferPtr buffer = MakeUniqueBuffer( - connection()->helper()->GetStreamSendBufferAllocator(), slice_length); - QuicDataWriter writer(slice_length, buffer.get()); - if (!writer.WriteVarInt62(flow_id)) { - QUIC_BUG << "Failed to write flow_id"; - return; - } - if (!writer.WriteBytes(packet.data(), packet.length())) { - QUIC_BUG << "Failed to write packet"; - return; - } - - QuicMemSlice slice(std::move(buffer), slice_length); - MessageResult message_result = SendMessage(QuicMemSliceSpan(&slice)); + MessageStatus message_status = + SendHttp3Datagram(connect_udp->flow_id(), packet); QUIC_DVLOG(1) << "Sent packet to " << target_server_address << " compressed with flow ID " << flow_id - << " and got message result " << message_result; + << " and got message status " << message_status; } void MasqueClientSession::RegisterConnectionId( @@ -225,7 +189,7 @@ ConnectionCloseSource source) { QuicSpdyClientSession::OnConnectionClosed(frame, source); // Close all encapsulated sessions. - for (auto client_state : connect_udp_client_states_) { + for (const auto& client_state : connect_udp_client_states_) { client_state.encapsulated_client_session()->CloseConnection( QUIC_CONNECTION_CANCELLED, "Underlying MASQUE connection was closed", ConnectionCloseBehavior::SILENT_CLOSE); @@ -253,4 +217,55 @@ QuicSpdyClientSession::OnStreamClosed(stream_id); } +MasqueClientSession::ConnectUdpClientState::ConnectUdpClientState( + QuicSpdyClientStream* stream, + EncapsulatedClientSession* encapsulated_client_session, + MasqueClientSession* masque_session, + QuicDatagramFlowId flow_id, + const QuicSocketAddress& target_server_address) + : stream_(stream), + encapsulated_client_session_(encapsulated_client_session), + masque_session_(masque_session), + flow_id_(flow_id), + target_server_address_(target_server_address) { + QUICHE_DCHECK_NE(masque_session_, nullptr); + masque_session_->RegisterHttp3FlowId(this->flow_id(), this); +} + +MasqueClientSession::ConnectUdpClientState::~ConnectUdpClientState() { + if (flow_id_.has_value()) { + masque_session_->UnregisterHttp3FlowId(flow_id()); + } +} + +MasqueClientSession::ConnectUdpClientState::ConnectUdpClientState( + MasqueClientSession::ConnectUdpClientState&& other) { + *this = std::move(other); +} + +MasqueClientSession::ConnectUdpClientState& +MasqueClientSession::ConnectUdpClientState::operator=( + MasqueClientSession::ConnectUdpClientState&& other) { + stream_ = other.stream_; + encapsulated_client_session_ = other.encapsulated_client_session_; + masque_session_ = other.masque_session_; + flow_id_ = other.flow_id_; + target_server_address_ = other.target_server_address_; + other.flow_id_.reset(); + if (flow_id_.has_value()) { + masque_session_->UnregisterHttp3FlowId(flow_id()); + masque_session_->RegisterHttp3FlowId(flow_id(), this); + } + return *this; +} + +void MasqueClientSession::ConnectUdpClientState::OnHttp3Datagram( + QuicDatagramFlowId flow_id, + absl::string_view payload) { + QUICHE_DCHECK_EQ(flow_id, this->flow_id()); + encapsulated_client_session_->ProcessPacket(payload, target_server_address_); + QUIC_DVLOG(1) << "Sent " << payload.size() + << " bytes to connection for flow_id " << flow_id; +} + } // namespace quic
diff --git a/quic/masque/masque_client_session.h b/quic/masque/masque_client_session.h index f849c61..e055a76 100644 --- a/quic/masque/masque_client_session.h +++ b/quic/masque/masque_client_session.h
@@ -100,33 +100,47 @@ private: // State that the MasqueClientSession keeps for each CONNECT-UDP request. - class QUIC_NO_EXPORT ConnectUdpClientState { + class QUIC_NO_EXPORT ConnectUdpClientState + : public QuicSpdySession::Http3DatagramVisitor { public: // |stream| and |encapsulated_client_session| must be valid for the lifetime // of the ConnectUdpClientState. explicit ConnectUdpClientState( QuicSpdyClientStream* stream, EncapsulatedClientSession* encapsulated_client_session, + MasqueClientSession* masque_session, QuicDatagramFlowId flow_id, - const QuicSocketAddress& target_server_address) - : stream_(stream), - encapsulated_client_session_(encapsulated_client_session), - flow_id_(flow_id), - target_server_address_(target_server_address) {} + const QuicSocketAddress& target_server_address); + + ~ConnectUdpClientState(); + + // Disallow copy but allow move. + ConnectUdpClientState(const ConnectUdpClientState&) = delete; + ConnectUdpClientState(ConnectUdpClientState&&); + ConnectUdpClientState& operator=(const ConnectUdpClientState&) = delete; + ConnectUdpClientState& operator=(ConnectUdpClientState&&); QuicSpdyClientStream* stream() const { return stream_; } EncapsulatedClientSession* encapsulated_client_session() const { return encapsulated_client_session_; } - QuicDatagramFlowId flow_id() const { return flow_id_; } + QuicDatagramFlowId flow_id() const { + QUICHE_DCHECK(flow_id_.has_value()); + return *flow_id_; + } const QuicSocketAddress& target_server_address() const { return target_server_address_; } + // From QuicSpdySession::Http3DatagramVisitor. + void OnHttp3Datagram(QuicDatagramFlowId flow_id, + absl::string_view payload) override; + private: QuicSpdyClientStream* stream_; // Unowned. EncapsulatedClientSession* encapsulated_client_session_; // Unowned. - QuicDatagramFlowId flow_id_; + MasqueClientSession* masque_session_; // Unowned. + absl::optional<QuicDatagramFlowId> flow_id_; QuicSocketAddress target_server_address_; };
diff --git a/quic/masque/masque_server_session.cc b/quic/masque/masque_server_session.cc index de231ba..1e420b9 100644 --- a/quic/masque/masque_server_session.cc +++ b/quic/masque/masque_server_session.cc
@@ -7,6 +7,7 @@ #include <netdb.h> #include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" #include "quic/core/quic_data_reader.h" #include "quic/core/quic_udp_socket.h" #include "quic/tools/quic_url.h" @@ -95,11 +96,12 @@ compression_engine_(this), masque_mode_(masque_mode) { masque_server_backend_->RegisterBackendClient(connection_id(), this); + QUICHE_DCHECK_NE(epoll_server_, nullptr); } void MasqueServerSession::OnMessageReceived(absl::string_view message) { - QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length(); if (masque_mode_ == MasqueMode::kLegacy) { + QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length(); QuicConnectionId client_connection_id, server_connection_id; QuicSocketAddress target_server_address; std::vector<char> packet; @@ -132,33 +134,7 @@ return; } QUICHE_DCHECK_EQ(masque_mode_, MasqueMode::kOpen); - QuicDataReader reader(message); - QuicDatagramFlowId flow_id; - if (!reader.ReadVarInt62(&flow_id)) { - QUIC_DLOG(ERROR) << "Failed to read flow_id"; - return; - } - - auto it = - absl::c_find_if(connect_udp_server_states_, - [flow_id](const ConnectUdpServerState& connect_udp) { - return connect_udp.flow_id() == flow_id; - }); - if (it == connect_udp_server_states_.end()) { - QUIC_DLOG(ERROR) << "Received unknown flow_id " << flow_id; - return; - } - QuicSocketAddress target_server_address = it->target_server_address(); - QUICHE_DCHECK(target_server_address.IsInitialized()); - QuicUdpSocketFd fd = it->fd(); - QUICHE_DCHECK_NE(fd, kQuicInvalidSocketFd); - absl::string_view packet = reader.ReadRemainingPayload(); - QuicUdpSocketApi socket_api; - QuicUdpPacketInfo packet_info; - packet_info.SetPeerAddress(target_server_address); - WriteResult write_result = - socket_api.WritePacket(fd, packet.data(), packet.length(), packet_info); - QUIC_DVLOG(1) << "Wrote packet to server with result " << write_result; + QuicSpdySession::OnMessageReceived(message); } void MasqueServerSession::OnMessageAcked(QuicMessageId message_id, @@ -275,7 +251,7 @@ connect_udp_server_states_.emplace_back(ConnectUdpServerState( flow_id, request_handler->stream_id(), target_server_address, - fd_wrapper.extract_fd(), epoll_server_)); + fd_wrapper.extract_fd(), this)); spdy::Http2HeaderBlock response_headers; response_headers[":status"] = "200"; @@ -400,26 +376,12 @@ return; } // The packet is valid, send it to the client in a DATAGRAM frame. - size_t slice_length = QuicDataWriter::GetVarInt62Len(flow_id) + - read_result.packet_buffer.buffer_len; - QuicUniqueBufferPtr buffer = MakeUniqueBuffer( - connection()->helper()->GetStreamSendBufferAllocator(), slice_length); - QuicDataWriter writer(slice_length, buffer.get()); - if (!writer.WriteVarInt62(flow_id)) { - QUIC_BUG << "Failed to write flow_id"; - continue; - } - if (!writer.WriteBytes(read_result.packet_buffer.buffer, - read_result.packet_buffer.buffer_len)) { - QUIC_BUG << "Failed to write packet"; - continue; - } - QUICHE_DCHECK_EQ(writer.remaining(), 0u); - QuicMemSlice slice(std::move(buffer), slice_length); - MessageResult message_result = SendMessage(QuicMemSliceSpan(&slice)); + MessageStatus message_status = SendHttp3Datagram( + flow_id, absl::string_view(read_result.packet_buffer.buffer, + read_result.packet_buffer.buffer_len)); QUIC_DVLOG(1) << "Sent UDP packet from target server of length " << read_result.packet_buffer.buffer_len << " with flow ID " - << flow_id << " and got message result " << message_result; + << flow_id << " and got message status " << message_status; } } @@ -442,23 +404,27 @@ QuicStreamId stream_id, const QuicSocketAddress& target_server_address, QuicUdpSocketFd fd, - QuicEpollServer* epoll_server) + MasqueServerSession* masque_session) : flow_id_(flow_id), stream_id_(stream_id), target_server_address_(target_server_address), fd_(fd), - epoll_server_(epoll_server) { + masque_session_(masque_session) { QUICHE_DCHECK_NE(fd_, kQuicInvalidSocketFd); - QUICHE_DCHECK_NE(epoll_server_, nullptr); + QUICHE_DCHECK_NE(masque_session_, nullptr); + masque_session_->RegisterHttp3FlowId(this->flow_id(), this); } MasqueServerSession::ConnectUdpServerState::~ConnectUdpServerState() { + if (flow_id_.has_value()) { + masque_session_->UnregisterHttp3FlowId(flow_id()); + } if (fd_ == kQuicInvalidSocketFd) { return; } QuicUdpSocketApi socket_api; QUIC_DLOG(INFO) << "Closing fd " << fd_; - epoll_server_->UnregisterFD(fd_); + masque_session_->epoll_server()->UnregisterFD(fd_); socket_api.Destroy(fd_); } @@ -474,16 +440,33 @@ if (fd_ != kQuicInvalidSocketFd) { QuicUdpSocketApi socket_api; QUIC_DLOG(INFO) << "Closing fd " << fd_; - epoll_server_->UnregisterFD(fd_); + masque_session_->epoll_server()->UnregisterFD(fd_); socket_api.Destroy(fd_); } flow_id_ = other.flow_id_; stream_id_ = other.stream_id_; target_server_address_ = other.target_server_address_; fd_ = other.fd_; - epoll_server_ = other.epoll_server_; + masque_session_ = other.masque_session_; other.fd_ = kQuicInvalidSocketFd; + other.flow_id_.reset(); + if (flow_id_.has_value()) { + masque_session_->UnregisterHttp3FlowId(flow_id()); + masque_session_->RegisterHttp3FlowId(flow_id(), this); + } return *this; } +void MasqueServerSession::ConnectUdpServerState::OnHttp3Datagram( + QuicDatagramFlowId flow_id, + absl::string_view payload) { + QUICHE_DCHECK_EQ(flow_id, this->flow_id()); + QuicUdpSocketApi socket_api; + QuicUdpPacketInfo packet_info; + packet_info.SetPeerAddress(target_server_address_); + WriteResult write_result = socket_api.WritePacket( + fd_, payload.data(), payload.length(), packet_info); + QUIC_DVLOG(1) << "Wrote packet to server with result " << write_result; +} + } // namespace quic
diff --git a/quic/masque/masque_server_session.h b/quic/masque/masque_server_session.h index a73e641..14b8293 100644 --- a/quic/masque/masque_server_session.h +++ b/quic/masque/masque_server_session.h
@@ -83,9 +83,12 @@ // Handle packet for client, meant to be called by MasqueDispatcher. void HandlePacketFromServer(const ReceivedPacketInfo& packet_info); + QuicEpollServer* epoll_server() const { return epoll_server_; } + private: // State that the MasqueServerSession keeps for each CONNECT-UDP request. - class QUIC_NO_EXPORT ConnectUdpServerState { + class QUIC_NO_EXPORT ConnectUdpServerState + : public QuicSpdySession::Http3DatagramVisitor { public: // ConnectUdpServerState takes ownership of |fd|. It will unregister it // from |epoll_server| and close the file descriptor when destructed. @@ -94,7 +97,7 @@ QuicStreamId stream_id, const QuicSocketAddress& target_server_address, QuicUdpSocketFd fd, - QuicEpollServer* epoll_server); + MasqueServerSession* masque_session); ~ConnectUdpServerState(); @@ -104,19 +107,26 @@ ConnectUdpServerState& operator=(const ConnectUdpServerState&) = delete; ConnectUdpServerState& operator=(ConnectUdpServerState&&); - QuicDatagramFlowId flow_id() const { return flow_id_; } + QuicDatagramFlowId flow_id() const { + QUICHE_DCHECK(flow_id_.has_value()); + return *flow_id_; + } QuicStreamId stream_id() const { return stream_id_; } const QuicSocketAddress& target_server_address() const { return target_server_address_; } QuicUdpSocketFd fd() const { return fd_; } + // From QuicSpdySession::Http3DatagramVisitor. + void OnHttp3Datagram(QuicDatagramFlowId flow_id, + absl::string_view payload) override; + private: - QuicDatagramFlowId flow_id_; + absl::optional<QuicDatagramFlowId> flow_id_; QuicStreamId stream_id_; QuicSocketAddress target_server_address_; QuicUdpSocketFd fd_; // Owned. - QuicEpollServer* epoll_server_; // Unowned. + MasqueServerSession* masque_session_; // Unowned. }; MasqueServerBackend* masque_server_backend_; // Unowned.
diff --git a/quic/test_tools/quic_spdy_session_peer.cc b/quic/test_tools/quic_spdy_session_peer.cc index 58eedcc..d41ddfd 100644 --- a/quic/test_tools/quic_spdy_session_peer.cc +++ b/quic/test_tools/quic_spdy_session_peer.cc
@@ -109,5 +109,11 @@ return session->qpack_encoder_receive_stream_; } +// static +void QuicSpdySessionPeer::SetH3DatagramSupported(QuicSpdySession* session, + bool h3_datagram_supported) { + session->h3_datagram_supported_ = h3_datagram_supported; +} + } // namespace test } // namespace quic
diff --git a/quic/test_tools/quic_spdy_session_peer.h b/quic/test_tools/quic_spdy_session_peer.h index c0413c6..4ad0367 100644 --- a/quic/test_tools/quic_spdy_session_peer.h +++ b/quic/test_tools/quic_spdy_session_peer.h
@@ -57,6 +57,8 @@ QuicSpdySession* session); static QpackReceiveStream* GetQpackEncoderReceiveStream( QuicSpdySession* session); + static void SetH3DatagramSupported(QuicSpdySession* session, + bool h3_datagram_supported); }; } // namespace test
diff --git a/quic/test_tools/quic_test_utils.h b/quic/test_tools/quic_test_utils.h index fe255b4..6d5e52b 100644 --- a/quic/test_tools/quic_test_utils.h +++ b/quic/test_tools/quic_test_utils.h
@@ -2284,6 +2284,32 @@ const char* source_connection_id_bytes, uint8_t source_connection_id_length); +// Implementation of Http3DatagramVisitor which saves all received datagrams. +class SavingHttp3DatagramVisitor + : public QuicSpdySession::Http3DatagramVisitor { + public: + struct SavedHttp3Datagram { + QuicDatagramFlowId flow_id; + std::string payload; + bool operator==(const SavedHttp3Datagram& o) const { + return flow_id == o.flow_id && payload == o.payload; + } + }; + const std::vector<SavedHttp3Datagram>& received_h3_datagrams() const { + return received_h3_datagrams_; + } + + // Override from QuicSpdySession::Http3DatagramVisitor. + void OnHttp3Datagram(QuicDatagramFlowId flow_id, + absl::string_view payload) override { + received_h3_datagrams_.push_back( + SavedHttp3Datagram{flow_id, std::string(payload)}); + } + + private: + std::vector<SavedHttp3Datagram> received_h3_datagrams_; +}; + } // namespace test } // namespace quic