// pal_async/unix/epoll.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! An executor based on epoll.
5
6use super::wait::FdWait;
7use crate::fd::FdReadyDriver;
8use crate::fd::PollFdReady;
9use crate::interest::InterestSlot;
10use crate::interest::PollEvents;
11use crate::interest::PollInterestSet;
12use crate::io_pool::IoBackend;
13use crate::io_pool::IoDriver;
14use crate::io_pool::IoPool;
15use crate::timer::Instant;
16use crate::timer::PollTimer;
17use crate::timer::TimerDriver;
18use crate::timer::TimerQueue;
19use crate::timer::TimerQueueId;
20use crate::timer::TimerResult;
21use crate::wait::WaitDriver;
22use crate::waker::WakerList;
23use futures::FutureExt;
24use futures::task::ArcWake;
25use futures::task::waker_ref;
26use pal::unix::Errno;
27use pal::unix::SyscallResult;
28use pal::unix::while_eintr;
29use pal_event::Event;
30use parking_lot::Mutex;
31use std::fs::File;
32use std::future::Future;
33use std::io;
34use std::os::unix::prelude::*;
35use std::pin::pin;
36use std::sync::Arc;
37use std::task::Context;
38use std::task::Poll;
39
/// A single-threaded task pool backed by epoll.
pub type EpollPool = IoPool<EpollBackend>;

/// A driver to spawn tasks and IO objects on [`EpollPool`].
pub type EpollDriver = IoDriver<EpollBackend>;
45
/// The backend for an epoll-based executor pool.
#[derive(Debug)]
pub struct EpollBackend {
    // The epoll instance that all fds (and the wake event) register with.
    epfd: EpollFd,
    // Event used to wake the executor out of `epoll_wait`. Registered with
    // context value 0 so the poll loop can distinguish it from fd ops.
    wake_event: Event,
    // Shared mutable state, accessed by the executor thread and by wakers.
    state: Mutex<EpollState>,
}
52
53impl Default for EpollBackend {
54    fn default() -> Self {
55        let epfd = EpollFd::new().expect("epoll not functional");
56        let wake_event = Event::new();
57        // Register for notifications when the wake event is signaled. Use
58        // edge-triggered mode because we can (we immediately consume the event
59        // when it is signaled) and because miri requires it.
60        epfd.add(
61            wake_event.as_fd().as_raw_fd(),
62            libc::EPOLLIN | libc::EPOLLET,
63            0,
64        )
65        .expect("could not add wake event");
66        Self {
67            epfd,
68            wake_event,
69            state: Mutex::new(EpollState {
70                state: PoolState::Running,
71                timers: TimerQueue::default(),
72                fd_ready_to_delete: Vec::new(),
73            }),
74        }
75    }
76}
77
// Mutable executor state protected by `EpollBackend::state`.
#[derive(Debug)]
struct EpollState {
    // The wake/sleep state machine coordinating executor and wakers.
    state: PoolState,
    // Pending timers for this pool.
    timers: TimerQueue,
    // `FdReadyOp` references released while the executor may still hold raw
    // pointers to them (i.e. while Sleeping/Waking); drained and dropped by
    // the executor thread once it is safe.
    fd_ready_to_delete: Vec<Arc<FdReadyOp>>,
}
84
/// State machine coordinating the executor thread with wakers.
#[derive(Debug)]
enum PoolState {
    /// The executor is polling tasks.
    Running,
    /// A wake arrived while polling; poll again before sleeping.
    RunAgain,
    /// The executor is (about to be) blocked in `epoll_wait`, until the
    /// contained deadline (or indefinitely if `None`).
    Sleeping(Option<Instant>),
    /// A waker has signaled the wake event; the executor has not yet
    /// transitioned back to `Running`.
    Waking,
}
92
93impl PoolState {
94    fn reset(&mut self) {
95        match self {
96            PoolState::Running => {}
97            PoolState::RunAgain => {}
98            PoolState::Sleeping(_) => unreachable!(),
99            PoolState::Waking => {}
100        }
101        *self = PoolState::Running;
102    }
103
104    /// Returns true if the wake event must be signaled.
105    #[must_use]
106    fn wake(&mut self) -> bool {
107        match self {
108            PoolState::Running => {
109                *self = PoolState::RunAgain;
110                false
111            }
112            PoolState::RunAgain => false,
113            PoolState::Sleeping(_) => {
114                *self = PoolState::Waking;
115                true
116            }
117            PoolState::Waking => false,
118        }
119    }
120
121    fn can_sleep(&self) -> bool {
122        match self {
123            PoolState::Running => true,
124            PoolState::RunAgain => false,
125            PoolState::Sleeping(_) => unreachable!(),
126            PoolState::Waking => unreachable!(),
127        }
128    }
129
130    fn sleep(&mut self, deadline: Option<Instant>) {
131        match self {
132            PoolState::Running => {}
133            PoolState::RunAgain => unreachable!(),
134            PoolState::Sleeping(_) => unreachable!(),
135            PoolState::Waking => unreachable!(),
136        }
137        *self = PoolState::Sleeping(deadline);
138    }
139
140    /// Returns true if the wake event must be signaled.
141    #[must_use]
142    fn wake_for_timer(&mut self, deadline: Instant) -> bool {
143        match self {
144            PoolState::Running => false,
145            PoolState::RunAgain => false,
146            PoolState::Waking => false,
147            &mut PoolState::Sleeping(Some(current_deadline)) if current_deadline <= deadline => {
148                false
149            }
150            PoolState::Sleeping(_) => {
151                *self = PoolState::Waking;
152                true
153            }
154        }
155    }
156
157    fn is_referencing_ops(&self) -> bool {
158        match self {
159            PoolState::Running => false,
160            PoolState::RunAgain => false,
161            PoolState::Sleeping(_) => true,
162            PoolState::Waking => true,
163        }
164    }
165}
166
impl IoBackend for EpollBackend {
    fn name() -> &'static str {
        "epoll"
    }

    fn run<Fut: Future>(self: &Arc<Self>, fut: Fut) -> Fut::Output {
        let mut fut = pin!(fut);

        // The backend itself is the waker (see the `ArcWake` impl below):
        // waking transitions the pool state and signals the wake event only
        // when the executor is asleep.
        let waker = waker_ref(self);
        let mut cx = Context::from_waker(&waker);

        let mut to_delete: Vec<_> = Vec::new();
        let mut wakers = WakerList::default();

        let mut state = self.state.lock();
        loop {
            state.state.reset();

            // Wake timers.
            state.timers.wake_expired(&mut wakers);
            drop(state);

            // Run wakers and release deferred op references outside the lock.
            wakers.wake();
            to_delete.clear();

            match fut.poll_unpin(&mut cx) {
                Poll::Ready(r) => break r,
                Poll::Pending => {}
            }

            state = self.state.lock();
            // This list is only populated while in the Sleeping state.
            assert!(state.fd_ready_to_delete.is_empty());

            if state.state.can_sleep() {
                let deadline = state.timers.next_deadline();
                state.state.sleep(deadline);
                drop(state);

                // Convert the deadline to an epoll timeout in milliseconds;
                // -1 blocks indefinitely. NOTE(review): `as_millis` rounds
                // down, so a sub-millisecond deadline yields a zero timeout
                // and extra loop iterations until the timer actually fires.
                let timeout = deadline
                    .map(|deadline| {
                        let now = Instant::now();
                        (deadline.max(now) - now)
                            .as_millis()
                            .try_into()
                            .unwrap_or(i32::MAX)
                    })
                    .unwrap_or(-1);

                let mut events = [libc::epoll_event { events: 0, u64: 0 }; 8];
                let n = while_eintr(|| self.epfd.wait(&mut events, timeout))
                    .expect("epoll_wait failed unexpectedly");

                // Block unnecessary wakeups.
                let _ = self.state.lock().state.wake();

                for event in &events[..n] {
                    if event.u64 == 0 {
                        // Context 0 is the wake event; consume it.
                        self.wake_event.try_wait();
                    } else {
                        // SAFETY: the operation context is still alive and
                        // can be dereferenced. It's possible the underlying
                        // FdReady has been dropped, but in that case the
                        // associated operation context will have been added
                        // to the fd_ready_to_delete list.
                        //
                        // Note that this is only true until state reverts
                        // to the Running state, which occurs below.
                        let op = unsafe { &*(event.u64 as usize as *const FdReadyOp) };
                        op.wake_ready(event.events, &mut wakers);
                    }
                }

                state = self.state.lock();

                // Free any FdReadyOp objects that were deleted while in the epoll_wait call.
                to_delete.append(&mut state.fd_ready_to_delete);
            }
        }
    }
}
248
// Owning wrapper around an epoll file descriptor (closed when the inner
// `File` is dropped).
#[derive(Debug)]
struct EpollFd(File);
251
impl EpollFd {
    /// Creates a new epoll instance with close-on-exec set.
    fn new() -> Result<Self, Errno> {
        // SAFETY: epoll_create1 creates a new, uniquely owned fd.
        let epfd = unsafe {
            File::from_raw_fd(libc::epoll_create1(libc::EPOLL_CLOEXEC).syscall_result()?)
        };
        Ok(Self(epfd))
    }

    /// Registers `fd` for `events`, associating `context` with the
    /// registration. `context` is returned verbatim in `epoll_event::u64`
    /// by [`Self::wait`].
    fn add(&self, fd: RawFd, events: i32, context: u64) -> Result<(), Errno> {
        let mut event = libc::epoll_event {
            events: events as u32,
            u64: context,
        };
        // SAFETY: safe to call with any fd.
        unsafe {
            libc::epoll_ctl(self.0.as_raw_fd(), libc::EPOLL_CTL_ADD, fd, &mut event)
                .syscall_result()?;
        }
        Ok(())
    }

    /// Removes `fd` from the epoll instance.
    fn del(&self, fd: RawFd) -> Result<(), Errno> {
        // SAFETY: safe to call with any fd.
        unsafe {
            libc::epoll_ctl(
                self.0.as_raw_fd(),
                libc::EPOLL_CTL_DEL,
                fd,
                std::ptr::null_mut(),
            )
            .syscall_result()?;
        }
        Ok(())
    }

    /// Waits for events, filling `events` and returning the number of
    /// entries written. `timeout` is in milliseconds; -1 blocks forever.
    fn wait(&self, events: &mut [libc::epoll_event], timeout: i32) -> Result<usize, Errno> {
        let maxevents = events.len() as i32;
        // SAFETY: maxevents is set appropriately to write to the events slice.
        let n = unsafe {
            libc::epoll_wait(self.0.as_raw_fd(), events.as_mut_ptr(), maxevents, timeout)
                .syscall_result()?
        };
        Ok(n as usize)
    }
}
298
299impl ArcWake for EpollBackend {
300    fn wake_by_ref(arc_self: &Arc<Self>) {
301        let wake = arc_self.state.lock().state.wake();
302        if wake {
303            arc_self.wake_event.signal();
304        }
305    }
306}
307
/// A registration of a file descriptor with an epoll pool, used to poll
/// the fd for readiness.
#[derive(Debug)]
pub struct FdReady {
    // Shared with the epoll loop: a raw pointer to this op is stored as the
    // epoll context value for `fd`.
    op: Arc<FdReadyOp>,
    // The backend whose epoll instance `fd` is registered with.
    epoll: Arc<EpollBackend>,
    fd: RawFd,
}
314
// Per-fd operation state. The epoll loop dereferences a raw pointer to
// this, kept alive via an extra Arc reference owned by the epoll
// registration (leaked in `new_fd_ready`, reclaimed in `Drop for FdReady`).
#[derive(Debug)]
struct FdReadyOp {
    inner: Mutex<FdReadyInner>,
}

#[derive(Debug)]
struct FdReadyInner {
    // Readiness state and wakers for each interest slot.
    interests: PollInterestSet,
}
324
325impl FdReadyOp {
326    fn wake_ready(&self, ep_events: u32, wakers: &mut WakerList) {
327        let revents = PollEvents::from_epoll_events(ep_events);
328        self.inner.lock().interests.wake_ready(revents, wakers)
329    }
330}
331
impl FdReadyDriver for EpollDriver {
    type FdReady = FdReady;

    /// Registers `fd` with the pool's epoll instance for readiness polling.
    fn new_fd_ready(&self, fd: RawFd) -> io::Result<Self::FdReady> {
        let op = Arc::new(FdReadyOp {
            inner: Mutex::new(FdReadyInner {
                interests: PollInterestSet::default(),
            }),
        });
        // Edge-triggered registration for all event types. The raw pointer
        // to `op` is the epoll context value, dereferenced by the poll loop.
        self.inner.epfd.add(
            fd,
            libc::EPOLLET | libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLPRI | libc::EPOLLRDHUP,
            Arc::as_ptr(&op) as usize as u64,
        )?;
        // Add reference owned by epfd, reclaimed in drop.
        let _ = Arc::into_raw(op.clone());

        Ok(FdReady {
            op,
            epoll: self.inner.clone(),
            fd,
        })
    }
}
356
impl Drop for FdReady {
    fn drop(&mut self) {
        // N.B. This can fail if `self.fd` was closed before `self` is
        // dropped. Since other executors don't behave this way, this might
        // be a common mistake. But it's not recoverable here (e.g. by
        // ignoring EBADF or ENOENT), since `self.fd` might be reused for
        // another file descriptor in the race window between closing the fd
        // and issuing this ioctl.
        //
        // Instead, the user of this object should ensure that `self` is
        // dropped before the associated fd is closed (e.g. by putting the
        // `FdReady` above the `File` or whatever in their struct).
        self.epoll
            .epfd
            .del(self.fd)
            .expect("epoll_ctl unexpectedly failed");

        // SAFETY: Reclaiming the reference added in new_fd_ready.
        let op = unsafe { Arc::from_raw(Arc::as_ptr(&self.op)) };
        let mut state = self.epoll.state.lock();
        if state.state.is_referencing_ops() {
            // The epoll loop may still dereference the raw op pointer, so
            // defer releasing this reference to the executor thread; wake
            // it so the deferred list gets drained.
            state.fd_ready_to_delete.push(op);
            if state.state.wake() {
                drop(state);
                self.epoll.wake_event.signal();
            }
        }
        // Otherwise `op` is released here when it goes out of scope, after
        // the state lock guard has been dropped.
    }
}
386
387impl PollFdReady for FdReady {
388    fn poll_fd_ready(
389        &mut self,
390        cx: &mut Context<'_>,
391        slot: InterestSlot,
392        events: PollEvents,
393    ) -> Poll<PollEvents> {
394        self.op.inner.lock().interests.poll_ready(cx, slot, events)
395    }
396
397    fn clear_fd_ready(&mut self, slot: InterestSlot) {
398        self.op.inner.lock().interests.clear_ready(slot)
399    }
400}
401
402impl WaitDriver for EpollDriver {
403    type Wait = FdWait<FdReady>;
404
405    fn new_wait(&self, fd: RawFd, read_size: usize) -> io::Result<Self::Wait> {
406        Ok(FdWait::new(fd, self.new_fd_ready(fd)?, read_size))
407    }
408}
409
410impl TimerDriver for EpollDriver {
411    type Timer = Timer;
412
413    fn new_timer(&self) -> Self::Timer {
414        let id = self.inner.state.lock().timers.add();
415        Timer {
416            epoll: self.inner.clone(),
417            id,
418        }
419    }
420}
421
/// A timer registered with an epoll pool's timer queue.
#[derive(Debug)]
pub struct Timer {
    epoll: Arc<EpollBackend>,
    // Identifies this timer's entry in the shared timer queue.
    id: TimerQueueId,
}
427
impl Drop for Timer {
    fn drop(&mut self) {
        // Remove this timer from the queue. The result is bound to `_waker`
        // (not discarded with `_`) so that any returned waker is dropped
        // only at the end of the function, after the state lock guard — a
        // temporary of this statement — has been released.
        let _waker = self.epoll.state.lock().timers.remove(self.id);
    }
}
433
impl PollTimer for Timer {
    fn poll_timer(&mut self, cx: &mut Context<'_>, deadline: Option<Instant>) -> Poll<Instant> {
        let mut state = self.epoll.state.lock();
        // Optionally update this timer's deadline before polling it.
        if let Some(deadline) = deadline {
            state.timers.set_deadline(self.id, deadline);
        }
        match state.timers.poll_deadline(cx, self.id) {
            TimerResult::TimedOut(now) => Poll::Ready(now),
            TimerResult::Pending(deadline) => {
                // If the executor is sleeping past this deadline, wake it so
                // it recomputes its epoll timeout. Release the lock before
                // signaling the event.
                if state.state.wake_for_timer(deadline) {
                    drop(state);
                    self.epoll.wake_event.signal();
                }
                Poll::Pending
            }
        }
    }

    fn set_deadline(&mut self, deadline: Instant) {
        let mut state = self.epoll.state.lock();
        // Signal the wake event only when the timer queue reports the
        // change matters and the executor is sleeping past the new
        // deadline; drop the lock first, as above.
        if state.timers.set_deadline(self.id, deadline) && state.state.wake_for_timer(deadline) {
            drop(state);
            self.epoll.wake_event.signal();
        }
    }
}
460
#[cfg(test)]
mod tests {
    use super::EpollPool;
    use crate::executor_tests;

    // These tests run the shared executor test suite against the epoll
    // backend specifically.

    #[test]
    fn waker_works() {
        EpollPool::run_with(|_| executor_tests::waker_tests())
    }

    #[test]
    fn spawn_works() {
        executor_tests::spawn_tests(|| {
            let pool = EpollPool::new();
            (pool.driver(), move || pool.run())
        })
    }

    #[test]
    fn sleep_works() {
        EpollPool::run_with(executor_tests::sleep_tests)
    }

    #[test]
    fn wait_works() {
        EpollPool::run_with(executor_tests::wait_tests)
    }

    #[test]
    fn socket_works() {
        EpollPool::run_with(executor_tests::socket_tests)
    }
}