1#![expect(unsafe_code)]
21
22use anyhow::Context;
23use base64::Engine;
24use debug_ptr::DebugPtr;
25use futures::FutureExt;
26use futures::Stream;
27use futures::StreamExt;
28use futures::executor::block_on;
29use futures_concurrency::future::Race;
30use inspect::Inspect;
31use inspect::SensitivityLevel;
32use mesh::MeshPayload;
33use mesh::OneshotReceiver;
34use mesh::local_node::Port;
35use mesh::message::MeshField;
36use mesh::payload::Protobuf;
37use mesh::rpc::FailableRpc;
38use mesh::rpc::RpcSend;
39#[cfg(unix)]
40use mesh_remote::InvitationAddress;
41#[cfg(unix)]
42use pal::unix::process::Builder as ProcessBuilder;
43#[cfg(windows)]
44use pal::windows::process;
45#[cfg(windows)]
46use pal::windows::process::Builder as ProcessBuilder;
47#[cfg(unix)]
48use pal_async::DefaultPool;
49use pal_async::task::Spawn;
50use pal_async::task::Task;
51use slab::Slab;
52use std::borrow::Cow;
53use std::ffi::OsString;
54use std::fs::File;
55#[cfg(unix)]
56use std::os::unix::prelude::*;
57#[cfg(windows)]
58use std::os::windows::prelude::*;
59use std::path::Path;
60use std::path::PathBuf;
61use std::pin::Pin;
62use tracing::Instrument;
63use tracing::instrument;
64use unicycle::FuturesUnordered;
65
66#[cfg(windows)]
67mod plat {
68 pub type IpcNode = mesh_remote::windows::AlpcNode;
69 pub type IpcNodeDriver = pal_async::windows::TpPool;
70}
71
72#[cfg(unix)]
73mod plat {
74 pub type IpcNode = mesh_remote::unix::UnixNode;
75 pub type IpcNodeDriver = pal_async::DefaultDriver;
76}
77
78use plat::IpcNode;
79use plat::IpcNodeDriver;
80
81#[cfg(unix)]
82const IPC_FD: i32 = 3;
83
84const INVITATION_ENV_NAME: &str = "MESH_WORKER_INVITATION";
89
90#[derive(Protobuf)]
91struct Invitation {
92 node_name: String,
93 #[cfg(windows)]
94 credentials: mesh_remote::windows::AlpcInvitationCredentials,
95 #[cfg(unix)]
96 address: InvitationAddress,
97 #[cfg(windows)]
98 directory_handle: usize,
99 #[cfg(unix)]
100 socket_fd: i32,
101}
102
103static PROCESS_NAME: DebugPtr<String> = DebugPtr::new();
104
105pub fn try_run_mesh_host<U, F, T>(base_name: &str, f: F) -> anyhow::Result<()>
114where
115 U: 'static + MeshPayload + Send,
116 F: AsyncFnOnce(U) -> anyhow::Result<T>,
117{
118 block_on(async {
119 if let Some(r) = node_from_environment().await? {
120 let NodeResult {
121 node_name,
122 node,
123 initial_port,
124 } = r;
125 PROCESS_NAME.store(&node_name);
126 set_program_name(&format!("{base_name}-{node_name}"));
127 let init = OneshotReceiver::<InitialMessage<U>>::from(initial_port)
128 .await
129 .context("failed to receive initial message")?;
130 let _drop = (
131 f(init.init_message).map(Some),
132 handle_host_requests(init.requests).map(|()| None),
133 )
134 .race()
135 .await
136 .transpose()?;
137
138 tracing::debug!("waiting to shut down node");
139 node.shutdown().await;
140 drop(_drop);
141 std::process::exit(0);
142 }
143 Ok(())
144 })
145}
146
147async fn handle_host_requests(mut recv: mesh::Receiver<HostRequest>) {
148 while let Some(req) = recv.next().await {
149 match req {
150 HostRequest::Inspect(deferred) => {
151 deferred.respond(inspect_host);
152 }
153 HostRequest::Crash => panic!("explicit panic request"),
154 }
155 }
156}
157
158fn set_program_name(name: &str) {
159 let _ = name;
160 #[cfg(target_os = "linux")]
161 {
162 let _ = std::fs::write("/proc/self/comm", name);
163 }
164}
165
166struct NodeResult {
167 node_name: String,
168 node: IpcNode,
169 initial_port: Port,
170}
171
172async fn node_from_environment() -> anyhow::Result<Option<NodeResult>> {
176 let invitation_str = match std::env::var(INVITATION_ENV_NAME) {
178 Ok(str) => str,
179 Err(_) => return Ok(None),
180 };
181
182 unsafe {
194 std::env::remove_var(INVITATION_ENV_NAME);
195 }
196
197 let invitation: Invitation = mesh::payload::decode(
198 &base64::engine::general_purpose::STANDARD
199 .decode(invitation_str)
200 .context("failed to base64 decode invitation")?,
201 )
202 .context("failed to protobuf decode invitation")?;
203
204 let (left, right) = Port::new_pair();
205
206 let node;
207 #[cfg(windows)]
208 {
209 let directory =
213 unsafe { OwnedHandle::from_raw_handle(invitation.directory_handle as RawHandle) };
214
215 let invitation =
216 mesh_remote::windows::AlpcInvitation::new(invitation.credentials, directory);
217
218 node = mesh_remote::windows::AlpcNode::join(
220 pal_async::windows::TpPool::system(),
221 invitation,
222 left,
223 )
224 .context("failed to join mesh")?;
225 }
226
227 #[cfg(unix)]
228 {
229 let fd = unsafe { OwnedFd::from_raw_fd(invitation.socket_fd) };
233 let invitation = mesh_remote::unix::UnixInvitation {
234 address: invitation.address,
235 fd,
236 };
237
238 let (_, driver) = DefaultPool::spawn_on_thread("mesh-worker-pool");
240 node = mesh_remote::unix::UnixNode::join(driver, invitation, left)
241 .await
242 .context("failed to join mesh")?;
243 }
244
245 Ok(Some(NodeResult {
246 node_name: invitation.node_name,
247 node,
248 initial_port: right,
249 }))
250}
251
252#[derive(Inspect)]
273pub struct Mesh {
274 #[inspect(rename = "name")]
275 mesh_name: String,
276 #[inspect(flatten, send = "MeshRequest::Inspect")]
277 request: mesh::Sender<MeshRequest>,
278 #[inspect(skip)]
279 task: Task<()>,
280}
281
282pub trait SandboxProfile: Send {
284 fn apply(&mut self, builder: &mut ProcessBuilder<'_>);
288
289 fn finalize(&mut self) -> anyhow::Result<()> {
295 Ok(())
296 }
297}
298
299pub struct ProcessConfig {
301 name: String,
302 process_name: Option<PathBuf>,
303 process_args: Vec<OsString>,
304 stderr: Option<File>,
305 skip_worker_arg: bool,
306 sandbox_profile: Option<Box<dyn SandboxProfile + Sync>>,
307 env_vars: Vec<(OsString, OsString)>,
308}
309
310impl ProcessConfig {
311 pub fn new(name: impl Into<String>) -> Self {
314 Self {
315 name: name.into(),
316 process_name: None,
317 process_args: Vec::new(),
318 stderr: None,
319 skip_worker_arg: false,
320 sandbox_profile: None,
321 env_vars: Vec::new(),
322 }
323 }
324
325 pub fn new_with_sandbox(
328 name: impl Into<String>,
329 sandbox_profile: Box<dyn SandboxProfile + Sync>,
330 ) -> Self {
331 Self {
332 name: name.into(),
333 process_name: None,
334 process_args: Vec::new(),
335 stderr: None,
336 skip_worker_arg: false,
337 sandbox_profile: Some(sandbox_profile),
338 env_vars: Vec::new(),
339 }
340 }
341
342 pub fn process_name(mut self, name: impl Into<PathBuf>) -> Self {
344 self.process_name = Some(name.into());
345 self
346 }
347
348 pub fn skip_worker_arg(mut self, skip: bool) -> Self {
355 self.skip_worker_arg = skip;
356 self
357 }
358
359 pub fn args<I>(mut self, args: I) -> Self
361 where
362 I: IntoIterator,
363 I::Item: Into<OsString>,
364 {
365 self.process_args.extend(args.into_iter().map(|x| x.into()));
366 self
367 }
368
369 pub fn env<I>(mut self, env_vars: I) -> Self
371 where
372 I: IntoIterator,
373 I::Item: Into<(OsString, OsString)>,
374 {
375 self.env_vars.extend(env_vars.into_iter().map(|x| x.into()));
376 self
377 }
378
379 pub fn stderr(mut self, file: Option<File>) -> Self {
381 self.stderr = file;
382 self
383 }
384}
385
386struct MeshInner {
387 requests: mesh::Receiver<MeshRequest>,
388 hosts: Slab<MeshHostInner>,
389 waiters: FuturesUnordered<Task<usize>>,
391 node: IpcNode,
393 node_driver: IpcNodeDriver,
397 mesh_name: String,
399 #[cfg(windows)]
402 job: pal::windows::job::Job,
403}
404
405struct MeshHostInner {
406 name: String,
407 pid: i32,
408 node_id: mesh::NodeId,
409 send: mesh::Sender<HostRequest>,
410}
411
412enum MeshRequest {
413 NewHost(FailableRpc<NewHostParams, i32>),
414 Listen(FailableRpc<ListenParams, Task<()>>),
415 Inspect(inspect::Deferred),
416 Crash(i32),
417}
418
419struct ListenParams {
420 path: PathBuf,
421 port_factory: Box<dyn Fn() -> Port + Send>,
422}
423
424struct NewHostParams {
425 config: ProcessConfig,
426 recv: Port,
427 request_send: mesh::Sender<HostRequest>,
428}
429
430impl Mesh {
431 pub fn new(mesh_name: String) -> anyhow::Result<Self> {
433 #[cfg(windows)]
434 let job = {
435 let job = pal::windows::job::Job::new().context("failed to create job object")?;
436 job.set_terminate_on_close()
437 .context("failed to set job object terminate on close")?;
438 job
439 };
440
441 #[cfg(windows)]
442 let (node, node_driver) = {
443 let driver = pal_async::windows::TpPool::system();
444 let node = mesh_remote::windows::AlpcNode::new_named(driver.clone())
448 .context("AlpcNode creation failure")?;
449 (node, driver)
450 };
451 #[cfg(unix)]
452 let (node, node_driver) = {
453 let (_, driver) = DefaultPool::spawn_on_thread("mesh-worker-pool");
455 let node = mesh_remote::unix::UnixNode::new(driver.clone());
456 (node, driver)
457 };
458
459 let (request, requests) = mesh::channel();
460
461 let mut inner = MeshInner {
462 requests,
463 hosts: Default::default(),
464 waiters: Default::default(),
465 node,
466 node_driver: node_driver.clone(),
467 mesh_name: mesh_name.clone(),
468 #[cfg(windows)]
469 job,
470 };
471
472 let task = node_driver.spawn(
473 format!("mesh-{}", &mesh_name),
474 async move { inner.run().await },
475 );
476
477 Ok(Self {
478 request,
479 mesh_name,
480 task,
481 })
482 }
483
484 pub async fn launch_host<T: 'static + MeshField + Send>(
492 &self,
493 config: ProcessConfig,
494 initial_message: T,
495 ) -> anyhow::Result<i32> {
496 let (request_send, request_recv) = mesh::channel();
497
498 let (init_send, init_recv) = mesh::oneshot::<InitialMessage<T>>();
499 init_send.send(InitialMessage {
500 requests: request_recv,
501 init_message: initial_message,
502 });
503
504 self.request
505 .call_failable(
506 MeshRequest::NewHost,
507 NewHostParams {
508 config,
509 recv: init_recv.into(),
510 request_send,
511 },
512 )
513 .await
514 .context("failed to launch new host")
515 }
516
517 pub async fn shutdown(self) {
521 let span = tracing::span!(
522 tracing::Level::INFO,
523 "mesh_shutdown",
524 name = self.mesh_name.as_str(),
525 );
526
527 async {
528 drop(self.request);
529 self.task.await;
530 }
531 .instrument(span)
532 .await;
533 }
534
535 pub fn crash(&self, pid: i32) {
537 self.request.send(MeshRequest::Crash(pid));
538 }
539
540 pub async fn listen<T: 'static + MeshField + Send>(
555 &self,
556 path: &Path,
557 ) -> anyhow::Result<Listener<T>> {
558 let (send, recv) = mesh::channel::<T>();
559 let port_factory: Box<dyn Fn() -> Port + Send> = Box::new(move || send.clone().into());
560
561 let task = self
562 .request
563 .call_failable(
564 MeshRequest::Listen,
565 ListenParams {
566 path: path.to_owned(),
567 port_factory,
568 },
569 )
570 .await
571 .context("listen failed")?;
572
573 Ok(Listener { recv, _task: task })
574 }
575}
576
577pub struct Listener<T> {
585 recv: mesh::Receiver<T>,
586 _task: Task<()>,
587}
588
589impl<T: 'static + MeshField + Send> Stream for Listener<T> {
590 type Item = T;
591
592 fn poll_next(
593 mut self: Pin<&mut Self>,
594 cx: &mut std::task::Context<'_>,
595 ) -> std::task::Poll<Option<Self::Item>> {
596 self.recv.poll_next_unpin(cx)
597 }
598}
599
600pub async fn connect<T: 'static + MeshField + Send>(
606 driver: impl pal_async::driver::Driver + Spawn + Clone,
607 path: &Path,
608) -> anyhow::Result<(mesh::Sender<T>, Connection)> {
609 let (send, recv) = mesh::channel::<T>();
610
611 #[cfg(windows)]
612 let node = mesh_remote::windows::AlpcNode::join_by_socket(driver, path, recv.into())
613 .await
614 .context("failed to connect to mesh listener")?;
615
616 #[cfg(unix)]
617 let node = mesh_remote::unix::UnixNode::join_by_path(driver, path, recv.into())
618 .await
619 .context("failed to connect to mesh listener")?;
620
621 Ok((send, Connection { node }))
622}
623
624pub struct Connection {
628 node: IpcNode,
629}
630
631impl Connection {
632 pub async fn shutdown(self) {
635 self.node.shutdown().await;
636 }
637}
638
639#[derive(MeshPayload)]
640struct InitialMessage<T> {
641 requests: mesh::Receiver<HostRequest>,
642 init_message: T,
643}
644
645#[derive(Debug, MeshPayload)]
646enum HostRequest {
647 #[mesh(transparent)]
648 Inspect(inspect::Deferred),
649 Crash,
650}
651
652fn inspect_host(resp: &mut inspect::Response<'_>) {
653 resp.field("tasks", inspect_task::inspect_task_list());
654}
655
656#[derive(Inspect)]
657struct HostInspect<'a> {
658 #[inspect(safe)]
659 name: &'a str,
660 #[inspect(debug, safe)]
661 node_id: mesh::NodeId,
662 #[cfg(target_os = "linux")]
663 #[inspect(safe)]
664 rlimit: inspect_rlimit::InspectRlimit,
665}
666
667impl MeshInner {
668 async fn run(&mut self) {
669 enum Event {
670 Request(MeshRequest),
671 Done(usize),
672 }
673
674 loop {
675 let event = futures::select! { request = self.requests.select_next_some() => Event::Request(request),
677 n = self.waiters.select_next_some() => Event::Done(n),
678 complete => break,
679 };
680
681 match event {
682 Event::Request(request) => match request {
683 MeshRequest::NewHost(rpc) => {
684 rpc.handle_failable(async |params| self.spawn_process(params).await)
685 .await
686 }
687 MeshRequest::Listen(rpc) => {
688 rpc.handle_failable(async |params| self.start_listener(params))
689 .await
690 }
691 MeshRequest::Inspect(deferred) => {
692 deferred.respond(|resp| {
693 resp.sensitivity_child("hosts", SensitivityLevel::Safe, |req| {
694 let mut resp = req.respond();
695 for host in self.hosts.iter().map(|(_, host)| host) {
696 resp.sensitivity_field_mut(
697 &host.pid.to_string(),
698 SensitivityLevel::Safe,
699 &mut inspect::adhoc(|req| {
700 req.respond()
701 .merge(&HostInspect {
702 name: &host.name,
703 node_id: host.node_id,
704 #[cfg(target_os = "linux")]
705 rlimit: inspect_rlimit::InspectRlimit::for_pid(
706 host.pid,
707 ),
708 })
709 .merge(inspect::send(
710 &host.send,
711 HostRequest::Inspect,
712 ));
713 }),
714 );
715 }
716 })
717 .sensitivity_field_mut(
718 &format!("hosts/{}", std::process::id()),
719 SensitivityLevel::Safe,
720 &mut inspect::adhoc(|req| {
721 let mut resp = req.respond();
722 resp.merge(&HostInspect {
723 name: &self.mesh_name,
724 node_id: self.node.id(),
725 #[cfg(target_os = "linux")]
726 rlimit: inspect_rlimit::InspectRlimit::new(),
727 });
728 inspect_host(&mut resp);
729 }),
730 );
731 });
732 }
733 MeshRequest::Crash(pid) => {
734 if pid == std::process::id() as i32 {
735 panic!("explicit panic request");
736 }
737
738 let mut found = false;
739 for (_, host) in &self.hosts {
740 if host.pid == pid {
741 host.send.send(HostRequest::Crash);
742 found = true;
743 break;
744 }
745 }
746
747 if !found {
748 tracing::error!("failed to crash process, pid {pid} not found");
749 }
750 }
751 },
752 Event::Done(id) => {
753 self.hosts.remove(id);
754 }
755 }
756 }
757 }
758
759 fn start_listener(&self, params: ListenParams) -> anyhow::Result<Task<()>> {
760 if let Ok(true) = Self::is_socket(¶ms.path) {
762 let _ = std::fs::remove_file(¶ms.path);
763 }
764
765 let mut listener = self
766 .node
767 .listen(&self.node_driver, ¶ms.path)
768 .context("failed to bind mesh listener")?;
769
770 let driver = self.node_driver.clone();
771 let port_factory = params.port_factory;
772
773 let task = self.node_driver.spawn("mesh-listener", async move {
774 loop {
775 match listener.accept(&driver).await {
776 Ok(pending) => {
777 let port = port_factory();
778 driver
779 .spawn("mesh-listener-handshake", async move {
780 if let Err(e) = pending.finish(port).await {
781 tracing::warn!(
782 error = &e as &dyn std::error::Error,
783 "mesh listener handshake failed",
784 );
785 }
786 })
787 .detach();
788 }
789 Err(e) => {
790 tracing::error!(
791 error = &e as &dyn std::error::Error,
792 "mesh listener accept failed",
793 );
794 break;
795 }
796 }
797 }
798 });
799
800 Ok(task)
801 }
802
803 #[cfg(unix)]
805 fn is_socket(path: &Path) -> std::io::Result<bool> {
806 use std::os::unix::fs::FileTypeExt;
807 Ok(std::fs::symlink_metadata(path)?.file_type().is_socket())
808 }
809
810 #[cfg(windows)]
812 fn is_socket(path: &Path) -> std::io::Result<bool> {
813 pal::windows::fs::is_unix_socket(path)
814 }
815
816 #[instrument(name = "mesh_spawn_process", skip(self, params), fields(mesh_name = self.mesh_name.as_str(), pid = tracing::field::Empty))]
818 async fn spawn_process(&mut self, params: NewHostParams) -> anyhow::Result<i32> {
819 let NewHostParams {
820 config,
821 recv,
822 request_send,
823 } = params;
824
825 let pid;
826 let node_id;
827
828 let (arg0, process_name) = if let Some(n) = &config.process_name {
832 (None, Cow::Borrowed(n))
833 } else {
834 (
835 std::env::args_os().next(),
836 Cow::Owned(std::env::current_exe().context("failed to get current exe path")?),
837 )
838 };
839
840 let name = config.name.clone();
841
842 #[cfg(windows)]
843 let child = {
844 let (invitation, handle) = self.node.invite(recv).context("mesh node invite error")?;
845 node_id = invitation.node_id();
846 let (credentials, directory) = invitation.into_parts();
847
848 let invitation_env = base64::engine::general_purpose::STANDARD.encode(
849 mesh::payload::encode(Invitation {
850 node_name: name.clone(),
851 credentials,
852 directory_handle: directory.as_raw_handle() as usize,
853 }),
854 );
855
856 let mut args = config.process_args;
857 if !config.skip_worker_arg {
858 args.push(name.clone().into());
859 }
860
861 let mut builder = process::Builder::from_args(
862 arg0.as_ref()
863 .map_or_else(|| process_name.as_os_str(), |x| x.as_os_str()),
864 &args,
865 );
866 if arg0.is_some() {
867 builder.application_name(process_name.as_path());
868 }
869 builder
870 .stdin(process::Stdio::Null)
871 .stdout(process::Stdio::Null)
872 .handle(&directory)
873 .env(INVITATION_ENV_NAME, invitation_env)
874 .extend_env(config.env_vars)
875 .job(self.job.as_handle());
876
877 if let Some(log_file) = config.stderr.as_ref() {
878 builder.stderr(process::Stdio::Handle(log_file.as_handle()));
879 }
880
881 if let Some(mut sandbox_profile) = config.sandbox_profile {
882 sandbox_profile.apply(&mut builder);
883 }
884
885 let child = std::thread::scope(|s| s.spawn(|| builder.spawn()).join().unwrap())
888 .context("failed to launch mesh process")?;
889 handle.await;
891 pid = child.id() as i32;
892 tracing::Span::current().record("pid", pid);
893
894 pal_async::windows::PolledProcess::new(&self.node_driver, child)
895 .expect("failed to create process wait")
896 };
897 #[cfg(unix)]
898 let child = {
899 use pal::unix::process;
900
901 let invitation = self
902 .node
903 .invite(recv)
904 .await
905 .context("mesh node invite error")?;
906
907 node_id = invitation.address.local_addr.node;
908
909 let invitation_env = base64::engine::general_purpose::STANDARD.encode(
910 mesh::payload::encode(Invitation {
911 node_name: name.clone(),
912 address: invitation.address,
913 socket_fd: IPC_FD,
914 }),
915 );
916
917 let mut command = process::Builder::new(process_name.into_owned());
918 if let Some(arg0) = arg0 {
919 command.arg0(arg0);
920 }
921 command
922 .args(&config.process_args)
923 .stdin(process::Stdio::Null)
924 .stdout(process::Stdio::Null)
925 .dup_fd(invitation.fd.as_fd(), IPC_FD)
926 .env(INVITATION_ENV_NAME, invitation_env);
927
928 for (key, value) in &config.env_vars {
929 command.env(key, value);
930 }
931
932 if !config.skip_worker_arg {
933 command.arg(&name);
934 }
935
936 if let Some(log_file) = config.stderr.as_ref() {
937 command.stderr(process::Stdio::Fd(log_file.as_fd()));
938 }
939
940 if let Some(mut sandbox_profile) = config.sandbox_profile {
941 sandbox_profile.apply(&mut command);
942 }
943
944 let child = std::thread::scope(|s| s.spawn(|| command.spawn()).join().unwrap())
947 .context("failed to launch mesh process")?;
948 pid = child.id();
949 tracing::Span::current().record("pid", pid);
950
951 pal_async::process::PolledChild::<process::Child>::new(&self.node_driver, child)
952 .expect("failed to create process wait")
953 };
954
955 let id = self.hosts.insert(MeshHostInner {
956 name: config.name,
957 pid,
958 node_id,
959 send: request_send,
960 });
961
962 let task = self
963 .node_driver
964 .spawn(format!("wait-mesh-child-{}", pid), async move {
965 wait_mesh_child(child, &name, pid).await;
966 id
967 });
968
969 self.waiters.push(task);
970 Ok(pid)
971 }
972}
973
974#[cfg(windows)]
975async fn wait_mesh_child(mut child: pal_async::windows::PolledProcess, name: &str, pid: i32) {
976 match child.wait().await {
977 Ok(0) => {
978 tracing::info!(pid, name, "mesh child exited successfully");
979 }
980 Ok(code) => {
981 tracing::error!(pid, name, code, "mesh child abnormal exit");
982 }
983 Err(e) => {
984 tracing::error!(
985 pid,
986 name,
987 error = &e as &dyn std::error::Error,
988 "mesh child wait failed"
989 );
990 }
991 }
992}
993
994#[cfg(unix)]
995async fn wait_mesh_child(
996 mut child: pal_async::process::PolledChild<pal::unix::process::Child>,
997 name: &str,
998 pid: i32,
999) {
1000 match child.wait().await {
1001 Ok(status) if status.code() == Some(0) => {
1002 tracing::info!(pid, name, "mesh child exited successfully");
1003 }
1004 Ok(exit_status) => {
1005 tracing::error!(pid, name, %exit_status, "mesh child abnormal exit");
1006 }
1007 Err(e) => {
1008 tracing::error!(
1009 pid,
1010 name,
1011 error = &e as &dyn std::error::Error,
1012 "mesh child wait failed"
1013 );
1014 }
1015 }
1016}
1017
1018#[cfg(test)]
1019mod tests {
1020 use super::*;
1021 use pal_async::DefaultDriver;
1022 use pal_async::async_test;
1023 use pal_async::task::Spawn;
1024 use test_with_tracing::test;
1025
1026 #[async_test]
1027 async fn test_listen(driver: DefaultDriver) {
1028 let dir = tempfile::tempdir().unwrap();
1029 let sock_path = dir.path().join("mesh-listen.sock");
1030
1031 let mesh = Mesh::new("test-listen".to_string()).unwrap();
1032 let mut listener = mesh.listen::<String>(&sock_path).await.unwrap();
1033
1034 let client_driver = driver.clone();
1036 let client_path = sock_path.clone();
1037 let client_task = driver.spawn("client", async move {
1038 let (sender, conn) = connect::<String>(client_driver, &client_path)
1039 .await
1040 .unwrap();
1041 sender.send("hello from client".to_string());
1042 conn
1047 });
1048
1049 let msg = listener.next().await.unwrap();
1051 assert_eq!(msg, "hello from client");
1052
1053 client_task.await.shutdown().await;
1054
1055 drop(listener);
1057
1058 mesh.shutdown().await;
1059 }
1060}