1use super::wait::FdWait;
7use crate::fd::FdReadyDriver;
8use crate::fd::PollFdReady;
9use crate::interest::InterestSlot;
10use crate::interest::PollEvents;
11use crate::interest::PollInterestSet;
12use crate::io_pool::IoBackend;
13use crate::io_pool::IoDriver;
14use crate::io_pool::IoPool;
15use crate::timer::Instant;
16use crate::timer::PollTimer;
17use crate::timer::TimerDriver;
18use crate::timer::TimerQueue;
19use crate::timer::TimerQueueId;
20use crate::timer::TimerResult;
21use crate::wait::WaitDriver;
22use crate::waker::WakerList;
23use futures::FutureExt;
24use futures::task::ArcWake;
25use futures::task::waker_ref;
26use pal::unix::Errno;
27use pal::unix::SyscallResult;
28use pal::unix::while_eintr;
29use pal_event::Event;
30use parking_lot::Mutex;
31use std::fs::File;
32use std::future::Future;
33use std::io;
34use std::os::unix::prelude::*;
35use std::pin::pin;
36use std::sync::Arc;
37use std::task::Context;
38use std::task::Poll;
39
40pub type EpollPool = IoPool<EpollBackend>;
42
43pub type EpollDriver = IoDriver<EpollBackend>;
45
46#[derive(Debug)]
47pub struct EpollBackend {
48 epfd: EpollFd,
49 wake_event: Event,
50 state: Mutex<EpollState>,
51}
52
53impl Default for EpollBackend {
54 fn default() -> Self {
55 let epfd = EpollFd::new().expect("epoll not functional");
56 let wake_event = Event::new();
57 epfd.add(
61 wake_event.as_fd().as_raw_fd(),
62 libc::EPOLLIN | libc::EPOLLET,
63 0,
64 )
65 .expect("could not add wake event");
66 Self {
67 epfd,
68 wake_event,
69 state: Mutex::new(EpollState {
70 state: PoolState::Running,
71 timers: TimerQueue::default(),
72 fd_ready_to_delete: Vec::new(),
73 }),
74 }
75 }
76}
77
78#[derive(Debug)]
79struct EpollState {
80 state: PoolState,
81 timers: TimerQueue,
82 fd_ready_to_delete: Vec<Arc<FdReadyOp>>,
83}
84
85#[derive(Debug)]
86enum PoolState {
87 Running,
88 RunAgain,
89 Sleeping(Option<Instant>),
90 Waking,
91}
92
93impl PoolState {
94 fn reset(&mut self) {
95 match self {
96 PoolState::Running => {}
97 PoolState::RunAgain => {}
98 PoolState::Sleeping(_) => unreachable!(),
99 PoolState::Waking => {}
100 }
101 *self = PoolState::Running;
102 }
103
104 #[must_use]
106 fn wake(&mut self) -> bool {
107 match self {
108 PoolState::Running => {
109 *self = PoolState::RunAgain;
110 false
111 }
112 PoolState::RunAgain => false,
113 PoolState::Sleeping(_) => {
114 *self = PoolState::Waking;
115 true
116 }
117 PoolState::Waking => false,
118 }
119 }
120
121 fn can_sleep(&self) -> bool {
122 match self {
123 PoolState::Running => true,
124 PoolState::RunAgain => false,
125 PoolState::Sleeping(_) => unreachable!(),
126 PoolState::Waking => unreachable!(),
127 }
128 }
129
130 fn sleep(&mut self, deadline: Option<Instant>) {
131 match self {
132 PoolState::Running => {}
133 PoolState::RunAgain => unreachable!(),
134 PoolState::Sleeping(_) => unreachable!(),
135 PoolState::Waking => unreachable!(),
136 }
137 *self = PoolState::Sleeping(deadline);
138 }
139
140 #[must_use]
142 fn wake_for_timer(&mut self, deadline: Instant) -> bool {
143 match self {
144 PoolState::Running => false,
145 PoolState::RunAgain => false,
146 PoolState::Waking => false,
147 &mut PoolState::Sleeping(Some(current_deadline)) if current_deadline <= deadline => {
148 false
149 }
150 PoolState::Sleeping(_) => {
151 *self = PoolState::Waking;
152 true
153 }
154 }
155 }
156
157 fn is_referencing_ops(&self) -> bool {
158 match self {
159 PoolState::Running => false,
160 PoolState::RunAgain => false,
161 PoolState::Sleeping(_) => true,
162 PoolState::Waking => true,
163 }
164 }
165}
166
167impl IoBackend for EpollBackend {
168 fn name() -> &'static str {
169 "epoll"
170 }
171
172 fn run<Fut: Future>(self: &Arc<Self>, fut: Fut) -> Fut::Output {
173 let mut fut = pin!(fut);
174
175 let waker = waker_ref(self);
176 let mut cx = Context::from_waker(&waker);
177
178 let mut to_delete: Vec<_> = Vec::new();
179 let mut wakers = WakerList::default();
180
181 let mut state = self.state.lock();
182 loop {
183 state.state.reset();
184
185 state.timers.wake_expired(&mut wakers);
187 drop(state);
188
189 wakers.wake();
190 to_delete.clear();
191
192 match fut.poll_unpin(&mut cx) {
193 Poll::Ready(r) => break r,
194 Poll::Pending => {}
195 }
196
197 state = self.state.lock();
198 assert!(state.fd_ready_to_delete.is_empty());
200
201 if state.state.can_sleep() {
202 let deadline = state.timers.next_deadline();
203 state.state.sleep(deadline);
204 drop(state);
205
206 let timeout = deadline
207 .map(|deadline| {
208 let now = Instant::now();
209 (deadline.max(now) - now)
210 .as_millis()
211 .try_into()
212 .unwrap_or(i32::MAX)
213 })
214 .unwrap_or(-1);
215
216 let mut events = [libc::epoll_event { events: 0, u64: 0 }; 8];
217 let n = while_eintr(|| self.epfd.wait(&mut events, timeout))
218 .expect("epoll_wait failed unexpectedly");
219
220 let _ = self.state.lock().state.wake();
222
223 for event in &events[..n] {
224 if event.u64 == 0 {
225 self.wake_event.try_wait();
226 } else {
227 let op = unsafe { &*(event.u64 as usize as *const FdReadyOp) };
236 op.wake_ready(event.events, &mut wakers);
237 }
238 }
239
240 state = self.state.lock();
241
242 to_delete.append(&mut state.fd_ready_to_delete);
244 }
245 }
246 }
247}
248
249#[derive(Debug)]
250struct EpollFd(File);
251
252impl EpollFd {
253 fn new() -> Result<Self, Errno> {
254 let epfd = unsafe {
256 File::from_raw_fd(libc::epoll_create1(libc::EPOLL_CLOEXEC).syscall_result()?)
257 };
258 Ok(Self(epfd))
259 }
260
261 fn add(&self, fd: RawFd, events: i32, context: u64) -> Result<(), Errno> {
262 let mut event = libc::epoll_event {
263 events: events as u32,
264 u64: context,
265 };
266 unsafe {
268 libc::epoll_ctl(self.0.as_raw_fd(), libc::EPOLL_CTL_ADD, fd, &mut event)
269 .syscall_result()?;
270 }
271 Ok(())
272 }
273
274 fn del(&self, fd: RawFd) -> Result<(), Errno> {
275 unsafe {
277 libc::epoll_ctl(
278 self.0.as_raw_fd(),
279 libc::EPOLL_CTL_DEL,
280 fd,
281 std::ptr::null_mut(),
282 )
283 .syscall_result()?;
284 }
285 Ok(())
286 }
287
288 fn wait(&self, events: &mut [libc::epoll_event], timeout: i32) -> Result<usize, Errno> {
289 let maxevents = events.len() as i32;
290 let n = unsafe {
292 libc::epoll_wait(self.0.as_raw_fd(), events.as_mut_ptr(), maxevents, timeout)
293 .syscall_result()?
294 };
295 Ok(n as usize)
296 }
297}
298
299impl ArcWake for EpollBackend {
300 fn wake_by_ref(arc_self: &Arc<Self>) {
301 let wake = arc_self.state.lock().state.wake();
302 if wake {
303 arc_self.wake_event.signal();
304 }
305 }
306}
307
308#[derive(Debug)]
309pub struct FdReady {
310 op: Arc<FdReadyOp>,
311 epoll: Arc<EpollBackend>,
312 fd: RawFd,
313}
314
315#[derive(Debug)]
316struct FdReadyOp {
317 inner: Mutex<FdReadyInner>,
318}
319
320#[derive(Debug)]
321struct FdReadyInner {
322 interests: PollInterestSet,
323}
324
325impl FdReadyOp {
326 fn wake_ready(&self, ep_events: u32, wakers: &mut WakerList) {
327 let revents = PollEvents::from_epoll_events(ep_events);
328 self.inner.lock().interests.wake_ready(revents, wakers)
329 }
330}
331
332impl FdReadyDriver for EpollDriver {
333 type FdReady = FdReady;
334
335 fn new_fd_ready(&self, fd: RawFd) -> io::Result<Self::FdReady> {
336 let op = Arc::new(FdReadyOp {
337 inner: Mutex::new(FdReadyInner {
338 interests: PollInterestSet::default(),
339 }),
340 });
341 self.inner.epfd.add(
342 fd,
343 libc::EPOLLET | libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLPRI | libc::EPOLLRDHUP,
344 Arc::as_ptr(&op) as usize as u64,
345 )?;
346 let _ = Arc::into_raw(op.clone());
348
349 Ok(FdReady {
350 op,
351 epoll: self.inner.clone(),
352 fd,
353 })
354 }
355}
356
357impl Drop for FdReady {
358 fn drop(&mut self) {
359 self.epoll
370 .epfd
371 .del(self.fd)
372 .expect("epoll_ctl unexpectedly failed");
373
374 let op = unsafe { Arc::from_raw(Arc::as_ptr(&self.op)) };
376 let mut state = self.epoll.state.lock();
377 if state.state.is_referencing_ops() {
378 state.fd_ready_to_delete.push(op);
379 if state.state.wake() {
380 drop(state);
381 self.epoll.wake_event.signal();
382 }
383 }
384 }
385}
386
387impl PollFdReady for FdReady {
388 fn poll_fd_ready(
389 &mut self,
390 cx: &mut Context<'_>,
391 slot: InterestSlot,
392 events: PollEvents,
393 ) -> Poll<PollEvents> {
394 self.op.inner.lock().interests.poll_ready(cx, slot, events)
395 }
396
397 fn clear_fd_ready(&mut self, slot: InterestSlot) {
398 self.op.inner.lock().interests.clear_ready(slot)
399 }
400}
401
402impl WaitDriver for EpollDriver {
403 type Wait = FdWait<FdReady>;
404
405 fn new_wait(&self, fd: RawFd, read_size: usize) -> io::Result<Self::Wait> {
406 Ok(FdWait::new(fd, self.new_fd_ready(fd)?, read_size))
407 }
408}
409
410impl TimerDriver for EpollDriver {
411 type Timer = Timer;
412
413 fn new_timer(&self) -> Self::Timer {
414 let id = self.inner.state.lock().timers.add();
415 Timer {
416 epoll: self.inner.clone(),
417 id,
418 }
419 }
420}
421
422#[derive(Debug)]
423pub struct Timer {
424 epoll: Arc<EpollBackend>,
425 id: TimerQueueId,
426}
427
428impl Drop for Timer {
429 fn drop(&mut self) {
430 let _waker = self.epoll.state.lock().timers.remove(self.id);
431 }
432}
433
434impl PollTimer for Timer {
435 fn poll_timer(&mut self, cx: &mut Context<'_>, deadline: Option<Instant>) -> Poll<Instant> {
436 let mut state = self.epoll.state.lock();
437 if let Some(deadline) = deadline {
438 state.timers.set_deadline(self.id, deadline);
439 }
440 match state.timers.poll_deadline(cx, self.id) {
441 TimerResult::TimedOut(now) => Poll::Ready(now),
442 TimerResult::Pending(deadline) => {
443 if state.state.wake_for_timer(deadline) {
444 drop(state);
445 self.epoll.wake_event.signal();
446 }
447 Poll::Pending
448 }
449 }
450 }
451
452 fn set_deadline(&mut self, deadline: Instant) {
453 let mut state = self.epoll.state.lock();
454 if state.timers.set_deadline(self.id, deadline) && state.state.wake_for_timer(deadline) {
455 drop(state);
456 self.epoll.wake_event.signal();
457 }
458 }
459}
460
461#[cfg(test)]
462mod tests {
463 use super::EpollPool;
464 use crate::executor_tests;
465
466 #[test]
467 fn waker_works() {
468 EpollPool::run_with(|_| executor_tests::waker_tests())
469 }
470
471 #[test]
472 fn spawn_works() {
473 executor_tests::spawn_tests(|| {
474 let pool = EpollPool::new();
475 (pool.driver(), move || pool.run())
476 })
477 }
478
479 #[test]
480 fn sleep_works() {
481 EpollPool::run_with(executor_tests::sleep_tests)
482 }
483
484 #[test]
485 fn wait_works() {
486 EpollPool::run_with(executor_tests::wait_tests)
487 }
488
489 #[test]
490 fn socket_works() {
491 EpollPool::run_with(executor_tests::socket_tests)
492 }
493}