vmcore/
slim_event.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Event support.
5
6#![forbid(unsafe_code)]
7
8use futures::Stream;
9use parking_lot::Mutex;
10use std::future::Future;
11use std::future::poll_fn;
12use std::pin::Pin;
13use std::task::Context;
14use std::task::Poll;
15use std::task::Waker;
16
17/// An event for signaling a task, without requiring an OS event.
18#[derive(Debug, Default)]
19pub struct SlimEvent {
20    state: Mutex<State>,
21}
22
23#[derive(Debug, Clone, Default)]
24struct State {
25    signaled: bool,
26    waker: Option<Waker>,
27}
28
29impl SlimEvent {
30    /// Returns a new event.
31    pub fn new() -> Self {
32        Self::default()
33    }
34
35    /// Signals the event.
36    pub fn signal(&self) {
37        let waker = {
38            let mut state = self.state.lock();
39            state.signaled = true;
40            state.waker.take()
41        };
42        if let Some(waker) = waker {
43            waker.wake();
44        }
45    }
46
47    /// Polls the event.
48    pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<()> {
49        let _dead_waker;
50        let mut state = self.state.lock();
51        if state.signaled {
52            state.signaled = false;
53            _dead_waker = state.waker.take();
54            Poll::Ready(())
55        } else {
56            if !state
57                .waker
58                .as_ref()
59                .map(|w| cx.waker().will_wake(w))
60                .unwrap_or(false)
61            {
62                _dead_waker = state.waker.replace(cx.waker().clone());
63            }
64            Poll::Pending
65        }
66    }
67
68    /// Waits for the event to be signaled.
69    pub fn wait(&self) -> impl '_ + Unpin + Future<Output = ()> {
70        poll_fn(move |cx| self.poll_wait(cx))
71    }
72
73    /// Returns a stream, with an entry for each received signal.
74    pub fn as_stream(&self) -> SlimEventStream<'_> {
75        SlimEventStream { wait: self }
76    }
77}
78
79/// A stream of signals.
80pub struct SlimEventStream<'a> {
81    wait: &'a SlimEvent,
82}
83
84impl Stream for SlimEventStream<'_> {
85    type Item = ();
86
87    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88        std::task::ready!(self.wait.poll_wait(cx));
89        Poll::Ready(Some(()))
90    }
91}
92
93#[cfg(test)]
94mod tests {
95    use crate::slim_event::SlimEvent;
96    use futures::FutureExt;
97    use futures::executor::block_on;
98
99    #[test]
100    fn test() {
101        block_on(async {
102            let e = SlimEvent::new();
103            assert!(e.wait().now_or_never().is_none());
104            e.signal();
105            e.wait().await;
106        })
107    }
108}