1use self::timer::Timer;
7use crate::sys::local as sys;
8use crate::timer::Instant;
9use crate::timer::PollTimer;
10use crate::timer::TimerDriver;
11use crate::timer::TimerQueue;
12use crate::timer::TimerResult;
13use crate::waker::WakerList;
14use futures::task::ArcWake;
15use futures::task::waker_ref;
16use parking_lot::Condvar;
17use parking_lot::MappedMutexGuard;
18use parking_lot::Mutex;
19use parking_lot::MutexGuard;
20use std::future::Future;
21use std::pin::Pin;
22use std::pin::pin;
23use std::sync::Arc;
24use std::task::Context;
25use std::task::Poll;
26
27pub fn block_on<Fut>(fut: Fut) -> Fut::Output
29where
30 Fut: Future,
31{
32 block_with_io(|_| fut)
33}
34
35pub fn block_with_io<F, R>(f: F) -> R
37where
38 F: AsyncFnOnce(LocalDriver) -> R,
39{
40 let mut executor = LocalExecutor::new();
41 let fut = f(executor.driver());
42 executor.run_until(pin!(fut))
43}
44
45struct LocalExecutor {
47 inner: Arc<LocalInner>,
48}
49
50impl LocalExecutor {
51 fn new() -> Self {
52 Self {
53 inner: Arc::new(LocalInner::default()),
54 }
55 }
56
57 fn driver(&self) -> LocalDriver {
58 LocalDriver {
59 inner: self.inner.clone(),
60 }
61 }
62
63 fn run_until<F: Future>(&mut self, mut fut: Pin<&mut F>) -> F::Output {
64 let waker = waker_ref(&self.inner);
65 let mut cx = Context::from_waker(&waker);
66 loop {
67 match fut.as_mut().poll(&mut cx) {
68 Poll::Ready(r) => break r,
69 Poll::Pending => self.inner.wait(),
70 }
71 }
72 }
73}
74
75#[derive(Debug, Clone)]
77pub struct LocalDriver {
78 pub(crate) inner: Arc<LocalInner>,
79}
80
81#[derive(Default, Debug)]
82pub(crate) struct LocalInner {
83 state: Mutex<LocalState>,
84 wait_state: Mutex<sys::WaitState>,
85 condvar: Condvar,
86 wait_cancel: sys::WaitCancel,
87}
88
89#[derive(Debug, PartialEq, Eq)]
90enum OpState {
91 Running,
93 RunAgain,
95 Waiting,
97 Woken,
99}
100
101impl Default for OpState {
102 fn default() -> Self {
103 Self::Running
104 }
105}
106
107#[derive(Debug, Default)]
108struct LocalState {
109 op_state: OpState,
110 sys: sys::State,
111 timers: TimerQueue,
112}
113
114impl LocalInner {
115 pub fn lock_sys_state(&self) -> MappedMutexGuard<'_, sys::State> {
116 MutexGuard::map(self.lock_state(), |x| &mut x.sys)
117 }
118
119 fn lock_state(&self) -> MutexGuard<'_, LocalState> {
125 let mut guard = self.state.lock();
126 loop {
127 match guard.op_state {
128 OpState::Running | OpState::RunAgain => break,
129
130 OpState::Waiting => {
131 guard.op_state = OpState::Woken;
132 self.wait_cancel.cancel_wait();
139 self.condvar.wait(&mut guard);
140 }
141 OpState::Woken => {
142 self.condvar.wait(&mut guard);
143 }
144 }
145 }
146 guard
147 }
148
149 fn wait(&self) {
150 let mut state = self.state.lock();
151 if state.op_state != OpState::Running {
152 assert_eq!(state.op_state, OpState::RunAgain);
153 state.op_state = OpState::Running;
154 return;
155 }
156
157 let mut wait_state = self
158 .wait_state
159 .try_lock()
160 .expect("wait should not be called concurrently");
161
162 state.sys.pre_wait(&mut wait_state, &self.wait_cancel);
163
164 let timeout = state.timers.next_deadline().map(|deadline| {
165 let now = Instant::now();
166 deadline.max(now) - now
167 });
168
169 {
170 state.op_state = OpState::Waiting;
171 drop(state);
172 wait_state.wait(&self.wait_cancel, timeout);
173 state = self.state.lock();
174 state.op_state = OpState::Running;
175 }
176
177 let mut wakers = WakerList::default();
178 state.sys.post_wait(&mut wait_state, &mut wakers);
179 state.timers.wake_expired(&mut wakers);
180 drop(state);
181 wakers.wake();
182 self.condvar.notify_all();
184 }
185}
186
187impl ArcWake for LocalInner {
188 fn wake_by_ref(arc_self: &Arc<Self>) {
189 let mut state = arc_self.state.lock();
190 match state.op_state {
191 OpState::Running => state.op_state = OpState::RunAgain,
192 OpState::RunAgain => {}
193 OpState::Waiting => {
194 state.op_state = OpState::Woken;
195 drop(state);
196 arc_self.wait_cancel.cancel_wait();
197 }
198 OpState::Woken => {}
199 }
200 }
201}
202
203mod timer {
205 use super::LocalInner;
206 use crate::timer::TimerQueueId;
207 use std::sync::Arc;
208
209 #[derive(Debug)]
210 pub struct Timer {
211 pub(super) inner: Arc<LocalInner>,
212 pub(super) id: TimerQueueId,
213 }
214}
215
216impl TimerDriver for LocalDriver {
217 type Timer = Timer;
218
219 fn new_timer(&self) -> Self::Timer {
220 let id = self.inner.lock_state().timers.add();
221 Timer {
222 inner: self.inner.clone(),
223 id,
224 }
225 }
226}
227
228impl Drop for Timer {
229 fn drop(&mut self) {
230 let _waker = self.inner.lock_state().timers.remove(self.id);
231 }
232}
233
234impl PollTimer for Timer {
235 fn poll_timer(&mut self, cx: &mut Context<'_>, deadline: Option<Instant>) -> Poll<Instant> {
236 let mut state = self.inner.lock_state();
237 if let Some(deadline) = deadline {
238 state.timers.set_deadline(self.id, deadline);
239 }
240 match state.timers.poll_deadline(cx, self.id) {
241 TimerResult::TimedOut(now) => Poll::Ready(now),
242 TimerResult::Pending(_) => Poll::Pending,
243 }
244 }
245
246 fn set_deadline(&mut self, deadline: Instant) {
247 self.inner
248 .lock_state()
249 .timers
250 .set_deadline(self.id, deadline);
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use super::block_with_io;
257 use crate::executor_tests;
258
259 #[test]
260 fn waker_works() {
261 block_with_io(|_| executor_tests::waker_tests())
262 }
263
264 #[test]
265 fn sleep_works() {
266 block_with_io(executor_tests::sleep_tests)
267 }
268
269 #[test]
270 fn wait_works() {
271 block_with_io(executor_tests::wait_tests)
272 }
273
274 #[test]
275 fn socket_works() {
276 block_with_io(executor_tests::socket_tests)
277 }
278
279 #[cfg(windows)]
280 #[test]
281 fn overlapped_file_works() {
282 block_with_io(executor_tests::windows::overlapped_file_tests)
283 }
284}