1use super::epoll_uring::EPOLL_URING_STATE;
7use super::epoll_uring::EPOLL_URING_TOKEN;
8use super::epoll_uring::EpollIoUring;
9use super::epoll_uring::EpollUringThreadState;
10use super::wait::FdWait;
11use crate::fd::FdReadyDriver;
12use crate::fd::PollFdReady;
13use crate::interest::InterestSlot;
14use crate::interest::PollEvents;
15use crate::interest::PollInterestSet;
16use crate::io_pool::IoBackend;
17use crate::io_pool::IoDriver;
18use crate::io_pool::IoPool;
19use crate::timer::Instant;
20use crate::timer::PollTimer;
21use crate::timer::TimerDriver;
22use crate::timer::TimerQueue;
23use crate::timer::TimerQueueId;
24use crate::timer::TimerResult;
25use crate::wait::WaitDriver;
26use crate::waker::WakerList;
27use futures::FutureExt;
28use futures::task::ArcWake;
29use futures::task::waker_ref;
30use once_cell::sync::OnceCell;
31use pal::unix::Errno;
32use pal::unix::SyscallResult;
33use pal::unix::while_eintr;
34use pal_event::Event;
35use parking_lot::Mutex;
36use std::fs::File;
37use std::future::Future;
38use std::io;
39use std::os::unix::prelude::*;
40use std::pin::pin;
41use std::sync::Arc;
42use std::task::Context;
43use std::task::Poll;
44
/// An [`IoPool`] whose backend is [`EpollBackend`].
pub type EpollPool = IoPool<EpollBackend>;
47
/// An [`IoDriver`] whose backend is [`EpollBackend`].
///
/// This type implements the fd-ready, wait, timer, and io_uring driver traits
/// further down in this file.
pub type EpollDriver = IoDriver<EpollBackend>;
50
51#[derive(Debug)]
52pub struct EpollBackend {
53 epfd: EpollFd,
54 wake_event: Event,
55 state: Mutex<EpollState>,
56 io_uring: OnceCell<EpollIoUring>,
57}
58
59impl Default for EpollBackend {
60 fn default() -> Self {
61 let epfd = EpollFd::new().expect("epoll not functional");
62 let wake_event = Event::new();
63 epfd.add(
67 wake_event.as_fd().as_raw_fd(),
68 libc::EPOLLIN | libc::EPOLLET,
69 0,
70 )
71 .expect("could not add wake event");
72 Self {
73 epfd,
74 wake_event,
75 state: Mutex::new(EpollState {
76 state: PoolState::Running,
77 timers: TimerQueue::default(),
78 fd_ready_to_delete: Vec::new(),
79 }),
80 io_uring: OnceCell::new(),
81 }
82 }
83}
84
/// Mutable backend state protected by `EpollBackend::state`.
#[derive(Debug)]
struct EpollState {
    /// Where the run loop currently is in its run/sleep cycle.
    state: PoolState,
    /// Timers created through `new_timer`; the earliest deadline bounds the
    /// `epoll_wait` timeout.
    timers: TimerQueue,
    /// Ops whose `FdReady` was dropped while the run loop was sleeping or
    /// waking. The run loop takes them over after it has finished processing
    /// the events returned by `epoll_wait`.
    fd_ready_to_delete: Vec<Arc<FdReadyOp>>,
}
91
/// Position of the run loop in its run/sleep cycle.
#[derive(Debug)]
enum PoolState {
    /// The loop is running and no wake has arrived since the last `reset`.
    Running,
    /// A wake arrived while running, so the loop must iterate again instead
    /// of sleeping.
    RunAgain,
    /// The loop is (about to be) blocked in `epoll_wait`, with the given
    /// timer deadline bounding the wait.
    Sleeping(Option<Instant>),
    /// A wake was requested while sleeping; further wakes need no signal.
    Waking,
}

impl PoolState {
    /// Returns to `Running` at the top of a loop iteration.
    ///
    /// Must not be called while `Sleeping`.
    fn reset(&mut self) {
        match self {
            PoolState::Sleeping(_) => unreachable!(),
            PoolState::Running | PoolState::RunAgain | PoolState::Waking => {
                *self = PoolState::Running
            }
        }
    }

    /// Records a wake request.
    ///
    /// Returns true if the caller must signal the wake event, which is only
    /// the case for the first wake of a sleeping loop.
    #[must_use]
    fn wake(&mut self) -> bool {
        let next = match self {
            PoolState::Running => PoolState::RunAgain,
            PoolState::Sleeping(_) => PoolState::Waking,
            // Already has a pending wake recorded; nothing changes.
            PoolState::RunAgain | PoolState::Waking => return false,
        };
        let must_signal = matches!(next, PoolState::Waking);
        *self = next;
        must_signal
    }

    /// Returns whether the loop may block: true when `Running`, false when a
    /// wake arrived (`RunAgain`).
    ///
    /// Must only be called while `Running` or `RunAgain`.
    fn can_sleep(&self) -> bool {
        match self {
            PoolState::Running => true,
            PoolState::RunAgain => false,
            PoolState::Sleeping(_) | PoolState::Waking => unreachable!(),
        }
    }

    /// Transitions from `Running` to `Sleeping(deadline)`.
    ///
    /// Must only be called while `Running`.
    fn sleep(&mut self, deadline: Option<Instant>) {
        match self {
            PoolState::Running => *self = PoolState::Sleeping(deadline),
            PoolState::RunAgain | PoolState::Sleeping(_) | PoolState::Waking => unreachable!(),
        }
    }

    /// Records that a timer now expires at `deadline`.
    ///
    /// Returns true if the caller must signal the wake event: the loop is
    /// sleeping and its current wait is unbounded or ends after `deadline`.
    #[must_use]
    fn wake_for_timer(&mut self, deadline: Instant) -> bool {
        match *self {
            // The current wait already ends no later than the new deadline.
            PoolState::Sleeping(Some(current)) if current <= deadline => false,
            PoolState::Sleeping(_) => {
                *self = PoolState::Waking;
                true
            }
            PoolState::Running | PoolState::RunAgain | PoolState::Waking => false,
        }
    }

    /// Returns whether the loop may currently be using `FdReadyOp` pointers
    /// obtained from `epoll_wait`, i.e. it is `Sleeping` or `Waking`.
    fn is_referencing_ops(&self) -> bool {
        match self {
            PoolState::Sleeping(_) | PoolState::Waking => true,
            PoolState::Running | PoolState::RunAgain => false,
        }
    }
}
173
174impl IoBackend for EpollBackend {
175 fn name() -> &'static str {
176 "epoll"
177 }
178
179 fn run<Fut: Future>(self: &Arc<Self>, fut: Fut) -> Fut::Output {
180 let mut fut = pin!(fut);
181
182 let waker = waker_ref(self);
183 let mut cx = Context::from_waker(&waker);
184
185 let mut to_delete: Vec<_> = Vec::new();
186 let mut wakers = WakerList::default();
187
188 let uring_thread_state = EpollUringThreadState::new(std::ptr::null());
192
193 EPOLL_URING_STATE.with(|cell| {
194 cell.lend(&uring_thread_state, || {
195 let mut state = self.state.lock();
196 loop {
197 state.state.reset();
198
199 state.timers.wake_expired(&mut wakers);
201 drop(state);
202
203 if let Some(uring) = self.io_uring.get() {
208 unsafe { uring_thread_state.set_ring(uring) }
211 .process_completions(&mut wakers);
212 }
213
214 wakers.wake();
215 to_delete.clear();
216
217 match fut.poll_unpin(&mut cx) {
218 Poll::Ready(r) => break r,
219 Poll::Pending => {}
220 }
221
222 state = self.state.lock();
223 assert!(state.fd_ready_to_delete.is_empty());
225
226 if state.state.can_sleep() {
227 let deadline = state.timers.next_deadline();
228 state.state.sleep(deadline);
229 drop(state);
230
231 if let Some(uring) = self.io_uring.get() {
236 unsafe { uring_thread_state.set_ring(uring) }.flush();
239 }
240
241 let timeout = deadline
242 .map(|deadline| {
243 let now = Instant::now();
244 (deadline.max(now) - now)
245 .as_millis()
246 .try_into()
247 .unwrap_or(i32::MAX)
248 })
249 .unwrap_or(-1);
250
251 let mut events = [libc::epoll_event { events: 0, u64: 0 }; 8];
252 let n = while_eintr(|| self.epfd.wait(&mut events, timeout))
253 .expect("epoll_wait failed unexpectedly");
254
255 let _ = self.state.lock().state.wake();
257
258 for event in &events[..n] {
259 if event.u64 == 0 {
260 self.wake_event.try_wait();
261 } else if event.u64 == EPOLL_URING_TOKEN {
262 } else {
265 let op = unsafe { &*(event.u64 as usize as *const FdReadyOp) };
274 op.wake_ready(event.events, &mut wakers);
275 }
276 }
277
278 state = self.state.lock();
279
280 to_delete.append(&mut state.fd_ready_to_delete);
282 }
283 }
284 }) }) }
287}
288
289#[derive(Debug)]
290struct EpollFd(File);
291
292impl EpollFd {
293 fn new() -> Result<Self, Errno> {
294 let epfd = unsafe {
296 File::from_raw_fd(libc::epoll_create1(libc::EPOLL_CLOEXEC).syscall_result()?)
297 };
298 Ok(Self(epfd))
299 }
300
301 fn add(&self, fd: RawFd, events: i32, context: u64) -> Result<(), Errno> {
302 let mut event = libc::epoll_event {
303 events: events as u32,
304 u64: context,
305 };
306 unsafe {
308 libc::epoll_ctl(self.0.as_raw_fd(), libc::EPOLL_CTL_ADD, fd, &mut event)
309 .syscall_result()?;
310 }
311 Ok(())
312 }
313
314 fn del(&self, fd: RawFd) -> Result<(), Errno> {
315 unsafe {
317 libc::epoll_ctl(
318 self.0.as_raw_fd(),
319 libc::EPOLL_CTL_DEL,
320 fd,
321 std::ptr::null_mut(),
322 )
323 .syscall_result()?;
324 }
325 Ok(())
326 }
327
328 fn wait(&self, events: &mut [libc::epoll_event], timeout: i32) -> Result<usize, Errno> {
329 let maxevents = events.len() as i32;
330 let n = unsafe {
332 libc::epoll_wait(self.0.as_raw_fd(), events.as_mut_ptr(), maxevents, timeout)
333 .syscall_result()?
334 };
335 Ok(n as usize)
336 }
337}
338
339impl ArcWake for EpollBackend {
340 fn wake_by_ref(arc_self: &Arc<Self>) {
341 let wake = arc_self.state.lock().state.wake();
342 if wake {
343 arc_self.wake_event.signal();
344 }
345 }
346}
347
/// Readiness tracker for one file descriptor registered with an
/// [`EpollBackend`].
///
/// Dropping it removes the descriptor from the epoll instance.
#[derive(Debug)]
pub struct FdReady {
    /// Shared per-descriptor state; its address is the token registered with
    /// epoll for this descriptor.
    op: Arc<FdReadyOp>,
    /// The backend the descriptor is registered with.
    epoll: Arc<EpollBackend>,
    /// The registered descriptor. It is not closed by this type's `Drop`
    /// impl, only deregistered.
    fd: RawFd,
}
354
355#[derive(Debug)]
356struct FdReadyOp {
357 inner: Mutex<FdReadyInner>,
358}
359
360#[derive(Debug)]
361struct FdReadyInner {
362 interests: PollInterestSet,
363}
364
365impl FdReadyOp {
366 fn wake_ready(&self, ep_events: u32, wakers: &mut WakerList) {
367 let revents = PollEvents::from_epoll_events(ep_events);
368 self.inner.lock().interests.wake_ready(revents, wakers)
369 }
370}
371
372impl FdReadyDriver for EpollDriver {
373 type FdReady = FdReady;
374
375 fn new_fd_ready(&self, fd: RawFd) -> io::Result<Self::FdReady> {
376 let op = Arc::new(FdReadyOp {
377 inner: Mutex::new(FdReadyInner {
378 interests: PollInterestSet::default(),
379 }),
380 });
381 self.inner.epfd.add(
382 fd,
383 libc::EPOLLET | libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLPRI | libc::EPOLLRDHUP,
384 Arc::as_ptr(&op) as usize as u64,
385 )?;
386 let _ = Arc::into_raw(op.clone());
388
389 Ok(FdReady {
390 op,
391 epoll: self.inner.clone(),
392 fd,
393 })
394 }
395}
396
397impl Drop for FdReady {
398 fn drop(&mut self) {
399 self.epoll
410 .epfd
411 .del(self.fd)
412 .expect("epoll_ctl unexpectedly failed");
413
414 let op = unsafe { Arc::from_raw(Arc::as_ptr(&self.op)) };
416 let mut state = self.epoll.state.lock();
417 if state.state.is_referencing_ops() {
418 state.fd_ready_to_delete.push(op);
419 if state.state.wake() {
420 drop(state);
421 self.epoll.wake_event.signal();
422 }
423 }
424 }
425}
426
427impl PollFdReady for FdReady {
428 fn poll_fd_ready(
429 &mut self,
430 cx: &mut Context<'_>,
431 slot: InterestSlot,
432 events: PollEvents,
433 ) -> Poll<PollEvents> {
434 self.op.inner.lock().interests.poll_ready(cx, slot, events)
435 }
436
437 fn clear_fd_ready(&mut self, slot: InterestSlot) {
438 self.op.inner.lock().interests.clear_ready(slot)
439 }
440}
441
442impl WaitDriver for EpollDriver {
443 type Wait = FdWait<FdReady>;
444
445 fn new_wait(&self, fd: RawFd, read_size: usize) -> io::Result<Self::Wait> {
446 Ok(FdWait::new(fd, self.new_fd_ready(fd)?, read_size))
447 }
448}
449
450impl TimerDriver for EpollDriver {
451 type Timer = Timer;
452
453 fn new_timer(&self) -> Self::Timer {
454 let id = self.inner.state.lock().timers.add();
455 Timer {
456 epoll: self.inner.clone(),
457 id,
458 }
459 }
460}
461
462impl crate::io_uring::IoUringDriver for EpollDriver {
463 type Submitter = EpollIoUring;
464
465 fn io_uring_submitter(&self) -> Option<&EpollIoUring> {
466 self.inner
467 .io_uring
468 .get_or_try_init(|| {
469 EpollIoUring::new(
470 self.inner.epfd.0.as_raw_fd(),
471 self.inner.wake_event.as_fd().as_raw_fd(),
472 )
473 })
474 .ok()
475 }
476}
477
478#[derive(Debug)]
479pub struct Timer {
480 epoll: Arc<EpollBackend>,
481 id: TimerQueueId,
482}
483
484impl Drop for Timer {
485 fn drop(&mut self) {
486 let _waker = self.epoll.state.lock().timers.remove(self.id);
487 }
488}
489
490impl PollTimer for Timer {
491 fn poll_timer(&mut self, cx: &mut Context<'_>, deadline: Option<Instant>) -> Poll<Instant> {
492 let mut state = self.epoll.state.lock();
493 if let Some(deadline) = deadline {
494 state.timers.set_deadline(self.id, deadline);
495 }
496 match state.timers.poll_deadline(cx, self.id) {
497 TimerResult::TimedOut(now) => Poll::Ready(now),
498 TimerResult::Pending(deadline) => {
499 if state.state.wake_for_timer(deadline) {
500 drop(state);
501 self.epoll.wake_event.signal();
502 }
503 Poll::Pending
504 }
505 }
506 }
507
508 fn set_deadline(&mut self, deadline: Instant) {
509 let mut state = self.epoll.state.lock();
510 if state.timers.set_deadline(self.id, deadline) && state.state.wake_for_timer(deadline) {
511 drop(state);
512 self.epoll.wake_event.signal();
513 }
514 }
515}
516
// Runs the shared `executor_tests` suites against `EpollPool`.
#[cfg(test)]
mod tests {
    use super::EpollPool;
    use crate::executor_tests;

    /// Runs the shared waker tests; the driver argument is not needed.
    #[test]
    fn waker_works() {
        EpollPool::run_with(|_| executor_tests::waker_tests())
    }

    /// Runs the shared spawn tests, handing them a factory that returns a
    /// pool's driver together with a closure that runs that pool.
    #[test]
    fn spawn_works() {
        executor_tests::spawn_tests(|| {
            let pool = EpollPool::new();
            (pool.driver(), move || pool.run())
        })
    }

    /// Runs the shared sleep (timer) tests.
    #[test]
    fn sleep_works() {
        EpollPool::run_with(executor_tests::sleep_tests)
    }

    /// Runs the shared wait tests.
    #[test]
    fn wait_works() {
        EpollPool::run_with(executor_tests::wait_tests)
    }

    /// Runs the shared socket tests.
    #[test]
    fn socket_works() {
        EpollPool::run_with(executor_tests::socket_tests)
    }

    /// Runs the shared io_uring tests.
    #[test]
    fn uring_works() {
        EpollPool::run_with(executor_tests::io_uring_tests::uring_tests)
    }
}