1use crate::InputSource;
7use mesh::MeshPayload;
8use mesh::message::MeshField;
9use std::pin::Pin;
10
11#[derive(MeshPayload)]
13#[mesh(bound = "T: 'static + MeshField + Send")]
14pub struct MeshInputSource<T> {
15 recv: mesh::Receiver<T>,
16 active: mesh::CellUpdater<bool>,
17}
18
19impl<T: 'static + Send> InputSource<T> for MeshInputSource<T> {
20 fn set_active(&mut self, active: bool) -> Pin<Box<dyn '_ + Future<Output = ()> + Send>> {
21 Box::pin(async move {
22 if *self.active.get() != active {
23 self.active.set(active).await;
24 }
25 })
26 }
27}
28
29impl<T: 'static + Send> futures::Stream for MeshInputSource<T> {
30 type Item = T;
31
32 fn poll_next(
33 mut self: Pin<&mut Self>,
34 cx: &mut std::task::Context<'_>,
35 ) -> std::task::Poll<Option<Self::Item>> {
36 Pin::new(&mut self.recv).poll_next(cx)
37 }
38}
39
40#[derive(MeshPayload)]
42#[mesh(bound = "T: 'static + MeshField + Send")]
43pub struct MeshInputSink<T> {
44 send: mesh::Sender<T>,
45 active: mesh::Cell<bool>,
46}
47
48impl<T: 'static + Send> MeshInputSink<T> {
49 pub fn send(&mut self, input: T) {
51 self.send.send(input);
52 }
53
54 pub fn is_active(&self) -> bool {
56 self.active.get()
57 }
58}
59
60pub fn input_pair<T: 'static + Send>() -> (MeshInputSource<T>, MeshInputSink<T>) {
62 let (send, recv) = mesh::channel();
63 let (update, active) = mesh::cell(false);
64 (
65 MeshInputSource {
66 recv,
67 active: update,
68 },
69 MeshInputSink { send, active },
70 )
71}