use futures::Stream;
use parking_lot::Mutex;
use std::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
/// A lightweight asynchronous event that can be signaled and awaited.
///
/// The event holds a single boolean "signaled" flag and at most one
/// registered waker, both protected by a mutex. A successful wait consumes
/// the signal, so the event resets itself after each completed wait.
#[derive(Debug, Default)]
pub struct SlimEvent {
// Shared flag + waker; every access goes through this lock.
state: Mutex<State>,
}
/// Mutex-protected interior of a `SlimEvent`.
#[derive(Debug, Clone, Default)]
struct State {
/// `true` once `signal` has been called and no poll has consumed it yet.
signaled: bool,
/// Waker of the most recent pending poll, if any; only one is kept.
waker: Option<Waker>,
}
impl SlimEvent {
pub fn new() -> Self {
Self::default()
}
pub fn signal(&self) {
let waker = {
let mut state = self.state.lock();
state.signaled = true;
state.waker.take()
};
if let Some(waker) = waker {
waker.wake();
}
}
pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<()> {
let _dead_waker;
let mut state = self.state.lock();
if state.signaled {
state.signaled = false;
_dead_waker = state.waker.take();
Poll::Ready(())
} else {
if !state
.waker
.as_ref()
.map(|w| cx.waker().will_wake(w))
.unwrap_or(false)
{
_dead_waker = state.waker.replace(cx.waker().clone());
}
Poll::Pending
}
}
pub fn wait(&self) -> impl '_ + Unpin + Future<Output = ()> {
poll_fn(move |cx| self.poll_wait(cx))
}
pub fn as_stream(&self) -> SlimEventStream<'_> {
SlimEventStream { wait: self }
}
}
/// A `Stream` view over a borrowed `SlimEvent`.
///
/// Obtained from `SlimEvent::as_stream`; each item corresponds to one
/// signal consumed from the underlying event.
pub struct SlimEventStream<'a> {
// The event being polled; borrowed for the lifetime of the stream.
wait: &'a SlimEvent,
}
impl Stream for SlimEventStream<'_> {
    type Item = ();

    /// Polls the underlying event; yields `Some(())` when a signal was
    /// consumed and stays pending otherwise. The stream never yields `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Poll::map` leaves `Pending` untouched and wraps the ready unit
        // value in `Some`.
        self.wait.poll_wait(cx).map(Some)
    }
}
#[cfg(test)]
mod tests {
    use crate::slim_event::SlimEvent;
    use futures::executor::block_on;
    use futures::FutureExt;

    /// An unsignaled event stays pending; after `signal`, waiting completes.
    #[test]
    fn test() {
        let scenario = async {
            let event = SlimEvent::new();

            // Nothing has signaled yet, so a single poll must not resolve.
            let early = event.wait().now_or_never();
            assert!(early.is_none());

            // Once signaled, the wait finishes.
            event.signal();
            event.wait().await;
        };
        block_on(scenario)
    }
}