vmcore/
slim_event.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Event support.
5
6use futures::Stream;
7use parking_lot::Mutex;
8use std::future::Future;
9use std::future::poll_fn;
10use std::pin::Pin;
11use std::task::Context;
12use std::task::Poll;
13use std::task::Waker;
14
15/// An event for signaling a task, without requiring an OS event.
16#[derive(Debug, Default)]
17pub struct SlimEvent {
18    state: Mutex<State>,
19}
20
21#[derive(Debug, Clone, Default)]
22struct State {
23    signaled: bool,
24    waker: Option<Waker>,
25}
26
27impl SlimEvent {
28    /// Returns a new event.
29    pub fn new() -> Self {
30        Self::default()
31    }
32
33    /// Signals the event.
34    pub fn signal(&self) {
35        let waker = {
36            let mut state = self.state.lock();
37            state.signaled = true;
38            state.waker.take()
39        };
40        if let Some(waker) = waker {
41            waker.wake();
42        }
43    }
44
45    /// Polls the event.
46    pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<()> {
47        let _dead_waker;
48        let mut state = self.state.lock();
49        if state.signaled {
50            state.signaled = false;
51            _dead_waker = state.waker.take();
52            Poll::Ready(())
53        } else {
54            if !state
55                .waker
56                .as_ref()
57                .map(|w| cx.waker().will_wake(w))
58                .unwrap_or(false)
59            {
60                _dead_waker = state.waker.replace(cx.waker().clone());
61            }
62            Poll::Pending
63        }
64    }
65
66    /// Waits for the event to be signaled.
67    pub fn wait(&self) -> impl '_ + Unpin + Future<Output = ()> {
68        poll_fn(move |cx| self.poll_wait(cx))
69    }
70
71    /// Returns a stream, with an entry for each received signal.
72    pub fn as_stream(&self) -> SlimEventStream<'_> {
73        SlimEventStream { wait: self }
74    }
75}
76
77/// A stream of signals.
78pub struct SlimEventStream<'a> {
79    wait: &'a SlimEvent,
80}
81
82impl Stream for SlimEventStream<'_> {
83    type Item = ();
84
85    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
86        std::task::ready!(self.wait.poll_wait(cx));
87        Poll::Ready(Some(()))
88    }
89}
90
91#[cfg(test)]
92mod tests {
93    use crate::slim_event::SlimEvent;
94    use futures::FutureExt;
95    use futures::executor::block_on;
96
97    #[test]
98    fn test() {
99        block_on(async {
100            let e = SlimEvent::new();
101            assert!(e.wait().now_or_never().is_none());
102            e.signal();
103            e.wait().await;
104        })
105    }
106}