Move HTTP/3 Datagram parsing to QuicSpdySession
The flag is marked as enabling_blocked_by until we fully support draft-ietf-masque-h3-datagram.
Protected by FLAGS_quic_reloadable_flag_quic_h3_datagram.
PiperOrigin-RevId: 359877243
Change-Id: I3b76e3e703b14b5311c091a215237b34f2020bf1
diff --git a/quic/masque/masque_client_session.cc b/quic/masque/masque_client_session.cc
index 6c7859e..e158125 100644
--- a/quic/masque/masque_client_session.cc
+++ b/quic/masque/masque_client_session.cc
@@ -29,8 +29,8 @@
compression_engine_(this) {}
void MasqueClientSession::OnMessageReceived(absl::string_view message) {
- QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length();
if (masque_mode_ == MasqueMode::kLegacy) {
+ QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length();
QuicConnectionId client_connection_id, server_connection_id;
QuicSocketAddress target_server_address;
std::vector<char> packet;
@@ -58,31 +58,8 @@
<< client_connection_id;
return;
}
- QuicDataReader reader(message);
- QuicDatagramFlowId flow_id;
- if (!reader.ReadVarInt62(&flow_id)) {
- QUIC_DLOG(ERROR) << "Failed to parse flow_id";
- return;
- }
- auto it =
- absl::c_find_if(connect_udp_client_states_,
- [flow_id](const ConnectUdpClientState& connect_udp) {
- return connect_udp.flow_id() == flow_id;
- });
- if (it == connect_udp_client_states_.end()) {
- QUIC_DLOG(ERROR) << "Received unknown flow_id " << flow_id;
- return;
- }
- EncapsulatedClientSession* encapsulated_client_session =
- it->encapsulated_client_session();
- QuicSocketAddress target_server_address = it->target_server_address();
- QUICHE_DCHECK_NE(encapsulated_client_session, nullptr);
- QUICHE_DCHECK(target_server_address.IsInitialized());
- absl::string_view packet = reader.ReadRemainingPayload();
- encapsulated_client_session->ProcessPacket(packet, target_server_address);
-
- QUIC_DVLOG(1) << "Sent " << packet.size()
- << " bytes to connection for flow_id " << flow_id;
+ QUICHE_DCHECK_EQ(masque_mode_, MasqueMode::kOpen);
+ QuicSpdySession::OnMessageReceived(message);
}
void MasqueClientSession::OnMessageAcked(QuicMessageId message_id,
@@ -130,8 +107,9 @@
return nullptr;
}
- connect_udp_client_states_.push_back(ConnectUdpClientState(
- stream, encapsulated_client_session, flow_id, target_server_address));
+ connect_udp_client_states_.push_back(
+ ConnectUdpClientState(stream, encapsulated_client_session, this, flow_id,
+ target_server_address));
return &connect_udp_client_states_.back();
}
@@ -155,26 +133,12 @@
}
QuicDatagramFlowId flow_id = connect_udp->flow_id();
- size_t slice_length =
- QuicDataWriter::GetVarInt62Len(flow_id) + packet.length();
- QuicUniqueBufferPtr buffer = MakeUniqueBuffer(
- connection()->helper()->GetStreamSendBufferAllocator(), slice_length);
- QuicDataWriter writer(slice_length, buffer.get());
- if (!writer.WriteVarInt62(flow_id)) {
- QUIC_BUG << "Failed to write flow_id";
- return;
- }
- if (!writer.WriteBytes(packet.data(), packet.length())) {
- QUIC_BUG << "Failed to write packet";
- return;
- }
-
- QuicMemSlice slice(std::move(buffer), slice_length);
- MessageResult message_result = SendMessage(QuicMemSliceSpan(&slice));
+ MessageStatus message_status =
+ SendHttp3Datagram(connect_udp->flow_id(), packet);
QUIC_DVLOG(1) << "Sent packet to " << target_server_address
<< " compressed with flow ID " << flow_id
- << " and got message result " << message_result;
+ << " and got message status " << message_status;
}
void MasqueClientSession::RegisterConnectionId(
@@ -225,7 +189,7 @@
ConnectionCloseSource source) {
QuicSpdyClientSession::OnConnectionClosed(frame, source);
// Close all encapsulated sessions.
- for (auto client_state : connect_udp_client_states_) {
+ for (const auto& client_state : connect_udp_client_states_) {
client_state.encapsulated_client_session()->CloseConnection(
QUIC_CONNECTION_CANCELLED, "Underlying MASQUE connection was closed",
ConnectionCloseBehavior::SILENT_CLOSE);
@@ -253,4 +217,55 @@
QuicSpdyClientSession::OnStreamClosed(stream_id);
}
+MasqueClientSession::ConnectUdpClientState::ConnectUdpClientState(
+ QuicSpdyClientStream* stream,
+ EncapsulatedClientSession* encapsulated_client_session,
+ MasqueClientSession* masque_session,
+ QuicDatagramFlowId flow_id,
+ const QuicSocketAddress& target_server_address)
+ : stream_(stream),
+ encapsulated_client_session_(encapsulated_client_session),
+ masque_session_(masque_session),
+ flow_id_(flow_id),
+ target_server_address_(target_server_address) {
+ QUICHE_DCHECK_NE(masque_session_, nullptr);
+ masque_session_->RegisterHttp3FlowId(this->flow_id(), this);
+}
+
+MasqueClientSession::ConnectUdpClientState::~ConnectUdpClientState() {
+ if (flow_id_.has_value()) {
+ masque_session_->UnregisterHttp3FlowId(flow_id());
+ }
+}
+
+MasqueClientSession::ConnectUdpClientState::ConnectUdpClientState(
+ MasqueClientSession::ConnectUdpClientState&& other) {
+ *this = std::move(other);
+}
+
+MasqueClientSession::ConnectUdpClientState&
+MasqueClientSession::ConnectUdpClientState::operator=(
+ MasqueClientSession::ConnectUdpClientState&& other) {
+ stream_ = other.stream_;
+ encapsulated_client_session_ = other.encapsulated_client_session_;
+ masque_session_ = other.masque_session_;
+ flow_id_ = other.flow_id_;
+ target_server_address_ = other.target_server_address_;
+ other.flow_id_.reset();
+ if (flow_id_.has_value()) {
+ masque_session_->UnregisterHttp3FlowId(flow_id());
+ masque_session_->RegisterHttp3FlowId(flow_id(), this);
+ }
+ return *this;
+}
+
+void MasqueClientSession::ConnectUdpClientState::OnHttp3Datagram(
+ QuicDatagramFlowId flow_id,
+ absl::string_view payload) {
+ QUICHE_DCHECK_EQ(flow_id, this->flow_id());
+ encapsulated_client_session_->ProcessPacket(payload, target_server_address_);
+ QUIC_DVLOG(1) << "Sent " << payload.size()
+ << " bytes to connection for flow_id " << flow_id;
+}
+
} // namespace quic
diff --git a/quic/masque/masque_client_session.h b/quic/masque/masque_client_session.h
index f849c61..e055a76 100644
--- a/quic/masque/masque_client_session.h
+++ b/quic/masque/masque_client_session.h
@@ -100,33 +100,47 @@
private:
// State that the MasqueClientSession keeps for each CONNECT-UDP request.
- class QUIC_NO_EXPORT ConnectUdpClientState {
+ class QUIC_NO_EXPORT ConnectUdpClientState
+ : public QuicSpdySession::Http3DatagramVisitor {
public:
// |stream| and |encapsulated_client_session| must be valid for the lifetime
// of the ConnectUdpClientState.
explicit ConnectUdpClientState(
QuicSpdyClientStream* stream,
EncapsulatedClientSession* encapsulated_client_session,
+ MasqueClientSession* masque_session,
QuicDatagramFlowId flow_id,
- const QuicSocketAddress& target_server_address)
- : stream_(stream),
- encapsulated_client_session_(encapsulated_client_session),
- flow_id_(flow_id),
- target_server_address_(target_server_address) {}
+ const QuicSocketAddress& target_server_address);
+
+ ~ConnectUdpClientState();
+
+ // Disallow copy but allow move.
+ ConnectUdpClientState(const ConnectUdpClientState&) = delete;
+ ConnectUdpClientState(ConnectUdpClientState&&);
+ ConnectUdpClientState& operator=(const ConnectUdpClientState&) = delete;
+ ConnectUdpClientState& operator=(ConnectUdpClientState&&);
QuicSpdyClientStream* stream() const { return stream_; }
EncapsulatedClientSession* encapsulated_client_session() const {
return encapsulated_client_session_;
}
- QuicDatagramFlowId flow_id() const { return flow_id_; }
+ QuicDatagramFlowId flow_id() const {
+ QUICHE_DCHECK(flow_id_.has_value());
+ return *flow_id_;
+ }
const QuicSocketAddress& target_server_address() const {
return target_server_address_;
}
+ // From QuicSpdySession::Http3DatagramVisitor.
+ void OnHttp3Datagram(QuicDatagramFlowId flow_id,
+ absl::string_view payload) override;
+
private:
QuicSpdyClientStream* stream_; // Unowned.
EncapsulatedClientSession* encapsulated_client_session_; // Unowned.
- QuicDatagramFlowId flow_id_;
+ MasqueClientSession* masque_session_; // Unowned.
+ absl::optional<QuicDatagramFlowId> flow_id_;
QuicSocketAddress target_server_address_;
};
diff --git a/quic/masque/masque_server_session.cc b/quic/masque/masque_server_session.cc
index de231ba..1e420b9 100644
--- a/quic/masque/masque_server_session.cc
+++ b/quic/masque/masque_server_session.cc
@@ -7,6 +7,7 @@
#include <netdb.h>
#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
#include "quic/core/quic_data_reader.h"
#include "quic/core/quic_udp_socket.h"
#include "quic/tools/quic_url.h"
@@ -95,11 +96,12 @@
compression_engine_(this),
masque_mode_(masque_mode) {
masque_server_backend_->RegisterBackendClient(connection_id(), this);
+ QUICHE_DCHECK_NE(epoll_server_, nullptr);
}
void MasqueServerSession::OnMessageReceived(absl::string_view message) {
- QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length();
if (masque_mode_ == MasqueMode::kLegacy) {
+ QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length();
QuicConnectionId client_connection_id, server_connection_id;
QuicSocketAddress target_server_address;
std::vector<char> packet;
@@ -132,33 +134,7 @@
return;
}
QUICHE_DCHECK_EQ(masque_mode_, MasqueMode::kOpen);
- QuicDataReader reader(message);
- QuicDatagramFlowId flow_id;
- if (!reader.ReadVarInt62(&flow_id)) {
- QUIC_DLOG(ERROR) << "Failed to read flow_id";
- return;
- }
-
- auto it =
- absl::c_find_if(connect_udp_server_states_,
- [flow_id](const ConnectUdpServerState& connect_udp) {
- return connect_udp.flow_id() == flow_id;
- });
- if (it == connect_udp_server_states_.end()) {
- QUIC_DLOG(ERROR) << "Received unknown flow_id " << flow_id;
- return;
- }
- QuicSocketAddress target_server_address = it->target_server_address();
- QUICHE_DCHECK(target_server_address.IsInitialized());
- QuicUdpSocketFd fd = it->fd();
- QUICHE_DCHECK_NE(fd, kQuicInvalidSocketFd);
- absl::string_view packet = reader.ReadRemainingPayload();
- QuicUdpSocketApi socket_api;
- QuicUdpPacketInfo packet_info;
- packet_info.SetPeerAddress(target_server_address);
- WriteResult write_result =
- socket_api.WritePacket(fd, packet.data(), packet.length(), packet_info);
- QUIC_DVLOG(1) << "Wrote packet to server with result " << write_result;
+ QuicSpdySession::OnMessageReceived(message);
}
void MasqueServerSession::OnMessageAcked(QuicMessageId message_id,
@@ -275,7 +251,7 @@
connect_udp_server_states_.emplace_back(ConnectUdpServerState(
flow_id, request_handler->stream_id(), target_server_address,
- fd_wrapper.extract_fd(), epoll_server_));
+ fd_wrapper.extract_fd(), this));
spdy::Http2HeaderBlock response_headers;
response_headers[":status"] = "200";
@@ -400,26 +376,12 @@
return;
}
// The packet is valid, send it to the client in a DATAGRAM frame.
- size_t slice_length = QuicDataWriter::GetVarInt62Len(flow_id) +
- read_result.packet_buffer.buffer_len;
- QuicUniqueBufferPtr buffer = MakeUniqueBuffer(
- connection()->helper()->GetStreamSendBufferAllocator(), slice_length);
- QuicDataWriter writer(slice_length, buffer.get());
- if (!writer.WriteVarInt62(flow_id)) {
- QUIC_BUG << "Failed to write flow_id";
- continue;
- }
- if (!writer.WriteBytes(read_result.packet_buffer.buffer,
- read_result.packet_buffer.buffer_len)) {
- QUIC_BUG << "Failed to write packet";
- continue;
- }
- QUICHE_DCHECK_EQ(writer.remaining(), 0u);
- QuicMemSlice slice(std::move(buffer), slice_length);
- MessageResult message_result = SendMessage(QuicMemSliceSpan(&slice));
+ MessageStatus message_status = SendHttp3Datagram(
+ flow_id, absl::string_view(read_result.packet_buffer.buffer,
+ read_result.packet_buffer.buffer_len));
QUIC_DVLOG(1) << "Sent UDP packet from target server of length "
<< read_result.packet_buffer.buffer_len << " with flow ID "
- << flow_id << " and got message result " << message_result;
+ << flow_id << " and got message status " << message_status;
}
}
@@ -442,23 +404,27 @@
QuicStreamId stream_id,
const QuicSocketAddress& target_server_address,
QuicUdpSocketFd fd,
- QuicEpollServer* epoll_server)
+ MasqueServerSession* masque_session)
: flow_id_(flow_id),
stream_id_(stream_id),
target_server_address_(target_server_address),
fd_(fd),
- epoll_server_(epoll_server) {
+ masque_session_(masque_session) {
QUICHE_DCHECK_NE(fd_, kQuicInvalidSocketFd);
- QUICHE_DCHECK_NE(epoll_server_, nullptr);
+ QUICHE_DCHECK_NE(masque_session_, nullptr);
+ masque_session_->RegisterHttp3FlowId(this->flow_id(), this);
}
MasqueServerSession::ConnectUdpServerState::~ConnectUdpServerState() {
+ if (flow_id_.has_value()) {
+ masque_session_->UnregisterHttp3FlowId(flow_id());
+ }
if (fd_ == kQuicInvalidSocketFd) {
return;
}
QuicUdpSocketApi socket_api;
QUIC_DLOG(INFO) << "Closing fd " << fd_;
- epoll_server_->UnregisterFD(fd_);
+ masque_session_->epoll_server()->UnregisterFD(fd_);
socket_api.Destroy(fd_);
}
@@ -474,16 +440,33 @@
if (fd_ != kQuicInvalidSocketFd) {
QuicUdpSocketApi socket_api;
QUIC_DLOG(INFO) << "Closing fd " << fd_;
- epoll_server_->UnregisterFD(fd_);
+ masque_session_->epoll_server()->UnregisterFD(fd_);
socket_api.Destroy(fd_);
}
flow_id_ = other.flow_id_;
stream_id_ = other.stream_id_;
target_server_address_ = other.target_server_address_;
fd_ = other.fd_;
- epoll_server_ = other.epoll_server_;
+ masque_session_ = other.masque_session_;
other.fd_ = kQuicInvalidSocketFd;
+ other.flow_id_.reset();
+ if (flow_id_.has_value()) {
+ masque_session_->UnregisterHttp3FlowId(flow_id());
+ masque_session_->RegisterHttp3FlowId(flow_id(), this);
+ }
return *this;
}
+void MasqueServerSession::ConnectUdpServerState::OnHttp3Datagram(
+ QuicDatagramFlowId flow_id,
+ absl::string_view payload) {
+ QUICHE_DCHECK_EQ(flow_id, this->flow_id());
+ QuicUdpSocketApi socket_api;
+ QuicUdpPacketInfo packet_info;
+ packet_info.SetPeerAddress(target_server_address_);
+ WriteResult write_result = socket_api.WritePacket(
+ fd_, payload.data(), payload.length(), packet_info);
+ QUIC_DVLOG(1) << "Wrote packet to server with result " << write_result;
+}
+
} // namespace quic
diff --git a/quic/masque/masque_server_session.h b/quic/masque/masque_server_session.h
index a73e641..14b8293 100644
--- a/quic/masque/masque_server_session.h
+++ b/quic/masque/masque_server_session.h
@@ -83,9 +83,12 @@
// Handle packet for client, meant to be called by MasqueDispatcher.
void HandlePacketFromServer(const ReceivedPacketInfo& packet_info);
+ QuicEpollServer* epoll_server() const { return epoll_server_; }
+
private:
// State that the MasqueServerSession keeps for each CONNECT-UDP request.
- class QUIC_NO_EXPORT ConnectUdpServerState {
+ class QUIC_NO_EXPORT ConnectUdpServerState
+ : public QuicSpdySession::Http3DatagramVisitor {
public:
// ConnectUdpServerState takes ownership of |fd|. It will unregister it
// from |epoll_server| and close the file descriptor when destructed.
@@ -94,7 +97,7 @@
QuicStreamId stream_id,
const QuicSocketAddress& target_server_address,
QuicUdpSocketFd fd,
- QuicEpollServer* epoll_server);
+ MasqueServerSession* masque_session);
~ConnectUdpServerState();
@@ -104,19 +107,26 @@
ConnectUdpServerState& operator=(const ConnectUdpServerState&) = delete;
ConnectUdpServerState& operator=(ConnectUdpServerState&&);
- QuicDatagramFlowId flow_id() const { return flow_id_; }
+ QuicDatagramFlowId flow_id() const {
+ QUICHE_DCHECK(flow_id_.has_value());
+ return *flow_id_;
+ }
QuicStreamId stream_id() const { return stream_id_; }
const QuicSocketAddress& target_server_address() const {
return target_server_address_;
}
QuicUdpSocketFd fd() const { return fd_; }
+ // From QuicSpdySession::Http3DatagramVisitor.
+ void OnHttp3Datagram(QuicDatagramFlowId flow_id,
+ absl::string_view payload) override;
+
private:
- QuicDatagramFlowId flow_id_;
+ absl::optional<QuicDatagramFlowId> flow_id_;
QuicStreamId stream_id_;
QuicSocketAddress target_server_address_;
QuicUdpSocketFd fd_; // Owned.
- QuicEpollServer* epoll_server_; // Unowned.
+ MasqueServerSession* masque_session_; // Unowned.
};
MasqueServerBackend* masque_server_backend_; // Unowned.