blob: 1eee5908b41009259dead15da312d59813d476ee [file] [log] [blame] [edit]
// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/bindings/quic_libevent.h"
#include <memory>
#include <utility>
#include "absl/time/time.h"
#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/thread.h"
#include "quiche/quic/core/io/quic_event_loop.h"
#include "quiche/quic/core/quic_alarm.h"
#include "quiche/quic/core/quic_clock.h"
#include "quiche/quic/core/quic_default_clock.h"
#include "quiche/quic/core/quic_time.h"
namespace quic {
using LibeventEventMask = short; // NOLINT(runtime/int)
QuicSocketEventMask LibeventEventMaskToQuicEvents(int events) {
return ((events & EV_READ) ? kSocketEventReadable : 0) |
((events & EV_WRITE) ? kSocketEventWritable : 0);
}
LibeventEventMask QuicEventsToLibeventEventMask(QuicSocketEventMask events) {
return ((events & kSocketEventReadable) ? EV_READ : 0) |
((events & kSocketEventWritable) ? EV_WRITE : 0);
}
class LibeventAlarm : public QuicAlarm {
public:
LibeventAlarm(LibeventQuicEventLoop* loop,
QuicArenaScopedPtr<QuicAlarm::Delegate> delegate)
: QuicAlarm(std::move(delegate)), clock_(loop->clock()) {
event_.reset(evtimer_new(
loop->base(),
[](evutil_socket_t, LibeventEventMask, void* arg) {
LibeventAlarm* self = reinterpret_cast<LibeventAlarm*>(arg);
self->Fire();
},
this));
}
protected:
void SetImpl() override {
absl::Duration timeout =
absl::Microseconds((deadline() - clock_->Now()).ToMicroseconds());
timeval unix_time = absl::ToTimeval(timeout);
event_add(event_.get(), &unix_time);
}
void CancelImpl() override { event_del(event_.get()); }
private:
std::unique_ptr<event, LibeventEventDeleter> event_;
QuicClock* clock_;
};
LibeventQuicEventLoop::LibeventQuicEventLoop(event_base* base, QuicClock* clock)
: base_(base),
edge_triggered_(event_base_get_features(base) & EV_FEATURE_ET),
clock_(clock),
artifical_event_timer_(evtimer_new(
base_,
[](evutil_socket_t, LibeventEventMask, void* arg) {
auto* self = reinterpret_cast<LibeventQuicEventLoop*>(arg);
self->ActivateArtificialEvents();
},
this)) {
QUICHE_CHECK_LE(sizeof(event), event_get_struct_event_size())
<< "libevent ABI mismatch: sizeof(event) is bigger than the one QUICHE "
"has been compiled with";
}
LibeventQuicEventLoop::~LibeventQuicEventLoop() {
event_del(artifical_event_timer_.get());
}
bool LibeventQuicEventLoop::RegisterSocket(QuicUdpSocketFd fd,
QuicSocketEventMask events,
QuicSocketEventListener* listener) {
auto [it, success] =
registration_map_.try_emplace(fd, this, fd, events, listener);
return success;
}
bool LibeventQuicEventLoop::UnregisterSocket(QuicUdpSocketFd fd) {
fds_with_artifical_events_.erase(fd);
return registration_map_.erase(fd);
}
bool LibeventQuicEventLoop::RearmSocket(QuicUdpSocketFd fd,
QuicSocketEventMask events) {
if (edge_triggered_) {
QUICHE_BUG(LibeventQuicEventLoop_RearmSocket_called_on_ET)
<< "RearmSocket() called on an edge-triggered event loop";
return false;
}
auto it = registration_map_.find(fd);
if (it == registration_map_.end()) {
return false;
}
it->second.Rearm(events);
return true;
}
bool LibeventQuicEventLoop::ArtificiallyNotifyEvent(
QuicUdpSocketFd fd, QuicSocketEventMask events) {
auto it = registration_map_.find(fd);
if (it == registration_map_.end()) {
return false;
}
it->second.RecordArtificalEvents(events);
fds_with_artifical_events_.insert(fd);
if (!evtimer_pending(artifical_event_timer_.get(), nullptr)) {
struct timeval tv = {0, 0}; // Fire immediately in the next iteration.
evtimer_add(artifical_event_timer_.get(), &tv);
}
return true;
}
void LibeventQuicEventLoop::ActivateArtificialEvents() {
absl::flat_hash_set<QuicUdpSocketFd> fds_with_artifical_events;
{
using std::swap;
swap(fds_with_artifical_events_, fds_with_artifical_events);
}
for (QuicUdpSocketFd fd : fds_with_artifical_events) {
auto it = registration_map_.find(fd);
if (it == registration_map_.end()) {
continue;
}
it->second.MaybeNotifyArtificalEvents();
}
}
void LibeventQuicEventLoop::RunEventLoopOnce(QuicTime::Delta default_timeout) {
timeval timeout =
absl::ToTimeval(absl::Microseconds(default_timeout.ToMicroseconds()));
event_base_loopexit(base_, &timeout);
event_base_loop(base_, EVLOOP_ONCE);
}
void LibeventQuicEventLoop::WakeUp() {
timeval timeout = absl::ToTimeval(absl::ZeroDuration());
event_base_loopexit(base_, &timeout);
}
LibeventQuicEventLoop::Registration::Registration(
LibeventQuicEventLoop* loop, QuicUdpSocketFd fd, QuicSocketEventMask events,
QuicSocketEventListener* listener)
: loop_(loop), listener_(listener) {
event_callback_fn callback = [](evutil_socket_t fd, LibeventEventMask events,
void* arg) {
auto* self = reinterpret_cast<LibeventQuicEventLoop::Registration*>(arg);
self->listener_->OnSocketEvent(self->loop_, fd,
LibeventEventMaskToQuicEvents(events));
};
if (loop_->SupportsEdgeTriggered()) {
LibeventEventMask mask =
QuicEventsToLibeventEventMask(events) | EV_PERSIST | EV_ET;
event_assign(&both_events_, loop_->base(), fd, mask, callback, this);
event_add(&both_events_, nullptr);
} else {
event_assign(&read_event_, loop_->base(), fd, EV_READ, callback, this);
event_assign(&write_event_, loop_->base(), fd, EV_WRITE, callback, this);
Rearm(events);
}
}
LibeventQuicEventLoop::Registration::~Registration() {
if (loop_->SupportsEdgeTriggered()) {
event_del(&both_events_);
} else {
event_del(&read_event_);
event_del(&write_event_);
}
}
void LibeventQuicEventLoop::Registration::RecordArtificalEvents(
QuicSocketEventMask events) {
artificial_events_ |= events;
}
void LibeventQuicEventLoop::Registration::MaybeNotifyArtificalEvents() {
if (artificial_events_ == 0) {
return;
}
QuicSocketEventMask events = artificial_events_;
artificial_events_ = 0;
if (loop_->SupportsEdgeTriggered()) {
event_active(&both_events_, QuicEventsToLibeventEventMask(events), 0);
return;
}
if (events & kSocketEventReadable) {
event_active(&read_event_, EV_READ, 0);
}
if (events & kSocketEventWritable) {
event_active(&write_event_, EV_WRITE, 0);
}
}
void LibeventQuicEventLoop::Registration::Rearm(QuicSocketEventMask events) {
QUICHE_DCHECK(!loop_->SupportsEdgeTriggered());
if (events & kSocketEventReadable) {
event_add(&read_event_, nullptr);
}
if (events & kSocketEventWritable) {
event_add(&write_event_, nullptr);
}
}
QuicAlarm* LibeventQuicEventLoop::AlarmFactory::CreateAlarm(
QuicAlarm::Delegate* delegate) {
return new LibeventAlarm(loop_,
QuicArenaScopedPtr<QuicAlarm::Delegate>(delegate));
}
QuicArenaScopedPtr<QuicAlarm> LibeventQuicEventLoop::AlarmFactory::CreateAlarm(
QuicArenaScopedPtr<QuicAlarm::Delegate> delegate,
QuicConnectionArena* arena) {
if (arena != nullptr) {
return arena->New<LibeventAlarm>(loop_, std::move(delegate));
}
return QuicArenaScopedPtr<QuicAlarm>(
new LibeventAlarm(loop_, std::move(delegate)));
}
QuicLibeventEventLoopFactory::QuicLibeventEventLoopFactory(
bool force_level_triggered)
: force_level_triggered_(force_level_triggered) {
std::unique_ptr<QuicEventLoop> event_loop = Create(QuicDefaultClock::Get());
name_ = absl::StrFormat(
"libevent(%s)",
event_base_get_method(
static_cast<LibeventQuicEventLoopWithOwnership*>(event_loop.get())
->base()));
}
struct LibeventConfigDeleter {
void operator()(event_config* config) { event_config_free(config); }
};
std::unique_ptr<LibeventQuicEventLoopWithOwnership>
LibeventQuicEventLoopWithOwnership::Create(QuicClock* clock,
bool force_level_triggered) {
// Required for event_base_loopbreak() to actually work.
static int threads_initialized = []() {
#ifdef _WIN32
return evthread_use_windows_threads();
#else
return evthread_use_pthreads();
#endif
}();
QUICHE_DCHECK_EQ(threads_initialized, 0);
std::unique_ptr<event_config, LibeventConfigDeleter> config(
event_config_new());
if (force_level_triggered) {
// epoll and kqueue are the two only current libevent backends that support
// edge-triggered I/O.
event_config_avoid_method(config.get(), "epoll");
event_config_avoid_method(config.get(), "kqueue");
}
return std::make_unique<LibeventQuicEventLoopWithOwnership>(
event_base_new_with_config(config.get()), clock);
}
} // namespace quic