pipette_client/
process.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Code to launch a command inside the guest.
5
6use crate::PipetteClient;
7use anyhow::Context;
8use futures::AsyncReadExt;
9use futures::executor::block_on;
10use futures::io::AllowStdIo;
11use futures_concurrency::future::Join;
12use mesh::pipe::ReadPipe;
13use mesh::pipe::WritePipe;
14use pipette_protocol::EnvPair;
15use pipette_protocol::PipetteRequest;
16use std::fmt;
17
18/// A builder for launching a command inside the guest.
19///
20/// This has a similar API to [`std::process::Command`].
21pub struct Command<'a> {
22    client: &'a PipetteClient,
23    program: String,
24    args: Vec<String>,
25    current_dir: Option<String>,
26    stdin: Option<Stdio>,
27    stdout: Option<Stdio>,
28    stderr: Option<Stdio>,
29    env: Vec<EnvPair>,
30    clear_env: bool,
31}
32
33impl<'a> Command<'a> {
34    pub(super) fn new(client: &'a PipetteClient, program: impl AsRef<str>) -> Self {
35        Self {
36            client,
37            program: program.as_ref().to_owned(),
38            args: Vec::new(),
39            current_dir: None,
40            stdin: None,
41            stdout: None,
42            stderr: None,
43            env: Vec::new(),
44            clear_env: false,
45        }
46    }
47
48    /// Adds an argument to the command.
49    pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
50        self.args.push(arg.as_ref().to_owned());
51        self
52    }
53
54    /// Adds multiple arguments to the command.
55    pub fn args<I: IntoIterator>(&mut self, args: I) -> &mut Self
56    where
57        I::Item: AsRef<str>,
58    {
59        self.args
60            .extend(args.into_iter().map(|item| item.as_ref().to_owned()));
61        self
62    }
63
64    /// Sets the current working directory for the command.
65    pub fn current_dir(&mut self, dir: impl AsRef<str>) -> &mut Self {
66        self.current_dir = Some(dir.as_ref().to_owned());
67        self
68    }
69
70    /// Clears the environment before setting the new environment.
71    pub fn env_clear(&mut self) -> &mut Self {
72        self.clear_env = true;
73        self.env.clear();
74        self
75    }
76
77    /// Sets an environment variable for the command.
78    pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
79        self.env.push(EnvPair {
80            name: name.as_ref().to_owned(),
81            value: Some(value.as_ref().to_owned()),
82        });
83        self
84    }
85
86    /// Removes an environment variable for the command.
87    pub fn env_remove(&mut self, name: impl AsRef<str>) -> &mut Self {
88        self.env.push(EnvPair {
89            name: name.as_ref().to_owned(),
90            value: None,
91        });
92        self
93    }
94
95    /// Sets the stdin for the command.
96    pub fn stdin(&mut self, stdin: impl Into<Stdio>) -> &mut Self {
97        self.stdin = Some(stdin.into());
98        self
99    }
100
101    /// Sets the stdout for the command.
102    pub fn stdout(&mut self, stdout: impl Into<Stdio>) -> &mut Self {
103        self.stdout = Some(stdout.into());
104        self
105    }
106
107    /// Sets the stderr for the command.
108    pub fn stderr(&mut self, stderr: impl Into<Stdio>) -> &mut Self {
109        self.stderr = Some(stderr.into());
110        self
111    }
112
113    /// Spawns the command, defaulting to inheriting (relaying, really) the
114    /// current process for stdin, stdout, and stderr.
115    pub async fn spawn(&self) -> anyhow::Result<Child> {
116        self.spawn_inner(&StdioInner::Inherit, true).await
117    }
118
119    /// Spawns the command, capturing the standard output and standard error
120    /// (if they are not already set).
121    pub async fn output(&self) -> anyhow::Result<Output> {
122        let child = self.spawn_inner(&StdioInner::Piped, false).await?;
123        child.wait_with_output().await
124    }
125
126    async fn spawn_inner(
127        &self,
128        default_stdio: &StdioInner,
129        default_stdin: bool,
130    ) -> anyhow::Result<Child> {
131        let (stdin_read, stdin_write) = self
132            .stdin
133            .as_ref()
134            .map_or(
135                if default_stdin {
136                    default_stdio
137                } else {
138                    &StdioInner::Null
139                },
140                |x| &x.0,
141            )
142            .pipes(StdioFd::Stdin);
143
144        let (stdout_read, stdout_write) = self
145            .stdout
146            .as_ref()
147            .map_or(default_stdio, |x| &x.0)
148            .pipes(StdioFd::Stdout);
149        let (stderr_read, stderr_write) = self
150            .stderr
151            .as_ref()
152            .map_or(default_stdio, |x| &x.0)
153            .pipes(StdioFd::Stderr);
154
155        let request = pipette_protocol::ExecuteRequest {
156            program: self.program.clone(),
157            args: self.args.clone(),
158            current_dir: self.current_dir.clone(),
159            stdin: stdin_read,
160            stdout: stdout_write,
161            stderr: stderr_write,
162            env: self.env.clone(),
163            clear_env: self.clear_env,
164        };
165
166        let response = self
167            .client
168            .send
169            .call_failable(PipetteRequest::Execute, request)
170            .await
171            .with_context(|| format!("failed to execute {}", self.program))?;
172
173        Ok(Child {
174            stdin: stdin_write,
175            stdout: stdout_read,
176            stderr: stderr_read,
177            pid: response.pid,
178            result: Ok(response.result),
179        })
180    }
181}
182
183/// Describes what to do with a standard I/O stream for a child process.
184pub struct Stdio(StdioInner);
185
186enum StdioInner {
187    Inherit,
188    Null,
189    Piped,
190}
191
192impl Stdio {
193    /// This stream will be "inherited" from the parent process.
194    ///
195    /// Internally, this will relay the standard input, output, or error of the
196    /// current process to the guest process.
197    pub fn inherit() -> Self {
198        Self(StdioInner::Inherit)
199    }
200
201    /// This stream will be ignored by the child process.
202    pub fn null() -> Self {
203        Self(StdioInner::Null)
204    }
205
206    /// A new pipe will be created to communicate with the child process.
207    pub fn piped() -> Self {
208        Self(StdioInner::Piped)
209    }
210}
211
212enum StdioFd {
213    Stdin,
214    Stdout,
215    Stderr,
216}
217
218impl StdioInner {
219    fn pipes(&self, fd: StdioFd) -> (Option<ReadPipe>, Option<WritePipe>) {
220        match self {
221            StdioInner::Null => (None, None),
222            StdioInner::Piped => {
223                let (read, write) = mesh::pipe::pipe();
224                (Some(read), Some(write))
225            }
226            StdioInner::Inherit => {
227                let (read, mut write) = mesh::pipe::pipe();
228                match fd {
229                    StdioFd::Stdin => {
230                        std::thread::Builder::new()
231                            .name("stdin-relay".to_owned())
232                            .spawn({
233                                move || {
234                                    block_on(futures::io::copy(
235                                        AllowStdIo::new(std::io::stdin()),
236                                        &mut write,
237                                    ))
238                                }
239                            })
240                            .unwrap();
241                        (Some(read), None)
242                    }
243                    StdioFd::Stdout => {
244                        std::thread::Builder::new()
245                            .name("stdout-relay".to_owned())
246                            .spawn({
247                                move || {
248                                    block_on(futures::io::copy(
249                                        read,
250                                        &mut AllowStdIo::new(std::io::stdout()),
251                                    ))
252                                }
253                            })
254                            .unwrap();
255                        (None, Some(write))
256                    }
257                    StdioFd::Stderr => {
258                        std::thread::Builder::new()
259                            .name("stderr-relay".to_owned())
260                            .spawn({
261                                move || {
262                                    block_on(futures::io::copy(
263                                        read,
264                                        &mut AllowStdIo::new(std::io::stderr()),
265                                    ))
266                                }
267                            })
268                            .unwrap();
269                        (None, Some(write))
270                    }
271                }
272            }
273        }
274    }
275}
276
277/// A spawned child process, similar to [`std::process::Child`].
278pub struct Child {
279    /// The standard input pipe of the process.
280    pub stdin: Option<WritePipe>,
281    /// The standard output pipe of the process.
282    pub stdout: Option<ReadPipe>,
283    /// The standard error pipe of the process.
284    pub stderr: Option<ReadPipe>,
285    pid: u32,
286    result: Result<mesh::OneshotReceiver<pipette_protocol::ExitStatus>, ExitStatus>,
287}
288
289impl Child {
290    /// Returns the process ID of the child within the guest.
291    pub fn id(&self) -> u32 {
292        self.pid
293    }
294
295    /// Waits for the child to exit, returning the exit status.
296    pub async fn wait(&mut self) -> Result<ExitStatus, mesh::RecvError> {
297        match &mut self.result {
298            Ok(recv) => {
299                let status = ExitStatus(recv.await?);
300                self.result = Err(status.clone());
301                Ok(status)
302            }
303            Err(status) => Ok(status.clone()),
304        }
305    }
306
307    /// Waits for the child to exit, returning the exit status and the
308    /// remaining data from standard output and standard error.
309    pub async fn wait_with_output(mut self) -> anyhow::Result<Output> {
310        self.stdin = None;
311        let mut stdout = Vec::new();
312        let mut stderr = Vec::new();
313        let stdout_pipe = self.stdout.take();
314        let stderr_pipe = self.stderr.take();
315        let stdout_task = async {
316            if let Some(mut pipe) = stdout_pipe {
317                let _ = pipe.read_to_end(&mut stdout).await;
318            }
319        };
320        let stderr_task = async {
321            if let Some(mut pipe) = stderr_pipe {
322                let _ = pipe.read_to_end(&mut stderr).await;
323            }
324        };
325        let wait_task = self.wait();
326        let (status, (), ()) = (wait_task, stdout_task, stderr_task).join().await;
327        let status = status?;
328        Ok(Output {
329            status,
330            stdout,
331            stderr,
332        })
333    }
334}
335
336/// The exit status of a process.
337#[derive(Debug, Clone)]
338pub struct ExitStatus(pipette_protocol::ExitStatus);
339
340impl ExitStatus {
341    /// Returns `true` if the process exited successfully.
342    pub fn success(&self) -> bool {
343        matches!(self.0, pipette_protocol::ExitStatus::Normal(0))
344    }
345
346    /// Returns the exit code of the process, if it exited normally.
347    pub fn code(&self) -> Option<i32> {
348        match self.0 {
349            pipette_protocol::ExitStatus::Normal(code) => Some(code),
350            pipette_protocol::ExitStatus::Signal(_) | pipette_protocol::ExitStatus::Unknown => None,
351        }
352    }
353
354    /// Returns the signal that terminated the process, if it was terminated
355    /// by a signal.
356    pub fn signal(&self) -> Option<i32> {
357        match self.0 {
358            pipette_protocol::ExitStatus::Signal(signal) => Some(signal),
359            pipette_protocol::ExitStatus::Normal(_) | pipette_protocol::ExitStatus::Unknown => None,
360        }
361    }
362}
363
364impl fmt::Display for ExitStatus {
365    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
366        match self.0 {
367            pipette_protocol::ExitStatus::Normal(code) if code >= 0 => {
368                write!(f, "exit code {}", code)
369            }
370            pipette_protocol::ExitStatus::Normal(code) => write!(f, "exit code {:#x}", code as u32),
371            pipette_protocol::ExitStatus::Signal(signal) => {
372                write!(f, "terminated by signal {}", signal)
373            }
374            pipette_protocol::ExitStatus::Unknown => write!(f, "unknown exit status"),
375        }
376    }
377}
378
379/// The result of a process execution.
380pub struct Output {
381    /// The exit status of the process.
382    pub status: ExitStatus,
383    /// The standard output of the process.
384    pub stdout: Vec<u8>,
385    /// The standard error of the process.
386    pub stderr: Vec<u8>,
387}