pipette/
execute.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Handler for the execute request.
5
6#![cfg(any(target_os = "linux", target_os = "windows"))]
7
8use futures::executor::block_on;
9use futures::io::AllowStdIo;
10use std::process::Stdio;
11
12pub fn handle_execute(
13    mut request: pipette_protocol::ExecuteRequest,
14) -> anyhow::Result<pipette_protocol::ExecuteResponse> {
15    tracing::debug!(?request, "execute request");
16
17    let mut command = std::process::Command::new(&request.program);
18    command.args(&request.args);
19    if let Some(dir) = &request.current_dir {
20        command.current_dir(dir);
21    }
22    if request.clear_env {
23        command.env_clear();
24    }
25    for pipette_protocol::EnvPair { name, value } in request.env {
26        if let Some(value) = value {
27            command.env(name, value);
28        } else {
29            command.env_remove(name);
30        }
31    }
32    if request.stdin.is_some() {
33        command.stdin(Stdio::piped());
34    } else {
35        command.stdin(Stdio::null());
36    }
37    if request.stdout.is_some() {
38        command.stdout(Stdio::piped());
39    } else {
40        command.stdout(Stdio::null());
41    }
42    if request.stderr.is_some() {
43        command.stderr(Stdio::piped());
44    } else {
45        command.stderr(Stdio::null());
46    }
47    let mut child = command.spawn()?;
48    let pid = child.id();
49    let (send, recv) = mesh::oneshot();
50
51    if let (Some(stdin_write), Some(stdin_read)) = (child.stdin.take(), request.stdin.take()) {
52        std::thread::spawn(move || {
53            let _ = block_on(futures::io::copy(
54                stdin_read,
55                &mut AllowStdIo::new(stdin_write),
56            ));
57        });
58    }
59    if let (Some(stdout_read), Some(mut stdout_write)) =
60        (child.stdout.take(), request.stdout.take())
61    {
62        std::thread::spawn(move || {
63            let _ = block_on(futures::io::copy(
64                AllowStdIo::new(stdout_read),
65                &mut stdout_write,
66            ));
67        });
68    }
69    if let (Some(stderr_read), Some(mut stderr_write)) =
70        (child.stderr.take(), request.stderr.take())
71    {
72        std::thread::spawn(move || {
73            let _ = block_on(futures::io::copy(
74                AllowStdIo::new(stderr_read),
75                &mut stderr_write,
76            ));
77        });
78    }
79
80    std::thread::spawn(move || {
81        let exit_status = child.wait().unwrap();
82        let status = convert_exit_status(exit_status);
83        tracing::debug!(pid, ?status, "process exited");
84        send.send(status);
85    });
86    Ok(pipette_protocol::ExecuteResponse { pid, result: recv })
87}
88
89fn convert_exit_status(exit_status: std::process::ExitStatus) -> pipette_protocol::ExitStatus {
90    if let Some(code) = exit_status.code() {
91        return pipette_protocol::ExitStatus::Normal(code);
92    }
93
94    #[cfg(unix)]
95    if let Some(signal) = std::os::unix::process::ExitStatusExt::signal(&exit_status) {
96        return pipette_protocol::ExitStatus::Signal(signal);
97    }
98
99    pipette_protocol::ExitStatus::Unknown
100}