1use crate::InputSource;
7use mesh::MeshPayload;
8use mesh::message::MeshField;
9use std::pin::Pin;
10
11#[derive(MeshPayload)]
13#[mesh(bound = "T: 'static + MeshField + Send")]
14pub struct MeshInputSource<T> {
15 recv: mesh::Receiver<T>,
16 active: mesh::CellUpdater<bool>,
17}
18
19impl<T: 'static + Send> InputSource<T> for MeshInputSource<T> {
20 fn set_active(
21 &mut self,
22 active: bool,
23 ) -> Pin<Box<dyn '_ + std::future::Future<Output = ()> + Send>> {
24 Box::pin(async move {
25 if *self.active.get() != active {
26 self.active.set(active).await;
27 }
28 })
29 }
30}
31
32impl<T: 'static + Send> futures::Stream for MeshInputSource<T> {
33 type Item = T;
34
35 fn poll_next(
36 mut self: Pin<&mut Self>,
37 cx: &mut std::task::Context<'_>,
38 ) -> std::task::Poll<Option<Self::Item>> {
39 Pin::new(&mut self.recv).poll_next(cx)
40 }
41}
42
43#[derive(MeshPayload)]
45#[mesh(bound = "T: 'static + MeshField + Send")]
46pub struct MeshInputSink<T> {
47 send: mesh::Sender<T>,
48 active: mesh::Cell<bool>,
49}
50
51impl<T: 'static + Send> MeshInputSink<T> {
52 pub fn send(&mut self, input: T) {
54 self.send.send(input);
55 }
56
57 pub fn is_active(&self) -> bool {
59 self.active.get()
60 }
61}
62
63pub fn input_pair<T: 'static + Send>() -> (MeshInputSource<T>, MeshInputSink<T>) {
65 let (send, recv) = mesh::channel();
66 let (update, active) = mesh::cell(false);
67 (
68 MeshInputSource {
69 recv,
70 active: update,
71 },
72 MeshInputSink { send, active },
73 )
74}