1#![forbid(unsafe_code)]
7
8use futures::Stream;
9use parking_lot::Mutex;
10use std::future::Future;
11use std::future::poll_fn;
12use std::pin::Pin;
13use std::task::Context;
14use std::task::Poll;
15use std::task::Waker;
16
17#[derive(Debug, Default)]
19pub struct SlimEvent {
20 state: Mutex<State>,
21}
22
23#[derive(Debug, Clone, Default)]
24struct State {
25 signaled: bool,
26 waker: Option<Waker>,
27}
28
29impl SlimEvent {
30 pub fn new() -> Self {
32 Self::default()
33 }
34
35 pub fn signal(&self) {
37 let waker = {
38 let mut state = self.state.lock();
39 state.signaled = true;
40 state.waker.take()
41 };
42 if let Some(waker) = waker {
43 waker.wake();
44 }
45 }
46
47 pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<()> {
49 let _dead_waker;
50 let mut state = self.state.lock();
51 if state.signaled {
52 state.signaled = false;
53 _dead_waker = state.waker.take();
54 Poll::Ready(())
55 } else {
56 if !state
57 .waker
58 .as_ref()
59 .map(|w| cx.waker().will_wake(w))
60 .unwrap_or(false)
61 {
62 _dead_waker = state.waker.replace(cx.waker().clone());
63 }
64 Poll::Pending
65 }
66 }
67
68 pub fn wait(&self) -> impl '_ + Unpin + Future<Output = ()> {
70 poll_fn(move |cx| self.poll_wait(cx))
71 }
72
73 pub fn as_stream(&self) -> SlimEventStream<'_> {
75 SlimEventStream { wait: self }
76 }
77}
78
79pub struct SlimEventStream<'a> {
81 wait: &'a SlimEvent,
82}
83
84impl Stream for SlimEventStream<'_> {
85 type Item = ();
86
87 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88 std::task::ready!(self.wait.poll_wait(cx));
89 Poll::Ready(Some(()))
90 }
91}
92
93#[cfg(test)]
94mod tests {
95 use crate::slim_event::SlimEvent;
96 use futures::FutureExt;
97 use futures::executor::block_on;
98
99 #[test]
100 fn test() {
101 block_on(async {
102 let e = SlimEvent::new();
103 assert!(e.wait().now_or_never().is_none());
104 e.signal();
105 e.wait().await;
106 })
107 }
108}