mesh_channel/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4#![expect(missing_docs)]
5
6mod bidir;
7pub mod cancel;
8pub mod cell;
9mod deadline;
10pub mod error;
11mod lazy;
12pub mod pipe;
13pub mod rpc;
14
15pub use mesh_channel_core::ChannelError;
16pub use mesh_channel_core::ChannelErrorKind;
17pub use mesh_channel_core::OneshotReceiver;
18pub use mesh_channel_core::OneshotSender;
19pub use mesh_channel_core::Receiver;
20pub use mesh_channel_core::Receiver as MpscReceiver;
21pub use mesh_channel_core::RecvError;
22pub use mesh_channel_core::Sender;
23pub use mesh_channel_core::Sender as MpscSender;
24pub use mesh_channel_core::TryRecvError;
25pub use mesh_channel_core::channel;
26pub use mesh_channel_core::channel as mpsc_channel;
27pub use mesh_channel_core::oneshot;
28
29#[cfg(test)]
30mod tests {
31    use super::*;
32    use mesh_node::message::MeshPayload;
33    use mesh_protobuf::SerializedMessage;
34    use pal_async::async_test;
35    use pal_event::Event;
36    use test_with_tracing::test;
37
38    #[test]
39    fn test() {
40        let (send, mut recv) = channel::<(String, String)>();
41        send.send(("abc".to_string(), "def".to_string()));
42        assert_eq!(
43            recv.try_recv().unwrap(),
44            ("abc".to_string(), "def".to_string())
45        );
46    }
47
48    #[test]
49    fn test_send_port() {
50        let (send, mut recv) = channel::<Receiver<u32>>();
51        let (sendi, recvi) = channel::<u32>();
52        send.send(recvi);
53        let mut recvi = recv.try_recv().unwrap();
54        sendi.send(0xf00d);
55        assert_eq!(recvi.try_recv().unwrap(), 0xf00d);
56    }
57
58    #[test]
59    fn test_send_resource() {
60        let (send, mut recv) = channel::<Event>();
61        let event = Event::new();
62        send.send(event.clone());
63        let event2 = recv.try_recv().unwrap();
64        event2.signal();
65        event.wait();
66    }
67
68    #[async_test]
69    async fn test_mpsc() {
70        let (send, mut recv) = mpsc_channel::<u32>();
71        send.send(5);
72        roundtrip((send.clone(),)).0.send(6);
73        drop(send);
74        let a = recv.recv().await.unwrap();
75        let b = recv.recv().await.unwrap();
76        assert!(matches!(recv.recv().await.unwrap_err(), RecvError::Closed));
77        let mut s = [a, b];
78        s.sort_unstable();
79        assert_eq!(&s, &[5, 6]);
80    }
81
82    #[async_test]
83    async fn test_mpsc_again() {
84        let (send, recv) = mpsc_channel::<u32>();
85        drop(recv);
86        send.send(5);
87    }
88
89    /// Serializes and deserializes a mesh message. Used to force an MpscSender
90    /// to clone its underlying port.
91    fn roundtrip<T: MeshPayload>(t: T) -> T {
92        SerializedMessage::from_message(t).into_message().unwrap()
93    }
94}