1use futures::Stream;
7use parking_lot::Mutex;
8use std::future::Future;
9use std::future::poll_fn;
10use std::pin::Pin;
11use std::task::Context;
12use std::task::Poll;
13use std::task::Waker;
14
15#[derive(Debug, Default)]
17pub struct SlimEvent {
18 state: Mutex<State>,
19}
20
/// Mutex-protected interior of a [`SlimEvent`].
#[derive(Clone, Debug, Default)]
struct State {
    /// `true` once `signal` has been called and no wait has consumed it yet.
    signaled: bool,
    /// Waker of the most recent poller that found the event unsignaled, if any.
    waker: Option<Waker>,
}
26
27impl SlimEvent {
28 pub fn new() -> Self {
30 Self::default()
31 }
32
33 pub fn signal(&self) {
35 let waker = {
36 let mut state = self.state.lock();
37 state.signaled = true;
38 state.waker.take()
39 };
40 if let Some(waker) = waker {
41 waker.wake();
42 }
43 }
44
45 pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<()> {
47 let _dead_waker;
48 let mut state = self.state.lock();
49 if state.signaled {
50 state.signaled = false;
51 _dead_waker = state.waker.take();
52 Poll::Ready(())
53 } else {
54 if !state
55 .waker
56 .as_ref()
57 .map(|w| cx.waker().will_wake(w))
58 .unwrap_or(false)
59 {
60 _dead_waker = state.waker.replace(cx.waker().clone());
61 }
62 Poll::Pending
63 }
64 }
65
66 pub fn wait(&self) -> impl '_ + Unpin + Future<Output = ()> {
68 poll_fn(move |cx| self.poll_wait(cx))
69 }
70
71 pub fn as_stream(&self) -> SlimEventStream<'_> {
73 SlimEventStream { wait: self }
74 }
75}
76
77pub struct SlimEventStream<'a> {
79 wait: &'a SlimEvent,
80}
81
82impl Stream for SlimEventStream<'_> {
83 type Item = ();
84
85 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
86 std::task::ready!(self.wait.poll_wait(cx));
87 Poll::Ready(Some(()))
88 }
89}
90
#[cfg(test)]
mod tests {
    use crate::slim_event::SlimEvent;
    use futures::FutureExt;
    use futures::executor::block_on;

    /// An unsignaled event is pending; after `signal` the wait completes.
    #[test]
    fn test() {
        let event = SlimEvent::new();

        // Nothing has signaled yet, so a single poll must not complete.
        let first_poll = event.wait().now_or_never();
        assert!(first_poll.is_none());

        // Once signaled, waiting resolves instead of blocking forever.
        event.signal();
        block_on(event.wait());
    }
}