Add WakeUp() to the QuicEventLoop API. While we can't consistently support it on all platforms in the default implementations, it is used commonly enough that we should expose it directly. PiperOrigin-RevId: 874548112
diff --git a/quiche/quic/bindings/quic_libevent.h b/quiche/quic/bindings/quic_libevent.h index 0078eb6..c37f0aa 100644 --- a/quiche/quic/bindings/quic_libevent.h +++ b/quiche/quic/bindings/quic_libevent.h
@@ -51,7 +51,8 @@ // Can be called from another thread to wake up the event loop from a blocking // RunEventLoopOnce() call. - void WakeUp(); + void WakeUp() override; + bool SupportsWakeUp() const override { return true; } event_base* base() { return base_; } QuicClock* clock() const { return clock_; }
diff --git a/quiche/quic/core/io/quic_all_event_loops_test.cc b/quiche/quic/core/io/quic_all_event_loops_test.cc index f63f587..02b21b6 100644 --- a/quiche/quic/core/io/quic_all_event_loops_test.cc +++ b/quiche/quic/core/io/quic_all_event_loops_test.cc
@@ -14,11 +14,11 @@ #include <fcntl.h> #include <unistd.h> +#include <atomic> #include <memory> #include <string> #include <utility> -#include "absl/cleanup/cleanup.h" #include "absl/memory/memory.h" #include "absl/strings/string_view.h" #include "quiche/quic/core/io/quic_default_event_loop.h" @@ -29,6 +29,7 @@ #include "quiche/quic/core/quic_time.h" #include "quiche/quic/platform/api/quic_test.h" #include "quiche/quic/test_tools/quic_test_utils.h" +#include "quiche/common/platform/api/quiche_thread.h" namespace quic::test { namespace { @@ -467,5 +468,51 @@ QuicTime::Delta::FromMilliseconds(100)); } +constexpr int kWakeUpTestIterations = 1000; +constexpr QuicTimeDelta kWakeUpTestTimeout = QuicTimeDelta::FromSeconds(5); + +class WakeUpThread : public quiche::QuicheThread { + public: + explicit WakeUpThread(QuicEventLoop* loop) + : QuicheThread("WakeUpThread"), loop_(loop) {} + + void Run() override { + while (counter_.load() < kWakeUpTestIterations) { + loop_->WakeUp(); + } + } + + std::atomic<int>& counter() { return counter_; } + + private: + QuicEventLoop* loop_; + std::atomic<int> counter_ = 0; +}; + +TEST_P(QuicEventLoopFactoryTest, WakeUp) { + if (!loop_->SupportsWakeUp()) { + GTEST_SKIP(); + } + + WakeUpThread thread(loop_.get()); + thread.Start(); + const QuicTime start = clock_.Now(); + // If `WakeUp()` does not work, the event loop will take about + // `kWakeUpTestIterations` seconds to finish. + for (int i = 0; i < kWakeUpTestIterations; ++i) { + loop_->RunEventLoopOnce(QuicTimeDelta::FromSeconds(1)); + ++thread.counter(); + const QuicTimeDelta time_elapsed = clock_.Now() - start; + if (time_elapsed > kWakeUpTestTimeout) { + ADD_FAILURE() << "WakeUp test timed out with " << thread.counter().load() + << " iterations ran"; + thread.counter().store( + kWakeUpTestIterations); // Cause the thread to exit. + break; + } + } + thread.Join(); +} + } // namespace } // namespace quic::test
diff --git a/quiche/quic/core/io/quic_event_loop.h b/quiche/quic/core/io/quic_event_loop.h index 53707b4..e10fa83 100644 --- a/quiche/quic/core/io/quic_event_loop.h +++ b/quiche/quic/core/io/quic_event_loop.h
@@ -12,6 +12,7 @@ #include "quiche/quic/core/io/socket.h" #include "quiche/quic/core/quic_alarm_factory.h" #include "quiche/quic/core/quic_clock.h" +#include "quiche/quic/core/quic_time.h" namespace quic { @@ -70,7 +71,7 @@ // Runs a single iteration of the event loop. The iteration will run for at // most |default_timeout|. - virtual void RunEventLoopOnce(QuicTime::Delta default_timeout) = 0; + virtual void RunEventLoopOnce(QuicTimeDelta default_timeout) = 0; // Returns an alarm factory that allows alarms to be scheduled on this event // loop. @@ -79,6 +80,18 @@ // Returns the clock that is used by the alarm factory that the event loop // provides. virtual const QuicClock* GetClock() = 0; + + // Returns true if the event loop implementation supports the `WakeUp()` call. + virtual bool SupportsWakeUp() const = 0; + + // If `RunEventLoopOnce` is called in another thread, causes that call to exit + // even if no event occurred. Thread-safe, unlike all other methods in the + // class; the caller still has to ensure the lifetime safety across both of + // the threads involved. + // + // `WakeUp()` is not supported in all implementations and/or platforms; the + // caller has to call `SupportsWakeUp()` to ensure it is enabled. + virtual void WakeUp() = 0; }; // A factory object for the event loop. Every implementation is expected to have
diff --git a/quiche/quic/core/io/quic_poll_event_loop.cc b/quiche/quic/core/io/quic_poll_event_loop.cc index ee9d04d..d13f8cb 100644 --- a/quiche/quic/core/io/quic_poll_event_loop.cc +++ b/quiche/quic/core/io/quic_poll_event_loop.cc
@@ -13,6 +13,7 @@ #include <utility> #include <vector> +#include "absl/base/no_destructor.h" #include "absl/types/span.h" #include "quiche/quic/core/io/quic_event_loop.h" #include "quiche/quic/core/io/socket.h" @@ -22,6 +23,17 @@ #include "quiche/quic/platform/api/quic_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" +// On Linux, use eventfd(2) to implement QuicEventLoop::WakeUp() API. +// The API in question is not implemented on other QUICHE-supported platforms. +// libevent supports this on other platforms, and should be used if such +// functionality is required. +#if defined(__linux__) +#include <sys/eventfd.h> +#define QUIC_SUPPORTS_EVENTFD 1 +#else +#define QUIC_SUPPORTS_EVENTFD 0 +#endif // defined(__linux__) + namespace quic { namespace { @@ -40,9 +52,36 @@ ((poll_mask & POLLERR) ? kSocketEventError : 0); } +#if QUIC_SUPPORTS_EVENTFD +class QuicPollEventLoopDrainListener : public QuicSocketEventListener { + public: + void OnSocketEvent(QuicEventLoop* loop, SocketFd fd, + QuicSocketEventMask event) override { + QUICHE_DCHECK_EQ(event, kSocketEventReadable); + // eventfd_read will reset the associated event counter to zero. + eventfd_t value; + int result = eventfd_read(fd, &value); + QUIC_BUG_IF(QuicPollEventLoopDrainListener_read_failed, result != 0) + << "eventfd_read call failed: " << errno; + // Rearm the `fd`, since the poll-based loop is level-triggered. + bool success = loop->RearmSocket(fd, kSocketEventReadable); + QUICHE_DCHECK(success); + } +}; +#endif + } // namespace -QuicPollEventLoop::QuicPollEventLoop(QuicClock* clock) : clock_(clock) {} +QuicPollEventLoop::QuicPollEventLoop(QuicClock* clock) : clock_(clock) { +#if QUIC_SUPPORTS_EVENTFD + static absl::NoDestructor<QuicPollEventLoopDrainListener> + poll_event_loop_drain_listener; + wake_up_eventfd_ = OwnedSocketFd(::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)); + bool success = RegisterSocket(wake_up_eventfd_.get(), kSocketEventReadable, + poll_event_loop_drain_listener.get()); + QUICHE_DCHECK(success); +#endif +} bool QuicPollEventLoop::RegisterSocket(SocketFd fd, QuicSocketEventMask events, QuicSocketEventListener* listener) { @@ -213,6 +252,24 @@ return std::make_unique<QuicAlarmFactoryProxy>(&alarms_); } +bool QuicPollEventLoop::SupportsWakeUp() const { + return wake_up_eventfd_.valid(); +} + +void QuicPollEventLoop::WakeUp() { +#if QUIC_SUPPORTS_EVENTFD + if (SupportsWakeUp()) { + int result = eventfd_write(*wake_up_eventfd_, 1); + QUIC_BUG_IF(QuicPollEventLoop_WakeUp_Failed, result != 0) + << "eventfd_write call failed: " << errno; + return; + } +#endif + + QUIC_BUG(QuicPollEventLoop_WakeUp_Unimplemented) + << "QuicPollEventLoop::WakeUp() is not supported on this platform"; +} + int QuicPollEventLoop::PollSyscall(pollfd* fds, size_t nfds, int timeout) { #if defined(_WIN32) return WSAPoll(fds, nfds, timeout);
diff --git a/quiche/quic/core/io/quic_poll_event_loop.h b/quiche/quic/core/io/quic_poll_event_loop.h index ad66c7f..a3cf1ba 100644 --- a/quiche/quic/core/io/quic_poll_event_loop.h +++ b/quiche/quic/core/io/quic_poll_event_loop.h
@@ -60,6 +60,8 @@ void RunEventLoopOnce(QuicTime::Delta default_timeout) override; std::unique_ptr<QuicAlarmFactory> CreateAlarmFactory() override; const QuicClock* GetClock() override { return clock_; } + bool SupportsWakeUp() const override; + void WakeUp() override; protected: // Allows poll(2) calls to be mocked out in unit tests. @@ -110,6 +112,7 @@ const QuicClock* clock_; RegistrationMap registrations_; QuicQueueAlarmFactory alarms_; + OwnedSocketFd wake_up_eventfd_; // Only populated on Linux. bool has_artificial_events_pending_ = false; };