Skip to main content

mesh_channel/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Higher-level channel types and utilities built on top of `mesh_channel_core`.
5//!
6//! This crate provides the ergonomic channel abstractions that most mesh users
7//! interact with:
8//!
9//! - **[`channel()`] / [`oneshot()`]** — re-exported from `mesh_channel_core`
10//!   for basic message passing.
11//! - **[`rpc`]** — `Rpc<I, R>` and `FailableRpc<I, R>` for request/response
12//!   patterns where the caller awaits a reply.
13//! - **[`cell`]** — `Cell<T>` / `CellUpdater<T>` for reactive push-based
14//!   configuration updates.
15//! - **[`cancel`]** — `CancelContext` for cooperative cancellation with
16//!   optional deadlines, transferable across processes.
17//! - **[`pipe`]** — `ReadPipe` / `WritePipe` implementing `AsyncRead` /
18//!   `AsyncWrite` over mesh with backpressure.
19//! - **[`error`]** — `RemoteError` and `RemoteResult` for error propagation
20//!   across process boundaries.
21//!
22//! Most code should use the `mesh` facade crate rather than depending on this
23//! crate directly.
24
25#![expect(missing_docs)]
26
27mod bidir;
28pub mod cancel;
29pub mod cell;
30mod deadline;
31pub mod error;
32mod lazy;
33pub mod pipe;
34pub mod rpc;
35
36pub use mesh_channel_core::ChannelError;
37pub use mesh_channel_core::ChannelErrorKind;
38pub use mesh_channel_core::OneshotReceiver;
39pub use mesh_channel_core::OneshotSender;
40pub use mesh_channel_core::Receiver;
41pub use mesh_channel_core::Receiver as MpscReceiver;
42pub use mesh_channel_core::RecvError;
43pub use mesh_channel_core::Sender;
44pub use mesh_channel_core::Sender as MpscSender;
45pub use mesh_channel_core::TryRecvError;
46pub use mesh_channel_core::channel;
47pub use mesh_channel_core::channel as mpsc_channel;
48pub use mesh_channel_core::oneshot;
49
#[cfg(test)]
mod tests {
    use super::*;
    use mesh_node::message::MeshPayload;
    use mesh_protobuf::SerializedMessage;
    use pal_async::async_test;
    use pal_event::Event;
    use test_with_tracing::test;

    /// Sends a pair of strings through a channel and checks that the same
    /// pair is read back with `try_recv`.
    #[test]
    fn test() {
        let (tx, mut rx) = channel::<(String, String)>();
        let payload = (String::from("abc"), String::from("def"));
        tx.send(payload);

        let received = rx.try_recv().unwrap();
        assert_eq!(received, (String::from("abc"), String::from("def")));
    }

    /// Sends a `Receiver<u32>` through another channel, then checks that the
    /// receiver obtained from that channel yields a value sent afterwards on
    /// the matching sender.
    #[test]
    fn test_send_port() {
        let (outer_tx, mut outer_rx) = channel::<Receiver<u32>>();
        let (inner_tx, inner_rx) = channel::<u32>();

        outer_tx.send(inner_rx);
        let mut moved_rx = outer_rx.try_recv().unwrap();

        inner_tx.send(0xf00d);
        let value = moved_rx.try_recv().unwrap();
        assert_eq!(value, 0xf00d);
    }

    /// Sends a clone of an `Event` through a channel, signals the received
    /// copy, and then waits on the original.
    #[test]
    fn test_send_resource() {
        let (tx, mut rx) = channel::<Event>();
        let original = Event::new();

        tx.send(original.clone());
        let received = rx.try_recv().unwrap();

        received.signal();
        original.wait();
    }

    /// Sends one value directly and one through a round-tripped clone of the
    /// sender, drops the original sender, and checks that both values arrive
    /// (in either order) before `recv` reports `RecvError::Closed`.
    #[async_test]
    async fn test_mpsc() {
        let (tx, mut rx) = mpsc_channel::<u32>();
        tx.send(5);
        {
            // Keep the round-tripped sender confined to this block so it is
            // dropped before the closed check below.
            let (tx_copy,) = roundtrip((tx.clone(),));
            tx_copy.send(6);
        }
        drop(tx);

        let mut received = [rx.recv().await.unwrap(), rx.recv().await.unwrap()];
        let after_close = rx.recv().await.unwrap_err();
        assert!(matches!(after_close, RecvError::Closed));

        // Arrival order is not asserted; compare the sorted values.
        received.sort_unstable();
        assert_eq!(received, [5, 6]);
    }

    /// Sends on a channel whose receiver has already been dropped.
    #[async_test]
    async fn test_mpsc_again() {
        let (tx, rx) = mpsc_channel::<u32>();
        drop(rx);
        tx.send(5);
    }

    /// Serializes `t` into a `SerializedMessage` and converts it back into a
    /// `T`, panicking if the conversion back fails. Used to force an
    /// MpscSender to clone its underlying port.
    fn roundtrip<T>(t: T) -> T
    where
        T: MeshPayload,
    {
        let serialized = SerializedMessage::from_message(t);
        serialized.into_message().unwrap()
    }
}