// pal_async/unix/epoll.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! An executor based on epoll.
5
6use super::epoll_uring::EPOLL_URING_STATE;
7use super::epoll_uring::EPOLL_URING_TOKEN;
8use super::epoll_uring::EpollIoUring;
9use super::epoll_uring::EpollUringThreadState;
10use super::wait::FdWait;
11use crate::fd::FdReadyDriver;
12use crate::fd::PollFdReady;
13use crate::interest::InterestSlot;
14use crate::interest::PollEvents;
15use crate::interest::PollInterestSet;
16use crate::io_pool::IoBackend;
17use crate::io_pool::IoDriver;
18use crate::io_pool::IoPool;
19use crate::timer::Instant;
20use crate::timer::PollTimer;
21use crate::timer::TimerDriver;
22use crate::timer::TimerQueue;
23use crate::timer::TimerQueueId;
24use crate::timer::TimerResult;
25use crate::wait::WaitDriver;
26use crate::waker::WakerList;
27use futures::FutureExt;
28use futures::task::ArcWake;
29use futures::task::waker_ref;
30use once_cell::sync::OnceCell;
31use pal::unix::Errno;
32use pal::unix::SyscallResult;
33use pal::unix::while_eintr;
34use pal_event::Event;
35use parking_lot::Mutex;
36use std::fs::File;
37use std::future::Future;
38use std::io;
39use std::os::unix::prelude::*;
40use std::pin::pin;
41use std::sync::Arc;
42use std::task::Context;
43use std::task::Poll;
44
/// A single-threaded task pool backed by epoll.
pub type EpollPool = IoPool<EpollBackend>;

/// A driver to spawn tasks and IO objects on [`EpollPool`].
pub type EpollDriver = IoDriver<EpollBackend>;
50
/// Backend state for an epoll-based pool, shared between the thread running
/// the pool and remote wakers/drivers via an `Arc`.
#[derive(Debug)]
pub struct EpollBackend {
    // The epoll file descriptor that all IO objects are registered with.
    epfd: EpollFd,
    // Event used to wake the pool thread out of `epoll_wait`.
    wake_event: Event,
    // Mutable pool state: run/sleep phase, timers, and deferred op frees.
    state: Mutex<EpollState>,
    // io-uring instance, created lazily on the first submitter request.
    io_uring: OnceCell<EpollIoUring>,
}
58
59impl Default for EpollBackend {
60    fn default() -> Self {
61        let epfd = EpollFd::new().expect("epoll not functional");
62        let wake_event = Event::new();
63        // Register for notifications when the wake event is signaled. Use
64        // edge-triggered mode because we can (we immediately consume the event
65        // when it is signaled) and because miri requires it.
66        epfd.add(
67            wake_event.as_fd().as_raw_fd(),
68            libc::EPOLLIN | libc::EPOLLET,
69            0,
70        )
71        .expect("could not add wake event");
72        Self {
73            epfd,
74            wake_event,
75            state: Mutex::new(EpollState {
76                state: PoolState::Running,
77                timers: TimerQueue::default(),
78                fd_ready_to_delete: Vec::new(),
79            }),
80            io_uring: OnceCell::new(),
81        }
82    }
83}
84
/// Pool state protected by the [`EpollBackend`] mutex.
#[derive(Debug)]
struct EpollState {
    // Current phase of the run loop; see [`PoolState`].
    state: PoolState,
    // Timers registered on this pool.
    timers: TimerQueue,
    // `FdReadyOp`s whose `FdReady` was dropped while the pool thread might
    // still dereference their raw pointers; freed by the pool thread after
    // `epoll_wait` returns.
    fd_ready_to_delete: Vec<Arc<FdReadyOp>>,
}

/// Phase of the executor's run loop, used to coordinate wakeups with the
/// pool thread.
#[derive(Debug)]
enum PoolState {
    // The pool thread is running (polling the future).
    Running,
    // A wake arrived while running; poll again before sleeping.
    RunAgain,
    // The pool thread is (or is about to be) blocked in `epoll_wait`,
    // carrying the deadline of the nearest timer, if any.
    Sleeping(Option<Instant>),
    // A wake was requested while sleeping; the requester signals the wake
    // event.
    Waking,
}
99
100impl PoolState {
101    fn reset(&mut self) {
102        match self {
103            PoolState::Running => {}
104            PoolState::RunAgain => {}
105            PoolState::Sleeping(_) => unreachable!(),
106            PoolState::Waking => {}
107        }
108        *self = PoolState::Running;
109    }
110
111    /// Returns true if the wake event must be signaled.
112    #[must_use]
113    fn wake(&mut self) -> bool {
114        match self {
115            PoolState::Running => {
116                *self = PoolState::RunAgain;
117                false
118            }
119            PoolState::RunAgain => false,
120            PoolState::Sleeping(_) => {
121                *self = PoolState::Waking;
122                true
123            }
124            PoolState::Waking => false,
125        }
126    }
127
128    fn can_sleep(&self) -> bool {
129        match self {
130            PoolState::Running => true,
131            PoolState::RunAgain => false,
132            PoolState::Sleeping(_) => unreachable!(),
133            PoolState::Waking => unreachable!(),
134        }
135    }
136
137    fn sleep(&mut self, deadline: Option<Instant>) {
138        match self {
139            PoolState::Running => {}
140            PoolState::RunAgain => unreachable!(),
141            PoolState::Sleeping(_) => unreachable!(),
142            PoolState::Waking => unreachable!(),
143        }
144        *self = PoolState::Sleeping(deadline);
145    }
146
147    /// Returns true if the wake event must be signaled.
148    #[must_use]
149    fn wake_for_timer(&mut self, deadline: Instant) -> bool {
150        match self {
151            PoolState::Running => false,
152            PoolState::RunAgain => false,
153            PoolState::Waking => false,
154            &mut PoolState::Sleeping(Some(current_deadline)) if current_deadline <= deadline => {
155                false
156            }
157            PoolState::Sleeping(_) => {
158                *self = PoolState::Waking;
159                true
160            }
161        }
162    }
163
164    fn is_referencing_ops(&self) -> bool {
165        match self {
166            PoolState::Running => false,
167            PoolState::RunAgain => false,
168            PoolState::Sleeping(_) => true,
169            PoolState::Waking => true,
170        }
171    }
172}
173
impl IoBackend for EpollBackend {
    fn name() -> &'static str {
        "epoll"
    }

    // Runs `fut` to completion on the current thread, blocking in
    // `epoll_wait` whenever the future is pending and no wakeups are queued.
    fn run<Fut: Future>(self: &Arc<Self>, fut: Fut) -> Fut::Output {
        let mut fut = pin!(fut);

        // The backend itself is the waker (see the `ArcWake` impl below).
        let waker = waker_ref(self);
        let mut cx = Context::from_waker(&waker);

        let mut to_delete: Vec<_> = Vec::new();
        let mut wakers = WakerList::default();

        // Set up thread-local state for on-thread io-uring access.
        // The io-uring may not be initialized yet (it's lazy), so we
        // use a null pointer initially and update it on first use.
        let uring_thread_state = EpollUringThreadState::new(std::ptr::null());

        EPOLL_URING_STATE.with(|cell| {
            cell.lend(&uring_thread_state, || {
                let mut state = self.state.lock();
                loop {
                    // Back to Running: raw op pointers must no longer be
                    // dereferenced after this point.
                    state.state.reset();

                    // Wake timers.
                    state.timers.wake_expired(&mut wakers);
                    drop(state);

                    // Drain completions before polling so futures see
                    // results from the previous cycle immediately.
                    // Also update the thread-local ring pointer so
                    // queue_sqe takes the fast path for on-thread callers.
                    if let Some(uring) = self.io_uring.get() {
                        // SAFETY: We are on the epoll thread and sole accessor
                        // of the ring's SQ/CQ.
                        unsafe { uring_thread_state.set_ring(uring) }
                            .process_completions(&mut wakers);
                    }

                    wakers.wake();
                    // Drop the ops deferred by the previous epoll_wait pass
                    // (collected at the bottom of the loop), outside the
                    // state lock.
                    to_delete.clear();

                    match fut.poll_unpin(&mut cx) {
                        Poll::Ready(r) => break r,
                        Poll::Pending => {}
                    }

                    state = self.state.lock();
                    // This list is only populated while in the Sleeping state.
                    assert!(state.fd_ready_to_delete.is_empty());

                    if state.state.can_sleep() {
                        let deadline = state.timers.next_deadline();
                        state.state.sleep(deadline);
                        drop(state);

                        // Flush queued SQEs before sleeping. This is
                        // the single io_uring_enter() per sleep cycle,
                        // naturally batching all SQEs queued during
                        // RunAgain into one syscall.
                        if let Some(uring) = self.io_uring.get() {
                            // SAFETY: We are on the epoll thread and sole
                            // accessor of the ring's SQ/CQ.
                            unsafe { uring_thread_state.set_ring(uring) }.flush();
                        }

                        // Convert the nearest deadline to a millisecond
                        // timeout; -1 means wait forever.
                        //
                        // NOTE(review): `as_millis` rounds down, so the wait
                        // can return slightly before the deadline and loop
                        // again — confirm this early-wake spin is acceptable.
                        let timeout = deadline
                            .map(|deadline| {
                                let now = Instant::now();
                                (deadline.max(now) - now)
                                    .as_millis()
                                    .try_into()
                                    .unwrap_or(i32::MAX)
                            })
                            .unwrap_or(-1);

                        let mut events = [libc::epoll_event { events: 0, u64: 0 }; 8];
                        let n = while_eintr(|| self.epfd.wait(&mut events, timeout))
                            .expect("epoll_wait failed unexpectedly");

                        // Block unnecessary wakeups.
                        let _ = self.state.lock().state.wake();

                        for event in &events[..n] {
                            if event.u64 == 0 {
                                // Context 0 is the wake event; consume it.
                                self.wake_event.try_wait();
                            } else if event.u64 == EPOLL_URING_TOKEN {
                                // Completions are drained at the top of
                                // the loop, so nothing to do here.
                            } else {
                                // SAFETY: the operation context is still alive and
                                // can be dereferenced. It's possible the underlying
                                // FdReady has been dropped, but in that case the
                                // associated operation context will have been added
                                // to the fd_ready_to_delete list.
                                //
                                // Note that this is only true until state reverts
                                // to the Running state, which occurs below.
                                let op = unsafe { &*(event.u64 as usize as *const FdReadyOp) };
                                op.wake_ready(event.events, &mut wakers);
                            }
                        }

                        state = self.state.lock();

                        // Free any FdReadyOp objects that were deleted while in the epoll_wait call.
                        to_delete.append(&mut state.fd_ready_to_delete);
                    }
                }
            }) // cell.lend
        }) // EPOLL_URING_STATE.with
    }
}
288
/// An owned epoll file descriptor; the wrapped `File` closes it on drop.
#[derive(Debug)]
struct EpollFd(File);
291
292impl EpollFd {
293    fn new() -> Result<Self, Errno> {
294        // SAFETY: epoll_create1 creates a new, uniquely owned fd.
295        let epfd = unsafe {
296            File::from_raw_fd(libc::epoll_create1(libc::EPOLL_CLOEXEC).syscall_result()?)
297        };
298        Ok(Self(epfd))
299    }
300
301    fn add(&self, fd: RawFd, events: i32, context: u64) -> Result<(), Errno> {
302        let mut event = libc::epoll_event {
303            events: events as u32,
304            u64: context,
305        };
306        // SAFETY: safe to call with any fd.
307        unsafe {
308            libc::epoll_ctl(self.0.as_raw_fd(), libc::EPOLL_CTL_ADD, fd, &mut event)
309                .syscall_result()?;
310        }
311        Ok(())
312    }
313
314    fn del(&self, fd: RawFd) -> Result<(), Errno> {
315        // SAFETY: safe to call with any fd.
316        unsafe {
317            libc::epoll_ctl(
318                self.0.as_raw_fd(),
319                libc::EPOLL_CTL_DEL,
320                fd,
321                std::ptr::null_mut(),
322            )
323            .syscall_result()?;
324        }
325        Ok(())
326    }
327
328    fn wait(&self, events: &mut [libc::epoll_event], timeout: i32) -> Result<usize, Errno> {
329        let maxevents = events.len() as i32;
330        // SAFETY: maxevents is set appropriately to write to the events slice.
331        let n = unsafe {
332            libc::epoll_wait(self.0.as_raw_fd(), events.as_mut_ptr(), maxevents, timeout)
333                .syscall_result()?
334        };
335        Ok(n as usize)
336    }
337}
338
339impl ArcWake for EpollBackend {
340    fn wake_by_ref(arc_self: &Arc<Self>) {
341        let wake = arc_self.state.lock().state.wake();
342        if wake {
343            arc_self.wake_event.signal();
344        }
345    }
346}
347
/// An object for polling file-descriptor readiness on an [`EpollPool`].
#[derive(Debug)]
pub struct FdReady {
    // Shared operation context. The epoll registration holds a second,
    // leaked reference to this; see `new_fd_ready` and the `Drop` impl.
    op: Arc<FdReadyOp>,
    // Backend the fd is registered with, needed to unregister on drop.
    epoll: Arc<EpollBackend>,
    // The registered fd.
    fd: RawFd,
}

/// Context shared between the `FdReady` owner and the pool thread, which
/// recovers it from the raw pointer stored as the epoll `u64` context.
#[derive(Debug)]
struct FdReadyOp {
    inner: Mutex<FdReadyInner>,
}

#[derive(Debug)]
struct FdReadyInner {
    // Per-slot poll interests and their pending wakers.
    interests: PollInterestSet,
}
364
365impl FdReadyOp {
366    fn wake_ready(&self, ep_events: u32, wakers: &mut WakerList) {
367        let revents = PollEvents::from_epoll_events(ep_events);
368        self.inner.lock().interests.wake_ready(revents, wakers)
369    }
370}
371
impl FdReadyDriver for EpollDriver {
    type FdReady = FdReady;

    /// Registers `fd` with the epoll set and returns an object for polling
    /// its readiness.
    ///
    /// All event types are registered up front in edge-triggered mode; the
    /// interest set tracks which events callers actually wait on.
    fn new_fd_ready(&self, fd: RawFd) -> io::Result<Self::FdReady> {
        let op = Arc::new(FdReadyOp {
            inner: Mutex::new(FdReadyInner {
                interests: PollInterestSet::default(),
            }),
        });
        // Store the raw `FdReadyOp` pointer as the epoll context so the pool
        // thread can locate the op when the fd becomes ready.
        self.inner.epfd.add(
            fd,
            libc::EPOLLET | libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLPRI | libc::EPOLLRDHUP,
            Arc::as_ptr(&op) as usize as u64,
        )?;
        // Add reference owned by epfd, reclaimed in drop. Deliberately done
        // after `add` so that a registration failure leaks nothing.
        let _ = Arc::into_raw(op.clone());

        Ok(FdReady {
            op,
            epoll: self.inner.clone(),
            fd,
        })
    }
}
396
impl Drop for FdReady {
    fn drop(&mut self) {
        // N.B. This can fail if `self.fd` was closed before `self` is
        // dropped. Since other executors don't behave this way, this might
        // be a common mistake. But it's not recoverable here (e.g. by
        // ignoring EBADF or ENOENT), since `self.fd` might be reused for
        // another file descriptor in the race window between closing the fd
        // and issuing this ioctl.
        //
        // Instead, the user of this object should ensure that `self` is
        // dropped before the associated fd is closed (e.g. by putting the
        // `FdReady` above the `File` or whatever in their struct).
        self.epoll
            .epfd
            .del(self.fd)
            .expect("epoll_ctl unexpectedly failed");

        // SAFETY: Reclaiming the reference added in new_fd_ready.
        let op = unsafe { Arc::from_raw(Arc::as_ptr(&self.op)) };
        let mut state = self.epoll.state.lock();
        if state.state.is_referencing_ops() {
            // The pool thread may still dereference the raw op pointer (it
            // is sleeping in, or processing the results of, `epoll_wait`),
            // so defer freeing this reference to that thread. Wake it so
            // the deferred list is drained promptly.
            state.fd_ready_to_delete.push(op);
            if state.state.wake() {
                drop(state);
                self.epoll.wake_event.signal();
            }
        }
        // Otherwise `op` (the reclaimed reference) is dropped right here.
    }
}
426
427impl PollFdReady for FdReady {
428    fn poll_fd_ready(
429        &mut self,
430        cx: &mut Context<'_>,
431        slot: InterestSlot,
432        events: PollEvents,
433    ) -> Poll<PollEvents> {
434        self.op.inner.lock().interests.poll_ready(cx, slot, events)
435    }
436
437    fn clear_fd_ready(&mut self, slot: InterestSlot) {
438        self.op.inner.lock().interests.clear_ready(slot)
439    }
440}
441
442impl WaitDriver for EpollDriver {
443    type Wait = FdWait<FdReady>;
444
445    fn new_wait(&self, fd: RawFd, read_size: usize) -> io::Result<Self::Wait> {
446        Ok(FdWait::new(fd, self.new_fd_ready(fd)?, read_size))
447    }
448}
449
450impl TimerDriver for EpollDriver {
451    type Timer = Timer;
452
453    fn new_timer(&self) -> Self::Timer {
454        let id = self.inner.state.lock().timers.add();
455        Timer {
456            epoll: self.inner.clone(),
457            id,
458        }
459    }
460}
461
462impl crate::io_uring::IoUringDriver for EpollDriver {
463    type Submitter = EpollIoUring;
464
465    fn io_uring_submitter(&self) -> Option<&EpollIoUring> {
466        self.inner
467            .io_uring
468            .get_or_try_init(|| {
469                EpollIoUring::new(
470                    self.inner.epfd.0.as_raw_fd(),
471                    self.inner.wake_event.as_fd().as_raw_fd(),
472                )
473            })
474            .ok()
475    }
476}
477
/// A timer running on an [`EpollPool`].
#[derive(Debug)]
pub struct Timer {
    // Backend whose timer queue this timer lives in.
    epoll: Arc<EpollBackend>,
    // Identifies this timer within the pool's shared timer queue.
    id: TimerQueueId,
}
483
impl Drop for Timer {
    fn drop(&mut self) {
        // Remove the timer from the queue. The returned value (presumably
        // the timer's pending waker, if any) is bound to a named local so it
        // is dropped at the end of this scope — after the state lock guard,
        // a temporary dropped at the end of this statement — rather than
        // while the lock is still held.
        let _waker = self.epoll.state.lock().timers.remove(self.id);
    }
}
489
490impl PollTimer for Timer {
491    fn poll_timer(&mut self, cx: &mut Context<'_>, deadline: Option<Instant>) -> Poll<Instant> {
492        let mut state = self.epoll.state.lock();
493        if let Some(deadline) = deadline {
494            state.timers.set_deadline(self.id, deadline);
495        }
496        match state.timers.poll_deadline(cx, self.id) {
497            TimerResult::TimedOut(now) => Poll::Ready(now),
498            TimerResult::Pending(deadline) => {
499                if state.state.wake_for_timer(deadline) {
500                    drop(state);
501                    self.epoll.wake_event.signal();
502                }
503                Poll::Pending
504            }
505        }
506    }
507
508    fn set_deadline(&mut self, deadline: Instant) {
509        let mut state = self.epoll.state.lock();
510        if state.timers.set_deadline(self.id, deadline) && state.state.wake_for_timer(deadline) {
511            drop(state);
512            self.epoll.wake_event.signal();
513        }
514    }
515}
516
#[cfg(test)]
mod tests {
    use super::EpollPool;
    use crate::executor_tests;

    // Each test runs one of the crate's shared executor test suites against
    // the epoll-backed pool.

    #[test]
    fn waker_works() {
        EpollPool::run_with(|_| executor_tests::waker_tests())
    }

    #[test]
    fn spawn_works() {
        executor_tests::spawn_tests(|| {
            let pool = EpollPool::new();
            (pool.driver(), move || pool.run())
        })
    }

    #[test]
    fn sleep_works() {
        EpollPool::run_with(executor_tests::sleep_tests)
    }

    #[test]
    fn wait_works() {
        EpollPool::run_with(executor_tests::wait_tests)
    }

    #[test]
    fn socket_works() {
        EpollPool::run_with(executor_tests::socket_tests)
    }

    #[test]
    fn uring_works() {
        EpollPool::run_with(executor_tests::io_uring_tests::uring_tests)
    }
}