Skip to main content

pipette_client/
process.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Code to launch a command inside the guest.
5
6use crate::PipetteClient;
7use anyhow::Context;
8use futures::AsyncReadExt;
9use futures::executor::block_on;
10use futures::io::AllowStdIo;
11use futures_concurrency::future::Join;
12use mesh::pipe::ReadPipe;
13use mesh::pipe::WritePipe;
14use pipette_protocol::EnvPair;
15use pipette_protocol::PipetteRequest;
16use std::fmt;
17
18/// A builder for launching a command inside the guest.
19///
20/// This has a similar API to [`std::process::Command`].
21pub struct Command<'a> {
22    client: &'a PipetteClient,
23    program: String,
24    args: Vec<String>,
25    current_dir: Option<String>,
26    stdin: Option<Stdio>,
27    stdout: Option<Stdio>,
28    stderr: Option<Stdio>,
29    env: Vec<EnvPair>,
30    clear_env: bool,
31    chroot: Option<String>,
32    allocate_pty: bool,
33    combine_stderr: bool,
34}
35
36impl<'a> Command<'a> {
37    pub(super) fn new(client: &'a PipetteClient, program: impl AsRef<str>) -> Self {
38        Self {
39            client,
40            program: program.as_ref().to_owned(),
41            args: Vec::new(),
42            current_dir: None,
43            stdin: None,
44            stdout: None,
45            stderr: None,
46            env: Vec::new(),
47            clear_env: false,
48            chroot: None,
49            allocate_pty: false,
50            combine_stderr: false,
51        }
52    }
53
54    /// Adds an argument to the command.
55    pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
56        self.args.push(arg.as_ref().to_owned());
57        self
58    }
59
60    /// Adds multiple arguments to the command.
61    pub fn args<I: IntoIterator>(&mut self, args: I) -> &mut Self
62    where
63        I::Item: AsRef<str>,
64    {
65        self.args
66            .extend(args.into_iter().map(|item| item.as_ref().to_owned()));
67        self
68    }
69
70    /// Sets the current working directory for the command.
71    pub fn current_dir(&mut self, dir: impl AsRef<str>) -> &mut Self {
72        self.current_dir = Some(dir.as_ref().to_owned());
73        self
74    }
75
76    /// Clears the environment before setting the new environment.
77    pub fn env_clear(&mut self) -> &mut Self {
78        self.clear_env = true;
79        self.env.clear();
80        self
81    }
82
83    /// Sets an environment variable for the command.
84    pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
85        self.env.push(EnvPair {
86            name: name.as_ref().to_owned(),
87            value: Some(value.as_ref().to_owned()),
88        });
89        self
90    }
91
92    /// Removes an environment variable for the command.
93    pub fn env_remove(&mut self, name: impl AsRef<str>) -> &mut Self {
94        self.env.push(EnvPair {
95            name: name.as_ref().to_owned(),
96            value: None,
97        });
98        self
99    }
100
101    /// Sets the stdin for the command.
102    pub fn stdin(&mut self, stdin: impl Into<Stdio>) -> &mut Self {
103        self.stdin = Some(stdin.into());
104        self
105    }
106
107    /// Sets the stdout for the command.
108    pub fn stdout(&mut self, stdout: impl Into<Stdio>) -> &mut Self {
109        self.stdout = Some(stdout.into());
110        self
111    }
112
113    /// Sets the stderr for the command.
114    pub fn stderr(&mut self, stderr: impl Into<Stdio>) -> &mut Self {
115        self.stderr = Some(stderr.into());
116        self
117    }
118
119    /// Sets the chroot directory for the command (Linux only).
120    pub fn chroot(&mut self, root: impl AsRef<str>) -> &mut Self {
121        self.chroot = Some(root.as_ref().to_owned());
122        self
123    }
124
125    /// Allocate a PTY for the child process (Linux guests only).
126    ///
127    /// When set, stdin/stdout/stderr are all connected to a PTY secondary,
128    /// enabling terminal features like Ctrl-C signal propagation and
129    /// line editing. The PTY primary is relayed through the stdout pipe.
130    pub fn pty(&mut self, allocate: bool) -> &mut Self {
131        self.allocate_pty = allocate;
132        self
133    }
134
135    /// Redirect stderr to the stdout pipe.
136    ///
137    /// When set, the child's stderr is merged into stdout so callers
138    /// receive interleaved output through a single stream.
139    pub fn combine_stderr(&mut self, combine: bool) -> &mut Self {
140        self.combine_stderr = combine;
141        self
142    }
143
144    /// Spawns the command, defaulting to inheriting (relaying, really) the
145    /// current process for stdin, stdout, and stderr.
146    pub async fn spawn(&self) -> anyhow::Result<Child> {
147        self.spawn_inner(&StdioInner::Inherit, true).await
148    }
149
150    /// Spawns the command, capturing the standard output and standard error
151    /// (if they are not already set).
152    pub async fn output(&self) -> anyhow::Result<Output> {
153        let child = self.spawn_inner(&StdioInner::Piped, false).await?;
154        child.wait_with_output().await
155    }
156
157    async fn spawn_inner(
158        &self,
159        default_stdio: &StdioInner,
160        default_stdin: bool,
161    ) -> anyhow::Result<Child> {
162        let (stdin_read, stdin_write) = self
163            .stdin
164            .as_ref()
165            .map_or(
166                if default_stdin {
167                    default_stdio
168                } else {
169                    &StdioInner::Null
170                },
171                |x| &x.0,
172            )
173            .pipes(StdioFd::Stdin);
174
175        let (stdout_read, stdout_write) = self
176            .stdout
177            .as_ref()
178            .map_or(default_stdio, |x| &x.0)
179            .pipes(StdioFd::Stdout);
180        let (stderr_read, stderr_write) = if self.combine_stderr {
181            (None, None)
182        } else {
183            self.stderr
184                .as_ref()
185                .map_or(default_stdio, |x| &x.0)
186                .pipes(StdioFd::Stderr)
187        };
188
189        let request = pipette_protocol::ExecuteRequest {
190            program: self.program.clone(),
191            args: self.args.clone(),
192            current_dir: self.current_dir.clone(),
193            stdin: stdin_read,
194            stdout: stdout_write,
195            stderr: stderr_write,
196            env: self.env.clone(),
197            clear_env: self.clear_env,
198            chroot: self.chroot.clone(),
199            allocate_pty: self.allocate_pty,
200            combine_stderr: self.combine_stderr,
201        };
202
203        let response = self
204            .client
205            .send
206            .call_failable(PipetteRequest::Execute, request)
207            .await
208            .with_context(|| format!("failed to execute {}", self.program))?;
209
210        Ok(Child {
211            stdin: stdin_write,
212            stdout: stdout_read,
213            stderr: stderr_read,
214            pid: response.pid,
215            result: Ok(response.result),
216        })
217    }
218}
219
/// Describes what to do with a standard I/O stream for a child process.
///
/// Construct one with [`Stdio::inherit`], [`Stdio::null`], or
/// [`Stdio::piped`].
#[derive(Debug)]
pub struct Stdio(StdioInner);

/// The stream disposition selected by a [`Stdio`] value.
#[derive(Debug)]
enum StdioInner {
    /// Relay the stream to or from the current process.
    Inherit,
    /// Do not connect the stream.
    Null,
    /// Create a new pipe for the stream.
    Piped,
}

impl Stdio {
    /// This stream will be "inherited" from the parent process.
    ///
    /// Internally, this will relay the standard input, output, or error of the
    /// current process to the guest process.
    pub fn inherit() -> Self {
        Self(StdioInner::Inherit)
    }

    /// This stream will be ignored by the child process.
    pub fn null() -> Self {
        Self(StdioInner::Null)
    }

    /// A new pipe will be created to communicate with the child process.
    pub fn piped() -> Self {
        Self(StdioInner::Piped)
    }
}
248
249enum StdioFd {
250    Stdin,
251    Stdout,
252    Stderr,
253}
254
255impl StdioInner {
256    fn pipes(&self, fd: StdioFd) -> (Option<ReadPipe>, Option<WritePipe>) {
257        match self {
258            StdioInner::Null => (None, None),
259            StdioInner::Piped => {
260                let (read, write) = mesh::pipe::pipe();
261                (Some(read), Some(write))
262            }
263            StdioInner::Inherit => {
264                let (read, mut write) = mesh::pipe::pipe();
265                match fd {
266                    StdioFd::Stdin => {
267                        std::thread::Builder::new()
268                            .name("stdin-relay".to_owned())
269                            .spawn({
270                                move || {
271                                    block_on(futures::io::copy(
272                                        AllowStdIo::new(std::io::stdin()),
273                                        &mut write,
274                                    ))
275                                }
276                            })
277                            .unwrap();
278                        (Some(read), None)
279                    }
280                    StdioFd::Stdout => {
281                        std::thread::Builder::new()
282                            .name("stdout-relay".to_owned())
283                            .spawn({
284                                move || {
285                                    block_on(futures::io::copy(
286                                        read,
287                                        &mut AllowStdIo::new(term::raw_stdout()),
288                                    ))
289                                }
290                            })
291                            .unwrap();
292                        (None, Some(write))
293                    }
294                    StdioFd::Stderr => {
295                        std::thread::Builder::new()
296                            .name("stderr-relay".to_owned())
297                            .spawn({
298                                move || {
299                                    block_on(futures::io::copy(
300                                        read,
301                                        &mut AllowStdIo::new(term::raw_stderr()),
302                                    ))
303                                }
304                            })
305                            .unwrap();
306                        (None, Some(write))
307                    }
308                }
309            }
310        }
311    }
312}
313
314/// A spawned child process, similar to [`std::process::Child`].
315pub struct Child {
316    /// The standard input pipe of the process.
317    pub stdin: Option<WritePipe>,
318    /// The standard output pipe of the process.
319    pub stdout: Option<ReadPipe>,
320    /// The standard error pipe of the process.
321    pub stderr: Option<ReadPipe>,
322    pid: u32,
323    result: Result<mesh::OneshotReceiver<pipette_protocol::ExitStatus>, ExitStatus>,
324}
325
326impl Child {
327    /// Returns the process ID of the child within the guest.
328    pub fn id(&self) -> u32 {
329        self.pid
330    }
331
332    /// Waits for the child to exit, returning the exit status.
333    pub async fn wait(&mut self) -> Result<ExitStatus, mesh::RecvError> {
334        match &mut self.result {
335            Ok(recv) => {
336                let status = ExitStatus(recv.await?);
337                self.result = Err(status.clone());
338                Ok(status)
339            }
340            Err(status) => Ok(status.clone()),
341        }
342    }
343
344    /// Waits for the child to exit, returning the exit status and the
345    /// remaining data from standard output and standard error.
346    pub async fn wait_with_output(mut self) -> anyhow::Result<Output> {
347        self.stdin = None;
348        let mut stdout = Vec::new();
349        let mut stderr = Vec::new();
350        let stdout_pipe = self.stdout.take();
351        let stderr_pipe = self.stderr.take();
352        let stdout_task = async {
353            if let Some(mut pipe) = stdout_pipe {
354                let _ = pipe.read_to_end(&mut stdout).await;
355            }
356        };
357        let stderr_task = async {
358            if let Some(mut pipe) = stderr_pipe {
359                let _ = pipe.read_to_end(&mut stderr).await;
360            }
361        };
362        let wait_task = self.wait();
363        let (status, (), ()) = (wait_task, stdout_task, stderr_task).join().await;
364        let status = status?;
365        Ok(Output {
366            status,
367            stdout,
368            stderr,
369        })
370    }
371}
372
373/// The exit status of a process.
374#[derive(Debug, Clone)]
375pub struct ExitStatus(pipette_protocol::ExitStatus);
376
377impl ExitStatus {
378    /// Returns `true` if the process exited successfully.
379    pub fn success(&self) -> bool {
380        matches!(self.0, pipette_protocol::ExitStatus::Normal(0))
381    }
382
383    /// Returns the exit code of the process, if it exited normally.
384    pub fn code(&self) -> Option<i32> {
385        match self.0 {
386            pipette_protocol::ExitStatus::Normal(code) => Some(code),
387            pipette_protocol::ExitStatus::Signal(_) | pipette_protocol::ExitStatus::Unknown => None,
388        }
389    }
390
391    /// Returns the signal that terminated the process, if it was terminated
392    /// by a signal.
393    pub fn signal(&self) -> Option<i32> {
394        match self.0 {
395            pipette_protocol::ExitStatus::Signal(signal) => Some(signal),
396            pipette_protocol::ExitStatus::Normal(_) | pipette_protocol::ExitStatus::Unknown => None,
397        }
398    }
399}
400
401impl fmt::Display for ExitStatus {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        match self.0 {
404            pipette_protocol::ExitStatus::Normal(code) if code >= 0 => {
405                write!(f, "exit code {}", code)
406            }
407            pipette_protocol::ExitStatus::Normal(code) => write!(f, "exit code {:#x}", code as u32),
408            pipette_protocol::ExitStatus::Signal(signal) => {
409                write!(f, "terminated by signal {}", signal)
410            }
411            pipette_protocol::ExitStatus::Unknown => write!(f, "unknown exit status"),
412        }
413    }
414}
415
416/// The result of a process execution.
417pub struct Output {
418    /// The exit status of the process.
419    pub status: ExitStatus,
420    /// The standard output of the process.
421    pub stdout: Vec<u8>,
422    /// The standard error of the process.
423    pub stderr: Vec<u8>,
424}