Skip to main content

mesh_process/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Infrastructure to create a multi-process mesh and spawn child processes
5//! within it.
6//!
7//! Call [`Mesh::new()`] to create a process group. Workers launched on the mesh
8//! can run in child processes connected by the platform IPC transport
9//! (`mesh_remote`): Unix domain sockets on Linux, ALPC on Windows.
10//!
11//! The child process receives an invitation via an environment variable and
12//! calls [`try_run_mesh_host()`] early in `main()` to join the mesh. Once
13//! joined, ports (and the resources they carry) flow transparently between
14//! parent and child.
15//!
16//! This crate is used by OpenVMM to launch worker processes and by OpenHCL to
17//! run device emulators in isolated child processes.
18
19// UNSAFETY: Needed to accept a raw Fd/Handle from our spawning process.
20#![expect(unsafe_code)]
21
22use anyhow::Context;
23use base64::Engine;
24use debug_ptr::DebugPtr;
25use futures::FutureExt;
26use futures::Stream;
27use futures::StreamExt;
28use futures::executor::block_on;
29use futures_concurrency::future::Race;
30use inspect::Inspect;
31use inspect::SensitivityLevel;
32use mesh::MeshPayload;
33use mesh::OneshotReceiver;
34use mesh::local_node::Port;
35use mesh::message::MeshField;
36use mesh::payload::Protobuf;
37use mesh::rpc::FailableRpc;
38use mesh::rpc::RpcSend;
39#[cfg(unix)]
40use mesh_remote::InvitationAddress;
41#[cfg(unix)]
42use pal::unix::process::Builder as ProcessBuilder;
43#[cfg(windows)]
44use pal::windows::process;
45#[cfg(windows)]
46use pal::windows::process::Builder as ProcessBuilder;
47#[cfg(unix)]
48use pal_async::DefaultPool;
49use pal_async::task::Spawn;
50use pal_async::task::Task;
51use slab::Slab;
52use std::borrow::Cow;
53use std::ffi::OsString;
54use std::fs::File;
55#[cfg(unix)]
56use std::os::unix::prelude::*;
57#[cfg(windows)]
58use std::os::windows::prelude::*;
59use std::path::Path;
60use std::path::PathBuf;
61use std::pin::Pin;
62use tracing::Instrument;
63use tracing::instrument;
64use unicycle::FuturesUnordered;
65
/// Platform-specific IPC type aliases (Windows).
#[cfg(windows)]
mod plat {
    /// Mesh IPC node type used on Windows (ALPC-based).
    pub type IpcNode = mesh_remote::windows::AlpcNode;
    /// Driver type used to run the IPC node on Windows.
    pub type IpcNodeDriver = pal_async::windows::TpPool;
}
71
/// Platform-specific IPC type aliases (Unix).
#[cfg(unix)]
mod plat {
    /// Mesh IPC node type used on Unix.
    pub type IpcNode = mesh_remote::unix::UnixNode;
    /// Driver type used to run the IPC node on Unix.
    pub type IpcNodeDriver = pal_async::DefaultDriver;
}
77
78use plat::IpcNode;
79use plat::IpcNodeDriver;
80
/// File descriptor number recorded as `socket_fd` in the invitation that is
/// sent to a child process on Unix.
// NOTE(review): presumably the spawn path maps the IPC socket to this fd in
// the child process — confirm against the Unix half of `spawn_process`.
#[cfg(unix)]
const IPC_FD: i32 = 3;
83
/// The environment variable used to pass the mesh IPC invitation information
/// to a child process. The invitation is passed through the environment
/// instead of a command line argument so that other processes cannot steal the
/// invitation details and use them to break into the mesh.
const INVITATION_ENV_NAME: &str = "MESH_WORKER_INVITATION";
89
/// Invitation payload that lets a child process join the mesh.
///
/// The parent protobuf-encodes this structure and then base64-encodes the
/// result; the child reads it back from the [`INVITATION_ENV_NAME`]
/// environment variable.
#[derive(Protobuf)]
struct Invitation {
    /// Name of the node being invited.
    node_name: String,
    /// ALPC invitation credentials (Windows only).
    #[cfg(windows)]
    credentials: mesh_remote::windows::AlpcInvitationCredentials,
    /// Address information for the invitation (Unix only).
    #[cfg(unix)]
    address: InvitationAddress,
    /// Raw value of the invitation's directory handle (Windows only). The
    /// child takes ownership of this handle.
    #[cfg(windows)]
    directory_handle: usize,
    /// Raw file descriptor number of the IPC socket (Unix only). The child
    /// takes ownership of this descriptor.
    #[cfg(unix)]
    socket_fd: i32,
}
102
/// Name of this mesh node, stored by [`try_run_mesh_host`] once an invitation
/// has been accepted.
// NOTE(review): `DebugPtr` presumably exists so that a debugger can locate the
// name in a dump — confirm against the `debug_ptr` crate.
static PROCESS_NAME: DebugPtr<String> = DebugPtr::new();
104
105/// Runs a mesh host in the current thread, then exits the process, if this
106/// process was launched by [`Mesh::launch_host`].
107///
108/// The mesh invitation is provided via environment variables. If a mesh
109/// invitation is not available this function will return immediately with `Ok`.
110/// If a mesh invitation is available, this function joins the mesh and runs the
111/// future returned by `f` until `f` returns or the parent process shuts down
112/// the mesh.
113pub fn try_run_mesh_host<U, F, T>(base_name: &str, f: F) -> anyhow::Result<()>
114where
115    U: 'static + MeshPayload + Send,
116    F: AsyncFnOnce(U) -> anyhow::Result<T>,
117{
118    block_on(async {
119        if let Some(r) = node_from_environment().await? {
120            let NodeResult {
121                node_name,
122                node,
123                initial_port,
124            } = r;
125            PROCESS_NAME.store(&node_name);
126            set_program_name(&format!("{base_name}-{node_name}"));
127            let init = OneshotReceiver::<InitialMessage<U>>::from(initial_port)
128                .await
129                .context("failed to receive initial message")?;
130            let _drop = (
131                f(init.init_message).map(Some),
132                handle_host_requests(init.requests).map(|()| None),
133            )
134                .race()
135                .await
136                .transpose()?;
137
138            tracing::debug!("waiting to shut down node");
139            node.shutdown().await;
140            drop(_drop);
141            std::process::exit(0);
142        }
143        Ok(())
144    })
145}
146
147async fn handle_host_requests(mut recv: mesh::Receiver<HostRequest>) {
148    while let Some(req) = recv.next().await {
149        match req {
150            HostRequest::Inspect(deferred) => {
151                deferred.respond(inspect_host);
152            }
153            HostRequest::Crash => panic!("explicit panic request"),
154        }
155    }
156}
157
/// Best-effort attempt to set the name of the running program.
///
/// On Linux this writes `name` to `/proc/self/comm`, ignoring any error. On
/// every other platform it does nothing.
fn set_program_name(name: &str) {
    #[cfg(target_os = "linux")]
    let _ = std::fs::write("/proc/self/comm", name);

    // Keep `name` "used" on platforms where there is nothing to do.
    #[cfg(not(target_os = "linux"))]
    let _ = name;
}
165
/// Result of successfully accepting a mesh invitation from the environment.
struct NodeResult {
    /// Node name carried by the invitation.
    node_name: String,
    /// The IPC node that joined the mesh.
    node: IpcNode,
    /// Local end of the port pair whose other end was given to the node when
    /// joining; the initial message from the parent is received on it.
    initial_port: Port,
}
171
172/// Create an IPC node from an invitation provided via the process environment.
173///
174/// Returns `None` if the invitation is not present in the environment.
175async fn node_from_environment() -> anyhow::Result<Option<NodeResult>> {
176    // return early with no node if the invitation is not present in the environment.
177    let invitation_str = match std::env::var(INVITATION_ENV_NAME) {
178        Ok(str) => str,
179        Err(_) => return Ok(None),
180    };
181
182    // Clear the string to avoid leaking the invitation information into child
183    // processes.
184    //
185    // TODO: this function is unsafe because
186    // it can cause UB if non-Rust code is concurrently accessing the
187    // environment in another thread. To be completely sound,
188    // either this function and its callers need to become
189    // `unsafe`, or we need to avoid using the environment to propagate the
190    // invitation so that we can avoid this call.
191    //
192    // SAFETY: Seems to work so far.
193    unsafe {
194        std::env::remove_var(INVITATION_ENV_NAME);
195    }
196
197    let invitation: Invitation = mesh::payload::decode(
198        &base64::engine::general_purpose::STANDARD
199            .decode(invitation_str)
200            .context("failed to base64 decode invitation")?,
201    )
202    .context("failed to protobuf decode invitation")?;
203
204    let (left, right) = Port::new_pair();
205
206    let node;
207    #[cfg(windows)]
208    {
209        // SAFETY: trusting the initiating process to pass a valid handle. A
210        // malicious process could pass a bad handle here, but a malicious
211        // process could also just corrupt our memory arbitrarily, so...
212        let directory =
213            unsafe { OwnedHandle::from_raw_handle(invitation.directory_handle as RawHandle) };
214
215        let invitation =
216            mesh_remote::windows::AlpcInvitation::new(invitation.credentials, directory);
217
218        // join the node w/ the provided invitation and the send port of the channel.
219        node = mesh_remote::windows::AlpcNode::join(
220            pal_async::windows::TpPool::system(),
221            invitation,
222            left,
223        )
224        .context("failed to join mesh")?;
225    }
226
227    #[cfg(unix)]
228    {
229        // SAFETY: trusting the initiating process to pass a valid fd. A
230        // malicious process could pass a bad fd here, but a malicious
231        // process could also just corrupt our memory arbitrarily, so...
232        let fd = unsafe { OwnedFd::from_raw_fd(invitation.socket_fd) };
233        let invitation = mesh_remote::unix::UnixInvitation {
234            address: invitation.address,
235            fd,
236        };
237
238        // FUTURE: use pool provided by the caller.
239        let (_, driver) = DefaultPool::spawn_on_thread("mesh-worker-pool");
240        node = mesh_remote::unix::UnixNode::join(driver, invitation, left)
241            .await
242            .context("failed to join mesh")?;
243    }
244
245    Ok(Some(NodeResult {
246        node_name: invitation.node_name,
247        node,
248        initial_port: right,
249    }))
250}
251
/// Represents a mesh::Node with the ability to spawn new processes that can
/// communicate with any other process belonging to the same mesh.
///
/// # Process creation
/// A `Mesh` instance can spawn new processes with an initial communication
/// channel associated with the mesh. All processes originating from the same
/// mesh can potentially communicate and exchange channels with each other.
///
/// Each spawned process can be configured differently via [`ProcessConfig`].
/// Processes are created with [`Mesh::launch_host`].
///
/// ```no_run
/// # use mesh_process::{Mesh, ProcessConfig};
/// # futures::executor::block_on(async {
/// let mesh = Mesh::new("remote_mesh".to_string()).unwrap();
/// let (send, recv) = mesh::channel();
/// mesh.launch_host(ProcessConfig::new("test"), recv).await.unwrap();
/// send.send(String::from("message for new process"));
/// # })
/// ```
#[derive(Inspect)]
pub struct Mesh {
    /// Name of this mesh; exposed as `name` when inspected.
    #[inspect(rename = "name")]
    mesh_name: String,
    /// Channel used to send requests to the background task that runs the
    /// mesh. Inspect requests are forwarded over it as well.
    #[inspect(flatten, send = "MeshRequest::Inspect")]
    request: mesh::Sender<MeshRequest>,
    /// Background task that services requests; awaited by
    /// [`Mesh::shutdown`].
    #[inspect(skip)]
    task: Task<()>,
}
281
/// Sandbox profile trait used for mesh hosts.
///
/// A profile has two hooks: [`apply`](SandboxProfile::apply), which is given
/// the process builder before a host is spawned, and
/// [`finalize`](SandboxProfile::finalize), an optional step meant to be
/// invoked by the child process itself.
pub trait SandboxProfile: Send {
    /// Apply executes in the parent context and configures any sandbox
    /// features that will be applied to the newly created process via
    /// the pal builder object.
    fn apply(&mut self, builder: &mut ProcessBuilder<'_>);

    /// Finalize is intended to execute in the child process context after
    /// application-specific initialization is complete. It is optional, as
    /// not every sandbox profile will need to perform additional sandboxing.
    /// In addition, the child needs to be aware enough to instantiate its
    /// sandbox profile and invoke this method.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn finalize(&mut self) -> anyhow::Result<()> {
        Ok(())
    }
}
298
/// Configuration for launching a new process in the mesh.
pub struct ProcessConfig {
    /// Node name of the new process.
    name: String,
    /// Path of the executable to launch. When `None`, the current executable
    /// is used.
    process_name: Option<PathBuf>,
    /// Arguments to place on the process's command line.
    process_args: Vec<OsString>,
    /// File to use as the process's stderr, if any.
    stderr: Option<File>,
    /// When `true`, the node name is not appended to the command line.
    skip_worker_arg: bool,
    /// Optional sandbox profile that is applied to the process builder before
    /// the process is spawned.
    sandbox_profile: Option<Box<dyn SandboxProfile + Sync>>,
    /// Additional environment variables for the process.
    env_vars: Vec<(OsString, OsString)>,
}
309
310impl ProcessConfig {
311    /// Returns new process configuration using the current process as the
312    /// process name.
313    pub fn new(name: impl Into<String>) -> Self {
314        Self {
315            name: name.into(),
316            process_name: None,
317            process_args: Vec::new(),
318            stderr: None,
319            skip_worker_arg: false,
320            sandbox_profile: None,
321            env_vars: Vec::new(),
322        }
323    }
324
325    /// Returns a new process configuration using the current process as the
326    /// process name.
327    pub fn new_with_sandbox(
328        name: impl Into<String>,
329        sandbox_profile: Box<dyn SandboxProfile + Sync>,
330    ) -> Self {
331        Self {
332            name: name.into(),
333            process_name: None,
334            process_args: Vec::new(),
335            stderr: None,
336            skip_worker_arg: false,
337            sandbox_profile: Some(sandbox_profile),
338            env_vars: Vec::new(),
339        }
340    }
341
342    /// Sets the process name.
343    pub fn process_name(mut self, name: impl Into<PathBuf>) -> Self {
344        self.process_name = Some(name.into());
345        self
346    }
347
348    /// Specifies whether to  appending `<node name>` to the process's command
349    /// line.
350    ///
351    /// This is done by default to make it easier to identify the process in
352    /// task lists, but if your process parses the command line then this may
353    /// get in the way.
354    pub fn skip_worker_arg(mut self, skip: bool) -> Self {
355        self.skip_worker_arg = skip;
356        self
357    }
358
359    /// Adds arguments to the process command line.
360    pub fn args<I>(mut self, args: I) -> Self
361    where
362        I: IntoIterator,
363        I::Item: Into<OsString>,
364    {
365        self.process_args.extend(args.into_iter().map(|x| x.into()));
366        self
367    }
368
369    /// Adds environment variables when launching the process.
370    pub fn env<I>(mut self, env_vars: I) -> Self
371    where
372        I: IntoIterator,
373        I::Item: Into<(OsString, OsString)>,
374    {
375        self.env_vars.extend(env_vars.into_iter().map(|x| x.into()));
376        self
377    }
378
379    /// Sets the process's stderr to `file`.
380    pub fn stderr(mut self, file: Option<File>) -> Self {
381        self.stderr = file;
382        self
383    }
384}
385
/// State owned by the background task that runs a [`Mesh`].
struct MeshInner {
    /// Incoming requests from the owning [`Mesh`] handle.
    requests: mesh::Receiver<MeshRequest>,
    /// Hosts that are currently tracked. An entry is removed when the
    /// corresponding waiter task completes.
    hosts: Slab<MeshHostInner>,
    /// Handles for spawned host processes. Each task resolves to the key of
    /// the host's entry in `hosts`.
    waiters: FuturesUnordered<Task<usize>>,
    /// Mesh node for host process communication.
    node: IpcNode,
    /// IO driver for the mesh node, used for listener accept loops,
    /// handshakes, and general async task spawning. This is the same
    /// driver that was passed to the node on creation.
    node_driver: IpcNodeDriver,
    /// Name for this mesh instance, used for tracing/debugging.
    mesh_name: String,
    /// Job object. When closed, it will terminate all the child processes. This
    /// is used to ensure the child processes don't outlive the parent.
    #[cfg(windows)]
    job: pal::windows::job::Job,
}
404
/// Bookkeeping for a single host tracked by the mesh.
struct MeshHostInner {
    /// Name of the host, reported when inspecting.
    name: String,
    /// Process ID of the host; used to label the host when inspecting and to
    /// find it for crash requests.
    pid: i32,
    /// Mesh node ID of the host.
    node_id: mesh::NodeId,
    /// Channel for sending requests (inspect, crash) to the host.
    send: mesh::Sender<HostRequest>,
}
411
/// Requests handled by the mesh's background task.
enum MeshRequest {
    /// Spawn a new host process; on success the response is an `i32`, which
    /// [`Mesh::launch_host`] returns as the process ID.
    NewHost(FailableRpc<NewHostParams, i32>),
    /// Start a listener; on success the response is the task running the
    /// accept loop.
    Listen(FailableRpc<ListenParams, Task<()>>),
    /// Inspect the mesh and its hosts.
    Inspect(inspect::Deferred),
    /// Crash the process with the given process ID.
    Crash(i32),
}
418
/// Parameters for [`MeshRequest::Listen`].
struct ListenParams {
    /// Path of the socket to listen on.
    path: PathBuf,
    /// Produces a fresh port for each accepted connection; the port is handed
    /// to the connection's handshake.
    port_factory: Box<dyn Fn() -> Port + Send>,
}
423
/// Parameters for [`MeshRequest::NewHost`].
struct NewHostParams {
    /// Configuration of the process to launch.
    config: ProcessConfig,
    /// Port carrying the initial message for the new host; it is passed to
    /// the node when the host is invited.
    recv: Port,
    /// Sending half of the request channel whose receiving half is delivered
    /// to the host in the initial message.
    request_send: mesh::Sender<HostRequest>,
}
429
430impl Mesh {
431    /// Creates a new mesh with the given name.
432    pub fn new(mesh_name: String) -> anyhow::Result<Self> {
433        #[cfg(windows)]
434        let job = {
435            let job = pal::windows::job::Job::new().context("failed to create job object")?;
436            job.set_terminate_on_close()
437                .context("failed to set job object terminate on close")?;
438            job
439        };
440
441        #[cfg(windows)]
442        let (node, node_driver) = {
443            let driver = pal_async::windows::TpPool::system();
444            // Use new_named so that the ALPC directory has a path in the Ob
445            // namespace. This is required for listen() — the listener handshake
446            // creates named invitations, which need a named directory.
447            let node = mesh_remote::windows::AlpcNode::new_named(driver.clone())
448                .context("AlpcNode creation failure")?;
449            (node, driver)
450        };
451        #[cfg(unix)]
452        let (node, node_driver) = {
453            // FUTURE: use pool provided by the caller.
454            let (_, driver) = DefaultPool::spawn_on_thread("mesh-worker-pool");
455            let node = mesh_remote::unix::UnixNode::new(driver.clone());
456            (node, driver)
457        };
458
459        let (request, requests) = mesh::channel();
460
461        let mut inner = MeshInner {
462            requests,
463            hosts: Default::default(),
464            waiters: Default::default(),
465            node,
466            node_driver: node_driver.clone(),
467            mesh_name: mesh_name.clone(),
468            #[cfg(windows)]
469            job,
470        };
471
472        let task = node_driver.spawn(
473            format!("mesh-{}", &mesh_name),
474            async move { inner.run().await },
475        );
476
477        Ok(Self {
478            request,
479            mesh_name,
480            task,
481        })
482    }
483
484    /// Spawns a new host in the mesh with the provided configuration and
485    /// initial message.
486    ///
487    /// The initial message will be provided to the closure passed to
488    /// [`try_run_mesh_host()`].
489    ///
490    /// Returns the process ID of the launched host.
491    pub async fn launch_host<T: 'static + MeshField + Send>(
492        &self,
493        config: ProcessConfig,
494        initial_message: T,
495    ) -> anyhow::Result<i32> {
496        let (request_send, request_recv) = mesh::channel();
497
498        let (init_send, init_recv) = mesh::oneshot::<InitialMessage<T>>();
499        init_send.send(InitialMessage {
500            requests: request_recv,
501            init_message: initial_message,
502        });
503
504        self.request
505            .call_failable(
506                MeshRequest::NewHost,
507                NewHostParams {
508                    config,
509                    recv: init_recv.into(),
510                    request_send,
511                },
512            )
513            .await
514            .context("failed to launch new host")
515    }
516
517    /// Shutdown the mesh and wait for any spawned processes to exit.
518    ///
519    /// The `Mesh` instance is no longer usable after `shutdown`.
520    pub async fn shutdown(self) {
521        let span = tracing::span!(
522            tracing::Level::INFO,
523            "mesh_shutdown",
524            name = self.mesh_name.as_str(),
525        );
526
527        async {
528            drop(self.request);
529            self.task.await;
530        }
531        .instrument(span)
532        .await;
533    }
534
535    /// Crashes the child process with the given process ID.
536    pub fn crash(&self, pid: i32) {
537        self.request.send(MeshRequest::Crash(pid));
538    }
539
540    /// Listen for mesh connections on a Unix socket.
541    ///
542    /// Returns a [`Listener<T>`] that yields items from connecting clients.
543    /// Each client gets a `Sender<T>` bridged to this listener's queue.
544    /// Dropping the [`Listener`] stops accepting new connections.
545    ///
546    /// # Security
547    ///
548    /// The socket at `path` is an external entry point for other local
549    /// processes to join the mesh. On Unix, `path` must reside in a
550    /// directory accessible only to the intended user (e.g. mode `0700`).
551    /// On Windows, the socket's parent directory should be ACL'd to
552    /// restrict access. Any local user who can connect to the socket can
553    /// join the mesh.
554    pub async fn listen<T: 'static + MeshField + Send>(
555        &self,
556        path: &Path,
557    ) -> anyhow::Result<Listener<T>> {
558        let (send, recv) = mesh::channel::<T>();
559        let port_factory: Box<dyn Fn() -> Port + Send> = Box::new(move || send.clone().into());
560
561        let task = self
562            .request
563            .call_failable(
564                MeshRequest::Listen,
565                ListenParams {
566                    path: path.to_owned(),
567                    port_factory,
568                },
569            )
570            .await
571            .context("listen failed")?;
572
573        Ok(Listener { recv, _task: task })
574    }
575}
576
/// A listener for incoming mesh connections.
///
/// Each connecting client gets a `Sender<T>` bridged into this listener's
/// queue. When the client sends a `T`, it appears in the stream.
///
/// Implements [`Stream`]`<Item = T>`. Dropping the listener stops accepting
/// new connections.
pub struct Listener<T> {
    /// Receives the items sent by connected clients.
    recv: mesh::Receiver<T>,
    /// Task running the accept loop; held so that it lives as long as the
    /// listener.
    _task: Task<()>,
}
588
589impl<T: 'static + MeshField + Send> Stream for Listener<T> {
590    type Item = T;
591
592    fn poll_next(
593        mut self: Pin<&mut Self>,
594        cx: &mut std::task::Context<'_>,
595    ) -> std::task::Poll<Option<Self::Item>> {
596        self.recv.poll_next_unpin(cx)
597    }
598}
599
600/// Connect to a mesh listener at `path`.
601///
602/// Returns a [`mesh::Sender<T>`] for sending messages to the listener, and a
603/// [`Connection`] that keeps the underlying IPC node alive. Drop
604/// [`Connection`] to disconnect.
605pub async fn connect<T: 'static + MeshField + Send>(
606    driver: impl pal_async::driver::Driver + Spawn + Clone,
607    path: &Path,
608) -> anyhow::Result<(mesh::Sender<T>, Connection)> {
609    let (send, recv) = mesh::channel::<T>();
610
611    #[cfg(windows)]
612    let node = mesh_remote::windows::AlpcNode::join_by_socket(driver, path, recv.into())
613        .await
614        .context("failed to connect to mesh listener")?;
615
616    #[cfg(unix)]
617    let node = mesh_remote::unix::UnixNode::join_by_path(driver, path, recv.into())
618        .await
619        .context("failed to connect to mesh listener")?;
620
621    Ok((send, Connection { node }))
622}
623
/// An active connection to a mesh listener.
///
/// Keeps the underlying IPC node alive. Drop to disconnect.
pub struct Connection {
    /// The IPC node that joined the listener's mesh.
    node: IpcNode,
}
630
631impl Connection {
632    /// Gracefully shut down the connection, waiting for pending messages to be
633    /// delivered.
634    pub async fn shutdown(self) {
635        self.node.shutdown().await;
636    }
637}
638
/// First message delivered to a newly launched host.
#[derive(MeshPayload)]
struct InitialMessage<T> {
    /// Channel on which the host receives requests from the mesh.
    requests: mesh::Receiver<HostRequest>,
    /// Caller-supplied initial message, passed to the closure given to
    /// [`try_run_mesh_host`].
    init_message: T,
}
644
/// Requests sent by the mesh to a host process.
#[derive(Debug, MeshPayload)]
enum HostRequest {
    /// Inspect the host; the host responds using `inspect_host`.
    #[mesh(transparent)]
    Inspect(inspect::Deferred),
    /// Make the host process panic.
    Crash,
}
651
652fn inspect_host(resp: &mut inspect::Response<'_>) {
653    resp.field("tasks", inspect_task::inspect_task_list());
654}
655
/// Inspection view of a single host process, built on demand while
/// responding to an inspect request.
#[derive(Inspect)]
struct HostInspect<'a> {
    /// Name of the host.
    #[inspect(safe)]
    name: &'a str,
    /// Mesh node ID of the host, rendered with its `Debug` representation.
    #[inspect(debug, safe)]
    node_id: mesh::NodeId,
    /// Resource limit information for the host process (Linux only).
    #[cfg(target_os = "linux")]
    #[inspect(safe)]
    rlimit: inspect_rlimit::InspectRlimit,
}
666
667impl MeshInner {
    /// Main loop of the mesh's background task.
    ///
    /// Each iteration waits for either the next request from the [`Mesh`]
    /// handle or the completion of a waiter task, and then handles it. The
    /// loop ends when `select!` reports `complete`, i.e. when neither source
    /// can produce any further events.
    async fn run(&mut self) {
        // The two kinds of events the loop reacts to.
        enum Event {
            // A request arrived from the `Mesh` handle.
            Request(MeshRequest),
            // A waiter task finished; carries the key of the host in `hosts`.
            Done(usize),
        }

        loop {
            let event = futures::select! { // merge semantics
                request = self.requests.select_next_some() => Event::Request(request),
                n = self.waiters.select_next_some() => Event::Done(n),
                complete => break,
            };

            match event {
                Event::Request(request) => match request {
                    // Spawn a new host process and answer the RPC with the
                    // result.
                    MeshRequest::NewHost(rpc) => {
                        rpc.handle_failable(async |params| self.spawn_process(params).await)
                            .await
                    }
                    // Start a listener and answer the RPC with its task.
                    MeshRequest::Listen(rpc) => {
                        rpc.handle_failable(async |params| self.start_listener(params))
                            .await
                    }
                    MeshRequest::Inspect(deferred) => {
                        deferred.respond(|resp| {
                            // One child per tracked host under `hosts`, keyed
                            // by the host's process ID.
                            resp.sensitivity_child("hosts", SensitivityLevel::Safe, |req| {
                                let mut resp = req.respond();
                                for host in self.hosts.iter().map(|(_, host)| host) {
                                    resp.sensitivity_field_mut(
                                        &host.pid.to_string(),
                                        SensitivityLevel::Safe,
                                        &mut inspect::adhoc(|req| {
                                            // Merge the locally known data
                                            // with whatever the host itself
                                            // reports over its request channel.
                                            req.respond()
                                                .merge(&HostInspect {
                                                    name: &host.name,
                                                    node_id: host.node_id,
                                                    #[cfg(target_os = "linux")]
                                                    rlimit: inspect_rlimit::InspectRlimit::for_pid(
                                                        host.pid,
                                                    ),
                                                })
                                                .merge(inspect::send(
                                                    &host.send,
                                                    HostRequest::Inspect,
                                                ));
                                        }),
                                    );
                                }
                            })
                            // The current process is reported alongside the
                            // hosts, under its own process ID.
                            .sensitivity_field_mut(
                                &format!("hosts/{}", std::process::id()),
                                SensitivityLevel::Safe,
                                &mut inspect::adhoc(|req| {
                                    let mut resp = req.respond();
                                    resp.merge(&HostInspect {
                                        name: &self.mesh_name,
                                        node_id: self.node.id(),
                                        #[cfg(target_os = "linux")]
                                        rlimit: inspect_rlimit::InspectRlimit::new(),
                                    });
                                    inspect_host(&mut resp);
                                }),
                            );
                        });
                    }
                    MeshRequest::Crash(pid) => {
                        // A request naming the current process panics right
                        // here.
                        if pid == std::process::id() as i32 {
                            panic!("explicit panic request");
                        }

                        // Otherwise forward the request to the first host
                        // with a matching process ID.
                        let mut found = false;
                        for (_, host) in &self.hosts {
                            if host.pid == pid {
                                host.send.send(HostRequest::Crash);
                                found = true;
                                break;
                            }
                        }

                        if !found {
                            tracing::error!("failed to crash process, pid {pid} not found");
                        }
                    }
                },
                // A waiter finished: stop tracking the corresponding host.
                Event::Done(id) => {
                    self.hosts.remove(id);
                }
            }
        }
    }
758
759    fn start_listener(&self, params: ListenParams) -> anyhow::Result<Task<()>> {
760        // Remove a stale socket file if one exists, so that bind doesn't fail.
761        if let Ok(true) = Self::is_socket(&params.path) {
762            let _ = std::fs::remove_file(&params.path);
763        }
764
765        let mut listener = self
766            .node
767            .listen(&self.node_driver, &params.path)
768            .context("failed to bind mesh listener")?;
769
770        let driver = self.node_driver.clone();
771        let port_factory = params.port_factory;
772
773        let task = self.node_driver.spawn("mesh-listener", async move {
774            loop {
775                match listener.accept(&driver).await {
776                    Ok(pending) => {
777                        let port = port_factory();
778                        driver
779                            .spawn("mesh-listener-handshake", async move {
780                                if let Err(e) = pending.finish(port).await {
781                                    tracing::warn!(
782                                        error = &e as &dyn std::error::Error,
783                                        "mesh listener handshake failed",
784                                    );
785                                }
786                            })
787                            .detach();
788                    }
789                    Err(e) => {
790                        tracing::error!(
791                            error = &e as &dyn std::error::Error,
792                            "mesh listener accept failed",
793                        );
794                        break;
795                    }
796                }
797            }
798        });
799
800        Ok(task)
801    }
802
803    /// Checks whether `path` is a socket file.
804    #[cfg(unix)]
805    fn is_socket(path: &Path) -> std::io::Result<bool> {
806        use std::os::unix::fs::FileTypeExt;
807        Ok(std::fs::symlink_metadata(path)?.file_type().is_socket())
808    }
809
    /// Checks whether `path` is a socket file.
    ///
    /// Delegates to `pal::windows::fs::is_unix_socket` and returns its result
    /// unchanged.
    #[cfg(windows)]
    fn is_socket(path: &Path) -> std::io::Result<bool> {
        pal::windows::fs::is_unix_socket(path)
    }
815
816    /// Spawns a new process with a mesh channel associated with this `Mesh` instance.
    /// Launches a child process for a new mesh host and registers it with this
    /// mesh.
    ///
    /// The child is handed an invitation to the mesh through the
    /// `INVITATION_ENV_NAME` environment variable (base64 of the mesh-encoded
    /// `Invitation`). The IPC resource backing the invitation is passed to the
    /// child as an inherited handle (Windows) or as a file descriptor
    /// duplicated onto `IPC_FD` (Unix).
    ///
    /// On success the host is recorded in `self.hosts`, a task that waits for
    /// the child to exit is pushed onto `self.waiters`, and the child's process
    /// ID is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the current executable path cannot be determined
    /// (only when `config.process_name` is `None`), if the mesh node fails to
    /// create an invitation, or if the child process fails to launch.
    ///
    /// # Panics
    ///
    /// Panics if the helper thread used to launch the process panics, or if
    /// the process wait object cannot be created for the launched child.
    // The `pid` span field is declared empty here and filled in once the child
    // has been spawned (see `Span::current().record("pid", ..)` below).
    #[instrument(name = "mesh_spawn_process", skip(self, params), fields(mesh_name = self.mesh_name.as_str(), pid = tracing::field::Empty))]
    async fn spawn_process(&mut self, params: NewHostParams) -> anyhow::Result<i32> {
        let NewHostParams {
            config,
            recv,
            request_send,
        } = params;

        // Both are assigned exactly once inside whichever platform-specific
        // block below is compiled in.
        let pid;
        let node_id;

        // If no process name was passed, use the current executable path to
        // ensure we get the right file, but set arg0 to match how this process
        // was launched.
        let (arg0, process_name) = if let Some(n) = &config.process_name {
            (None, Cow::Borrowed(n))
        } else {
            (
                std::env::args_os().next(),
                Cow::Owned(std::env::current_exe().context("failed to get current exe path")?),
            )
        };

        // Cloned up front because `config.name` is later moved into the host
        // table entry while `name` is still needed for the invitation, the
        // worker argument, and the wait task.
        let name = config.name.clone();

        #[cfg(windows)]
        let child = {
            // `handle` is a future that is awaited after the spawn below to
            // wait for the child to connect.
            let (invitation, handle) = self.node.invite(recv).context("mesh node invite error")?;
            node_id = invitation.node_id();
            let (credentials, directory) = invitation.into_parts();

            // The raw value of the directory handle is embedded in the
            // invitation; the handle itself is passed to the child via
            // `.handle(&directory)` below.
            let invitation_env = base64::engine::general_purpose::STANDARD.encode(
                mesh::payload::encode(Invitation {
                    node_name: name.clone(),
                    credentials,
                    directory_handle: directory.as_raw_handle() as usize,
                }),
            );

            // The host name is appended as the final argument unless the
            // caller opted out.
            let mut args = config.process_args;
            if !config.skip_worker_arg {
                args.push(name.clone().into());
            }

            // When `arg0` is set (no explicit process name), it is used as the
            // first command-line argument and the real executable path is
            // supplied separately as the application name.
            let mut builder = process::Builder::from_args(
                arg0.as_ref()
                    .map_or_else(|| process_name.as_os_str(), |x| x.as_os_str()),
                &args,
            );
            if arg0.is_some() {
                builder.application_name(process_name.as_path());
            }
            builder
                .stdin(process::Stdio::Null)
                .stdout(process::Stdio::Null)
                .handle(&directory)
                .env(INVITATION_ENV_NAME, invitation_env)
                .extend_env(config.env_vars)
                .job(self.job.as_handle());

            // stderr is only redirected when the caller supplied a file.
            if let Some(log_file) = config.stderr.as_ref() {
                builder.stderr(process::Stdio::Handle(log_file.as_handle()));
            }

            if let Some(mut sandbox_profile) = config.sandbox_profile {
                sandbox_profile.apply(&mut builder);
            }

            // Launch the child process on a separate thread to isolate
            // the CreateProcess call from other IO pools.
            let child = std::thread::scope(|s| s.spawn(|| builder.spawn()).join().unwrap())
                .context("failed to launch mesh process")?;
            // Wait for the child to connect to the mesh. TODO: timeout
            handle.await;
            pid = child.id() as i32;
            tracing::Span::current().record("pid", pid);

            pal_async::windows::PolledProcess::new(&self.node_driver, child)
                .expect("failed to create process wait")
        };
        #[cfg(unix)]
        let child = {
            use pal::unix::process;

            let invitation = self
                .node
                .invite(recv)
                .await
                .context("mesh node invite error")?;

            node_id = invitation.address.local_addr.node;

            // The invitation tells the child which descriptor number to use;
            // `dup_fd` below places the invitation's fd at that same number
            // (`IPC_FD`) in the child.
            let invitation_env = base64::engine::general_purpose::STANDARD.encode(
                mesh::payload::encode(Invitation {
                    node_name: name.clone(),
                    address: invitation.address,
                    socket_fd: IPC_FD,
                }),
            );

            let mut command = process::Builder::new(process_name.into_owned());
            if let Some(arg0) = arg0 {
                command.arg0(arg0);
            }
            command
                .args(&config.process_args)
                .stdin(process::Stdio::Null)
                .stdout(process::Stdio::Null)
                .dup_fd(invitation.fd.as_fd(), IPC_FD)
                .env(INVITATION_ENV_NAME, invitation_env);

            for (key, value) in &config.env_vars {
                command.env(key, value);
            }

            // The host name is appended after `config.process_args` unless the
            // caller opted out.
            if !config.skip_worker_arg {
                command.arg(&name);
            }

            // stderr is only redirected when the caller supplied a file.
            if let Some(log_file) = config.stderr.as_ref() {
                command.stderr(process::Stdio::Fd(log_file.as_fd()));
            }

            if let Some(mut sandbox_profile) = config.sandbox_profile {
                sandbox_profile.apply(&mut command);
            }

            // Launch the child process on a separate thread to isolate
            // the fork() call from other IO pools.
            let child = std::thread::scope(|s| s.spawn(|| command.spawn()).join().unwrap())
                .context("failed to launch mesh process")?;
            // Unlike the Windows path, nothing here waits for the child to
            // connect to the mesh before the host is registered.
            pid = child.id();
            tracing::Span::current().record("pid", pid);

            pal_async::process::PolledChild::<process::Child>::new(&self.node_driver, child)
                .expect("failed to create process wait")
        };

        // Record the host; `id` is the key returned by the insertion.
        let id = self.hosts.insert(MeshHostInner {
            name: config.name,
            pid,
            node_id,
            send: request_send,
        });

        // The wait task logs the child's exit and then yields the host's key.
        // NOTE(review): presumably whoever polls `self.waiters` uses that key
        // to remove the entry from `self.hosts` -- confirm against the caller.
        let task = self
            .node_driver
            .spawn(format!("wait-mesh-child-{}", pid), async move {
                wait_mesh_child(child, &name, pid).await;
                id
            });

        self.waiters.push(task);
        Ok(pid)
    }
972}
973
/// Waits for a mesh child process to exit and logs the outcome.
///
/// An exit code of zero is logged at info level. A non-zero exit code, or a
/// failure of the wait itself, is logged at error level. `name` and `pid` are
/// attached to every log event.
#[cfg(windows)]
async fn wait_mesh_child(mut child: pal_async::windows::PolledProcess, name: &str, pid: i32) {
    let outcome = child.wait().await;
    match outcome {
        // The wait itself failed, so there is no exit code to report.
        Err(wait_err) => tracing::error!(
            pid,
            name,
            error = &wait_err as &dyn std::error::Error,
            "mesh child wait failed"
        ),
        Ok(0) => tracing::info!(pid, name, "mesh child exited successfully"),
        // Any other exit code is treated as abnormal.
        Ok(code) => tracing::error!(pid, name, code, "mesh child abnormal exit"),
    }
}
993
994#[cfg(unix)]
995async fn wait_mesh_child(
996    mut child: pal_async::process::PolledChild<pal::unix::process::Child>,
997    name: &str,
998    pid: i32,
999) {
1000    match child.wait().await {
1001        Ok(status) if status.code() == Some(0) => {
1002            tracing::info!(pid, name, "mesh child exited successfully");
1003        }
1004        Ok(exit_status) => {
1005            tracing::error!(pid, name, %exit_status, "mesh child abnormal exit");
1006        }
1007        Err(e) => {
1008            tracing::error!(
1009                pid,
1010                name,
1011                error = &e as &dyn std::error::Error,
1012                "mesh child wait failed"
1013            );
1014        }
1015    }
1016}
1017
#[cfg(test)]
mod tests {
    use super::*;
    use pal_async::DefaultDriver;
    use pal_async::async_test;
    use pal_async::task::Spawn;
    use test_with_tracing::test;

    /// Round-trips one message from a connecting client to a mesh listener,
    /// then tears down the client, the listener, and the mesh in that order.
    #[async_test]
    async fn test_listen(driver: DefaultDriver) {
        // The socket lives in a scratch directory that is cleaned up when
        // `scratch_dir` is dropped.
        let scratch_dir = tempfile::tempdir().unwrap();
        let socket_path = scratch_dir.path().join("mesh-listen.sock");

        let mesh = Mesh::new("test-listen".to_string()).unwrap();
        let mut incoming = mesh.listen::<String>(&socket_path).await.unwrap();

        // Client side: connect, send a single message, and hand the
        // connection back to the test body instead of dropping it. Keeping
        // the node alive until the server has the message avoids a race with
        // the server establishing a back-connection for port bridging on
        // Windows (ALPC).
        let driver_for_client = driver.clone();
        let path_for_client = socket_path.clone();
        let client = driver.spawn("client", async move {
            let (tx, connection) = connect::<String>(driver_for_client, &path_for_client)
                .await
                .unwrap();
            tx.send("hello from client".to_string());
            connection
        });

        // Server side: the message arrives on the listener stream.
        let received = incoming.next().await.unwrap();
        assert_eq!(received, "hello from client");

        // Now that the message has arrived, shut the client connection down.
        client.await.shutdown().await;

        // Dropping the listener should stop the accept loop before the mesh
        // itself is shut down.
        drop(incoming);

        mesh.shutdown().await;
    }
}