pipette_client/
process.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Code to launch a command inside the guest.
5
6use crate::PipetteClient;
7use anyhow::Context;
8use futures::AsyncReadExt;
9use futures::executor::block_on;
10use futures::io::AllowStdIo;
11use futures_concurrency::future::Join;
12use mesh::pipe::ReadPipe;
13use mesh::pipe::WritePipe;
14use pipette_protocol::EnvPair;
15use pipette_protocol::PipetteRequest;
16use std::fmt;
17
18/// A builder for launching a command inside the guest.
19///
20/// This has a similar API to [`std::process::Command`].
21pub struct Command<'a> {
22    client: &'a PipetteClient,
23    program: String,
24    args: Vec<String>,
25    current_dir: Option<String>,
26    stdin: Option<Stdio>,
27    stdout: Option<Stdio>,
28    stderr: Option<Stdio>,
29    env: Vec<EnvPair>,
30    clear_env: bool,
31    chroot: Option<String>,
32}
33
34impl<'a> Command<'a> {
35    pub(super) fn new(client: &'a PipetteClient, program: impl AsRef<str>) -> Self {
36        Self {
37            client,
38            program: program.as_ref().to_owned(),
39            args: Vec::new(),
40            current_dir: None,
41            stdin: None,
42            stdout: None,
43            stderr: None,
44            env: Vec::new(),
45            clear_env: false,
46            chroot: None,
47        }
48    }
49
50    /// Adds an argument to the command.
51    pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
52        self.args.push(arg.as_ref().to_owned());
53        self
54    }
55
56    /// Adds multiple arguments to the command.
57    pub fn args<I: IntoIterator>(&mut self, args: I) -> &mut Self
58    where
59        I::Item: AsRef<str>,
60    {
61        self.args
62            .extend(args.into_iter().map(|item| item.as_ref().to_owned()));
63        self
64    }
65
66    /// Sets the current working directory for the command.
67    pub fn current_dir(&mut self, dir: impl AsRef<str>) -> &mut Self {
68        self.current_dir = Some(dir.as_ref().to_owned());
69        self
70    }
71
72    /// Clears the environment before setting the new environment.
73    pub fn env_clear(&mut self) -> &mut Self {
74        self.clear_env = true;
75        self.env.clear();
76        self
77    }
78
79    /// Sets an environment variable for the command.
80    pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
81        self.env.push(EnvPair {
82            name: name.as_ref().to_owned(),
83            value: Some(value.as_ref().to_owned()),
84        });
85        self
86    }
87
88    /// Removes an environment variable for the command.
89    pub fn env_remove(&mut self, name: impl AsRef<str>) -> &mut Self {
90        self.env.push(EnvPair {
91            name: name.as_ref().to_owned(),
92            value: None,
93        });
94        self
95    }
96
97    /// Sets the stdin for the command.
98    pub fn stdin(&mut self, stdin: impl Into<Stdio>) -> &mut Self {
99        self.stdin = Some(stdin.into());
100        self
101    }
102
103    /// Sets the stdout for the command.
104    pub fn stdout(&mut self, stdout: impl Into<Stdio>) -> &mut Self {
105        self.stdout = Some(stdout.into());
106        self
107    }
108
109    /// Sets the stderr for the command.
110    pub fn stderr(&mut self, stderr: impl Into<Stdio>) -> &mut Self {
111        self.stderr = Some(stderr.into());
112        self
113    }
114
115    /// Sets the chroot directory for the command (Linux only).
116    pub fn chroot(&mut self, root: impl AsRef<str>) -> &mut Self {
117        self.chroot = Some(root.as_ref().to_owned());
118        self
119    }
120
121    /// Spawns the command, defaulting to inheriting (relaying, really) the
122    /// current process for stdin, stdout, and stderr.
123    pub async fn spawn(&self) -> anyhow::Result<Child> {
124        self.spawn_inner(&StdioInner::Inherit, true).await
125    }
126
127    /// Spawns the command, capturing the standard output and standard error
128    /// (if they are not already set).
129    pub async fn output(&self) -> anyhow::Result<Output> {
130        let child = self.spawn_inner(&StdioInner::Piped, false).await?;
131        child.wait_with_output().await
132    }
133
134    async fn spawn_inner(
135        &self,
136        default_stdio: &StdioInner,
137        default_stdin: bool,
138    ) -> anyhow::Result<Child> {
139        let (stdin_read, stdin_write) = self
140            .stdin
141            .as_ref()
142            .map_or(
143                if default_stdin {
144                    default_stdio
145                } else {
146                    &StdioInner::Null
147                },
148                |x| &x.0,
149            )
150            .pipes(StdioFd::Stdin);
151
152        let (stdout_read, stdout_write) = self
153            .stdout
154            .as_ref()
155            .map_or(default_stdio, |x| &x.0)
156            .pipes(StdioFd::Stdout);
157        let (stderr_read, stderr_write) = self
158            .stderr
159            .as_ref()
160            .map_or(default_stdio, |x| &x.0)
161            .pipes(StdioFd::Stderr);
162
163        let request = pipette_protocol::ExecuteRequest {
164            program: self.program.clone(),
165            args: self.args.clone(),
166            current_dir: self.current_dir.clone(),
167            stdin: stdin_read,
168            stdout: stdout_write,
169            stderr: stderr_write,
170            env: self.env.clone(),
171            clear_env: self.clear_env,
172            chroot: self.chroot.clone(),
173        };
174
175        let response = self
176            .client
177            .send
178            .call_failable(PipetteRequest::Execute, request)
179            .await
180            .with_context(|| format!("failed to execute {}", self.program))?;
181
182        Ok(Child {
183            stdin: stdin_write,
184            stdout: stdout_read,
185            stderr: stderr_read,
186            pid: response.pid,
187            result: Ok(response.result),
188        })
189    }
190}
191
/// Describes what to do with a standard I/O stream for a child process.
///
/// Values are created with [`Stdio::inherit`], [`Stdio::null`], or
/// [`Stdio::piped`]; the chosen mode is kept private.
#[derive(Debug)]
pub struct Stdio(StdioInner);

/// The concrete handling mode behind a [`Stdio`] value.
#[derive(Debug)]
enum StdioInner {
    /// Relay the stream to/from the current process's own standard stream.
    Inherit,
    /// Do not connect the stream to anything.
    Null,
    /// Create a pipe whose other end is handed back to the caller.
    Piped,
}
200
201impl Stdio {
202    /// This stream will be "inherited" from the parent process.
203    ///
204    /// Internally, this will relay the standard input, output, or error of the
205    /// current process to the guest process.
206    pub fn inherit() -> Self {
207        Self(StdioInner::Inherit)
208    }
209
210    /// This stream will be ignored by the child process.
211    pub fn null() -> Self {
212        Self(StdioInner::Null)
213    }
214
215    /// A new pipe will be created to communicate with the child process.
216    pub fn piped() -> Self {
217        Self(StdioInner::Piped)
218    }
219}
220
/// Identifies which of the three standard streams a pipe is being set up for.
enum StdioFd {
    /// Standard input.
    Stdin,
    /// Standard output.
    Stdout,
    /// Standard error.
    Stderr,
}
226
227impl StdioInner {
228    fn pipes(&self, fd: StdioFd) -> (Option<ReadPipe>, Option<WritePipe>) {
229        match self {
230            StdioInner::Null => (None, None),
231            StdioInner::Piped => {
232                let (read, write) = mesh::pipe::pipe();
233                (Some(read), Some(write))
234            }
235            StdioInner::Inherit => {
236                let (read, mut write) = mesh::pipe::pipe();
237                match fd {
238                    StdioFd::Stdin => {
239                        std::thread::Builder::new()
240                            .name("stdin-relay".to_owned())
241                            .spawn({
242                                move || {
243                                    block_on(futures::io::copy(
244                                        AllowStdIo::new(std::io::stdin()),
245                                        &mut write,
246                                    ))
247                                }
248                            })
249                            .unwrap();
250                        (Some(read), None)
251                    }
252                    StdioFd::Stdout => {
253                        std::thread::Builder::new()
254                            .name("stdout-relay".to_owned())
255                            .spawn({
256                                move || {
257                                    block_on(futures::io::copy(
258                                        read,
259                                        &mut AllowStdIo::new(std::io::stdout()),
260                                    ))
261                                }
262                            })
263                            .unwrap();
264                        (None, Some(write))
265                    }
266                    StdioFd::Stderr => {
267                        std::thread::Builder::new()
268                            .name("stderr-relay".to_owned())
269                            .spawn({
270                                move || {
271                                    block_on(futures::io::copy(
272                                        read,
273                                        &mut AllowStdIo::new(std::io::stderr()),
274                                    ))
275                                }
276                            })
277                            .unwrap();
278                        (None, Some(write))
279                    }
280                }
281            }
282        }
283    }
284}
285
286/// A spawned child process, similar to [`std::process::Child`].
287pub struct Child {
288    /// The standard input pipe of the process.
289    pub stdin: Option<WritePipe>,
290    /// The standard output pipe of the process.
291    pub stdout: Option<ReadPipe>,
292    /// The standard error pipe of the process.
293    pub stderr: Option<ReadPipe>,
294    pid: u32,
295    result: Result<mesh::OneshotReceiver<pipette_protocol::ExitStatus>, ExitStatus>,
296}
297
298impl Child {
299    /// Returns the process ID of the child within the guest.
300    pub fn id(&self) -> u32 {
301        self.pid
302    }
303
304    /// Waits for the child to exit, returning the exit status.
305    pub async fn wait(&mut self) -> Result<ExitStatus, mesh::RecvError> {
306        match &mut self.result {
307            Ok(recv) => {
308                let status = ExitStatus(recv.await?);
309                self.result = Err(status.clone());
310                Ok(status)
311            }
312            Err(status) => Ok(status.clone()),
313        }
314    }
315
316    /// Waits for the child to exit, returning the exit status and the
317    /// remaining data from standard output and standard error.
318    pub async fn wait_with_output(mut self) -> anyhow::Result<Output> {
319        self.stdin = None;
320        let mut stdout = Vec::new();
321        let mut stderr = Vec::new();
322        let stdout_pipe = self.stdout.take();
323        let stderr_pipe = self.stderr.take();
324        let stdout_task = async {
325            if let Some(mut pipe) = stdout_pipe {
326                let _ = pipe.read_to_end(&mut stdout).await;
327            }
328        };
329        let stderr_task = async {
330            if let Some(mut pipe) = stderr_pipe {
331                let _ = pipe.read_to_end(&mut stderr).await;
332            }
333        };
334        let wait_task = self.wait();
335        let (status, (), ()) = (wait_task, stdout_task, stderr_task).join().await;
336        let status = status?;
337        Ok(Output {
338            status,
339            stdout,
340            stderr,
341        })
342    }
343}
344
345/// The exit status of a process.
346#[derive(Debug, Clone)]
347pub struct ExitStatus(pipette_protocol::ExitStatus);
348
349impl ExitStatus {
350    /// Returns `true` if the process exited successfully.
351    pub fn success(&self) -> bool {
352        matches!(self.0, pipette_protocol::ExitStatus::Normal(0))
353    }
354
355    /// Returns the exit code of the process, if it exited normally.
356    pub fn code(&self) -> Option<i32> {
357        match self.0 {
358            pipette_protocol::ExitStatus::Normal(code) => Some(code),
359            pipette_protocol::ExitStatus::Signal(_) | pipette_protocol::ExitStatus::Unknown => None,
360        }
361    }
362
363    /// Returns the signal that terminated the process, if it was terminated
364    /// by a signal.
365    pub fn signal(&self) -> Option<i32> {
366        match self.0 {
367            pipette_protocol::ExitStatus::Signal(signal) => Some(signal),
368            pipette_protocol::ExitStatus::Normal(_) | pipette_protocol::ExitStatus::Unknown => None,
369        }
370    }
371}
372
373impl fmt::Display for ExitStatus {
374    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
375        match self.0 {
376            pipette_protocol::ExitStatus::Normal(code) if code >= 0 => {
377                write!(f, "exit code {}", code)
378            }
379            pipette_protocol::ExitStatus::Normal(code) => write!(f, "exit code {:#x}", code as u32),
380            pipette_protocol::ExitStatus::Signal(signal) => {
381                write!(f, "terminated by signal {}", signal)
382            }
383            pipette_protocol::ExitStatus::Unknown => write!(f, "unknown exit status"),
384        }
385    }
386}
387
388/// The result of a process execution.
389pub struct Output {
390    /// The exit status of the process.
391    pub status: ExitStatus,
392    /// The standard output of the process.
393    pub stdout: Vec<u8>,
394    /// The standard error of the process.
395    pub stderr: Vec<u8>,
396}