pal_async/unix/
wait.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Code for managing asynchronous waits of eventfds and similar objects.
5
6use crate::fd::PollFdReady;
7use crate::interest::InterestSlot;
8use crate::interest::PollEvents;
9use crate::wait::MAXIMUM_WAIT_READ_SIZE;
10use crate::wait::PollWait;
11use pal::unix::Errno;
12use pal::unix::SyscallResult;
13use std::os::unix::prelude::*;
14use std::task::Context;
15use std::task::Poll;
16
17/// A [`PollWait`] implementation that waits for an fd to be signaled, then
18/// reads from it.
19#[derive(Debug)]
20pub struct FdWait<T> {
21    fd_ready: T,
22    fd: RawFd,
23    read_size: usize,
24}
25
26impl<T: PollFdReady> FdWait<T> {
27    /// Returns a new instance that waits for `fd` to be ready via `fd_ready`,
28    /// then reads `read_size` bytes from it.
29    ///
30    /// Panics if `read_size` is greater than [`MAXIMUM_WAIT_READ_SIZE`].
31    pub fn new(fd: RawFd, fd_ready: T, read_size: usize) -> Self {
32        assert!(read_size <= MAXIMUM_WAIT_READ_SIZE);
33        Self {
34            fd_ready,
35            fd,
36            read_size,
37        }
38    }
39}
40
41impl<T: 'static + PollFdReady> PollWait for FdWait<T> {
42    fn poll_wait(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
43        loop {
44            std::task::ready!(
45                self.fd_ready
46                    .poll_fd_ready(cx, InterestSlot::Read, PollEvents::IN)
47            );
48
49            self.fd_ready.clear_fd_ready(InterestSlot::Read);
50
51            let mut buf = [0u64; 1];
52            assert!(self.read_size <= size_of_val(&buf));
53
54            // Consume the event's signal state so that we can get subsequent signals.
55            //
56            // SAFETY: calling with owned fd and appropriately sized buffer.
57            let r = unsafe {
58                libc::read(self.fd, buf.as_mut_ptr().cast(), self.read_size).syscall_result()
59            };
60
61            match r {
62                Ok(_) => break,
63                Err(Errno(libc::EAGAIN)) => {
64                    // The event is not actually ready, presumably due to a
65                    // race. Loop around again.
66                }
67                Err(err) => Err(err)?,
68            }
69        }
70
71        Poll::Ready(Ok(()))
72    }
73
74    fn poll_cancel_wait(&mut self, _cx: &mut Context<'_>) -> Poll<bool> {
75        // No need to cancel anything, since the wait signal is synchronously
76        // consumed in `poll_wait`.
77        Poll::Ready(false)
78    }
79}