pal_async/
local.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! A local executor, for running a single task with IO on the current thread.
5
6use self::timer::Timer;
7use crate::sys::local as sys;
8use crate::timer::Instant;
9use crate::timer::PollTimer;
10use crate::timer::TimerDriver;
11use crate::timer::TimerQueue;
12use crate::timer::TimerResult;
13use crate::waker::WakerList;
14use futures::task::ArcWake;
15use futures::task::waker_ref;
16use parking_lot::Condvar;
17use parking_lot::MappedMutexGuard;
18use parking_lot::Mutex;
19use parking_lot::MutexGuard;
20use std::future::Future;
21use std::pin::Pin;
22use std::pin::pin;
23use std::sync::Arc;
24use std::task::Context;
25use std::task::Poll;
26
27/// Blocks the current thread until the given future completes.
28pub fn block_on<Fut>(fut: Fut) -> Fut::Output
29where
30    Fut: Future,
31{
32    block_with_io(|_| fut)
33}
34
35/// Polls a future that needs to issue IO until it completes.
36pub fn block_with_io<F, R>(f: F) -> R
37where
38    F: AsyncFnOnce(LocalDriver) -> R,
39{
40    let mut executor = LocalExecutor::new();
41    let fut = f(executor.driver());
42    executor.run_until(pin!(fut))
43}
44
45/// An executor that runs on a single thread and runs only one future.
46struct LocalExecutor {
47    inner: Arc<LocalInner>,
48}
49
50impl LocalExecutor {
51    fn new() -> Self {
52        Self {
53            inner: Arc::new(LocalInner::default()),
54        }
55    }
56
57    fn driver(&self) -> LocalDriver {
58        LocalDriver {
59            inner: self.inner.clone(),
60        }
61    }
62
63    fn run_until<F: Future>(&mut self, mut fut: Pin<&mut F>) -> F::Output {
64        let waker = waker_ref(&self.inner);
65        let mut cx = Context::from_waker(&waker);
66        loop {
67            match fut.as_mut().poll(&mut cx) {
68                Poll::Ready(r) => break r,
69                Poll::Pending => self.inner.wait(),
70            }
71        }
72    }
73}
74
/// An IO driver for single-task use on a single thread.
///
/// The driver is a handle around an `Arc<LocalInner>`; cloning it clones the
/// `Arc`, so every clone refers to the same executor state.
#[derive(Debug, Clone)]
pub struct LocalDriver {
    /// The executor state this driver operates on.
    pub(crate) inner: Arc<LocalInner>,
}
80
/// State shared between the local executor, its `LocalDriver` handles, and the
/// `Timer`s created from them.
#[derive(Default, Debug)]
pub(crate) struct LocalInner {
    /// The executor's operating state, backend state, and timer queue.
    /// Mutators other than the executor acquire it through `lock_state`.
    state: Mutex<LocalState>,
    /// Backend wait state. `wait` takes it with `try_lock` and panics if it is
    /// already held.
    wait_state: Mutex<sys::WaitState>,
    /// `lock_state` blocks on this while the executor is in a wait; `wait`
    /// notifies it once the wait has finished.
    condvar: Condvar,
    /// Handed to the backend's `pre_wait`/`wait`, and used via `cancel_wait()`
    /// to interrupt a wait that is in progress.
    wait_cancel: sys::WaitCancel,
}
88
/// Where the executor currently is in its poll/wait cycle.
///
/// The default state is [`OpState::Running`].
#[derive(Debug, Default, PartialEq, Eq)]
enum OpState {
    /// The executor is running.
    #[default]
    Running,
    /// The executor should poll its task again without waiting.
    RunAgain,
    /// The executor is waiting on IO.
    Waiting,
    /// The executor wait has been cancelled.
    Woken,
}
106
/// The mutable executor state protected by `LocalInner::state`.
#[derive(Debug, Default)]
struct LocalState {
    /// Where the executor is in its poll/wait cycle.
    op_state: OpState,
    /// State owned by the `crate::sys::local` backend; `pre_wait` and
    /// `post_wait` are invoked on it around each wait.
    sys: sys::State,
    /// Queue of timers registered through `TimerDriver::new_timer`.
    timers: TimerQueue,
}
113
114impl LocalInner {
115    pub fn lock_sys_state(&self) -> MappedMutexGuard<'_, sys::State> {
116        MutexGuard::map(self.lock_state(), |x| &mut x.sys)
117    }
118
119    // Locks the state for mutation.
120    //
121    // If the executor is currently waiting, then wakes up the executor first to
122    // ensure that the executor never sees state changes between pre_wait and
123    // post_wait.
124    fn lock_state(&self) -> MutexGuard<'_, LocalState> {
125        let mut guard = self.state.lock();
126        loop {
127            match guard.op_state {
128                OpState::Running | OpState::RunAgain => break,
129
130                OpState::Waiting => {
131                    guard.op_state = OpState::Woken;
132                    // Although it would be better to call this outside the
133                    // lock, doing so might result in live lock since the
134                    // executor could loop around and take the lock again before
135                    // we get a chance to. With this approach, the condition
136                    // variable notify will put this thread directly on the
137                    // mutex queue.
138                    self.wait_cancel.cancel_wait();
139                    self.condvar.wait(&mut guard);
140                }
141                OpState::Woken => {
142                    self.condvar.wait(&mut guard);
143                }
144            }
145        }
146        guard
147    }
148
149    fn wait(&self) {
150        let mut state = self.state.lock();
151        if state.op_state != OpState::Running {
152            assert_eq!(state.op_state, OpState::RunAgain);
153            state.op_state = OpState::Running;
154            return;
155        }
156
157        let mut wait_state = self
158            .wait_state
159            .try_lock()
160            .expect("wait should not be called concurrently");
161
162        state.sys.pre_wait(&mut wait_state, &self.wait_cancel);
163
164        let timeout = state.timers.next_deadline().map(|deadline| {
165            let now = Instant::now();
166            deadline.max(now) - now
167        });
168
169        {
170            state.op_state = OpState::Waiting;
171            drop(state);
172            wait_state.wait(&self.wait_cancel, timeout);
173            state = self.state.lock();
174            state.op_state = OpState::Running;
175        }
176
177        let mut wakers = WakerList::default();
178        state.sys.post_wait(&mut wait_state, &mut wakers);
179        state.timers.wake_expired(&mut wakers);
180        drop(state);
181        wakers.wake();
182        // Notify mutators that the wait has finished.
183        self.condvar.notify_all();
184    }
185}
186
187impl ArcWake for LocalInner {
188    fn wake_by_ref(arc_self: &Arc<Self>) {
189        let mut state = arc_self.state.lock();
190        match state.op_state {
191            OpState::Running => state.op_state = OpState::RunAgain,
192            OpState::RunAgain => {}
193            OpState::Waiting => {
194                state.op_state = OpState::Woken;
195                drop(state);
196                arc_self.wait_cancel.cancel_wait();
197            }
198            OpState::Woken => {}
199        }
200    }
201}
202
// Use a separate module so that `Timer` is not visible: the type is `pub`
// inside this private module, with fields restricted to the parent module.
mod timer {
    use super::LocalInner;
    use crate::timer::TimerQueueId;
    use std::sync::Arc;

    /// A timer registered in a local executor's timer queue.
    #[derive(Debug)]
    pub struct Timer {
        /// The executor state whose timer queue holds this timer.
        pub(super) inner: Arc<LocalInner>,
        /// Identifier of this timer's entry in that queue.
        pub(super) id: TimerQueueId,
    }
}
215
216impl TimerDriver for LocalDriver {
217    type Timer = Timer;
218
219    fn new_timer(&self) -> Self::Timer {
220        let id = self.inner.lock_state().timers.add();
221        Timer {
222            inner: self.inner.clone(),
223            id,
224        }
225    }
226}
227
228impl Drop for Timer {
229    fn drop(&mut self) {
230        let _waker = self.inner.lock_state().timers.remove(self.id);
231    }
232}
233
234impl PollTimer for Timer {
235    fn poll_timer(&mut self, cx: &mut Context<'_>, deadline: Option<Instant>) -> Poll<Instant> {
236        let mut state = self.inner.lock_state();
237        if let Some(deadline) = deadline {
238            state.timers.set_deadline(self.id, deadline);
239        }
240        match state.timers.poll_deadline(cx, self.id) {
241            TimerResult::TimedOut(now) => Poll::Ready(now),
242            TimerResult::Pending(_) => Poll::Pending,
243        }
244    }
245
246    fn set_deadline(&mut self, deadline: Instant) {
247        self.inner
248            .lock_state()
249            .timers
250            .set_deadline(self.id, deadline);
251    }
252}
253
// Runs the crate's shared executor test suites (`crate::executor_tests`,
// defined outside this file) on the local executor via `block_with_io`.
#[cfg(test)]
mod tests {
    use super::block_with_io;
    use crate::executor_tests;

    // The waker suite takes no driver, so the one supplied is ignored.
    #[test]
    fn waker_works() {
        block_with_io(|_| executor_tests::waker_tests())
    }

    // The remaining suites are passed directly and receive the `LocalDriver`.
    #[test]
    fn sleep_works() {
        block_with_io(executor_tests::sleep_tests)
    }

    #[test]
    fn wait_works() {
        block_with_io(executor_tests::wait_tests)
    }

    #[test]
    fn socket_works() {
        block_with_io(executor_tests::socket_tests)
    }

    // Only built and run on Windows targets.
    #[cfg(windows)]
    #[test]
    fn overlapped_file_works() {
        block_with_io(executor_tests::windows::overlapped_file_tests)
    }
}