1use crate::PipetteClient;
7use anyhow::Context;
8use futures::AsyncReadExt;
9use futures::executor::block_on;
10use futures::io::AllowStdIo;
11use futures_concurrency::future::Join;
12use mesh::pipe::ReadPipe;
13use mesh::pipe::WritePipe;
14use pipette_protocol::EnvPair;
15use pipette_protocol::PipetteRequest;
16use std::fmt;
17
18pub struct Command<'a> {
22 client: &'a PipetteClient,
23 program: String,
24 args: Vec<String>,
25 current_dir: Option<String>,
26 stdin: Option<Stdio>,
27 stdout: Option<Stdio>,
28 stderr: Option<Stdio>,
29 env: Vec<EnvPair>,
30 clear_env: bool,
31 chroot: Option<String>,
32}
33
34impl<'a> Command<'a> {
35 pub(super) fn new(client: &'a PipetteClient, program: impl AsRef<str>) -> Self {
36 Self {
37 client,
38 program: program.as_ref().to_owned(),
39 args: Vec::new(),
40 current_dir: None,
41 stdin: None,
42 stdout: None,
43 stderr: None,
44 env: Vec::new(),
45 clear_env: false,
46 chroot: None,
47 }
48 }
49
50 pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
52 self.args.push(arg.as_ref().to_owned());
53 self
54 }
55
56 pub fn args<I: IntoIterator>(&mut self, args: I) -> &mut Self
58 where
59 I::Item: AsRef<str>,
60 {
61 self.args
62 .extend(args.into_iter().map(|item| item.as_ref().to_owned()));
63 self
64 }
65
66 pub fn current_dir(&mut self, dir: impl AsRef<str>) -> &mut Self {
68 self.current_dir = Some(dir.as_ref().to_owned());
69 self
70 }
71
72 pub fn env_clear(&mut self) -> &mut Self {
74 self.clear_env = true;
75 self.env.clear();
76 self
77 }
78
79 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
81 self.env.push(EnvPair {
82 name: name.as_ref().to_owned(),
83 value: Some(value.as_ref().to_owned()),
84 });
85 self
86 }
87
88 pub fn env_remove(&mut self, name: impl AsRef<str>) -> &mut Self {
90 self.env.push(EnvPair {
91 name: name.as_ref().to_owned(),
92 value: None,
93 });
94 self
95 }
96
97 pub fn stdin(&mut self, stdin: impl Into<Stdio>) -> &mut Self {
99 self.stdin = Some(stdin.into());
100 self
101 }
102
103 pub fn stdout(&mut self, stdout: impl Into<Stdio>) -> &mut Self {
105 self.stdout = Some(stdout.into());
106 self
107 }
108
109 pub fn stderr(&mut self, stderr: impl Into<Stdio>) -> &mut Self {
111 self.stderr = Some(stderr.into());
112 self
113 }
114
115 pub fn chroot(&mut self, root: impl AsRef<str>) -> &mut Self {
117 self.chroot = Some(root.as_ref().to_owned());
118 self
119 }
120
121 pub async fn spawn(&self) -> anyhow::Result<Child> {
124 self.spawn_inner(&StdioInner::Inherit, true).await
125 }
126
127 pub async fn output(&self) -> anyhow::Result<Output> {
130 let child = self.spawn_inner(&StdioInner::Piped, false).await?;
131 child.wait_with_output().await
132 }
133
134 async fn spawn_inner(
135 &self,
136 default_stdio: &StdioInner,
137 default_stdin: bool,
138 ) -> anyhow::Result<Child> {
139 let (stdin_read, stdin_write) = self
140 .stdin
141 .as_ref()
142 .map_or(
143 if default_stdin {
144 default_stdio
145 } else {
146 &StdioInner::Null
147 },
148 |x| &x.0,
149 )
150 .pipes(StdioFd::Stdin);
151
152 let (stdout_read, stdout_write) = self
153 .stdout
154 .as_ref()
155 .map_or(default_stdio, |x| &x.0)
156 .pipes(StdioFd::Stdout);
157 let (stderr_read, stderr_write) = self
158 .stderr
159 .as_ref()
160 .map_or(default_stdio, |x| &x.0)
161 .pipes(StdioFd::Stderr);
162
163 let request = pipette_protocol::ExecuteRequest {
164 program: self.program.clone(),
165 args: self.args.clone(),
166 current_dir: self.current_dir.clone(),
167 stdin: stdin_read,
168 stdout: stdout_write,
169 stderr: stderr_write,
170 env: self.env.clone(),
171 clear_env: self.clear_env,
172 chroot: self.chroot.clone(),
173 };
174
175 let response = self
176 .client
177 .send
178 .call_failable(PipetteRequest::Execute, request)
179 .await
180 .with_context(|| format!("failed to execute {}", self.program))?;
181
182 Ok(Child {
183 stdin: stdin_write,
184 stdout: stdout_read,
185 stderr: stderr_read,
186 pid: response.pid,
187 result: Ok(response.result),
188 })
189 }
190}
191
/// Describes how one of a command's standard streams should be set up.
///
/// Construct a value with [`Stdio::inherit`], [`Stdio::null`], or
/// [`Stdio::piped`].
#[derive(Debug)]
pub struct Stdio(StdioInner);

/// The stream configuration wrapped by [`Stdio`].
#[derive(Debug)]
enum StdioInner {
    /// Relay the stream to or from this process's own standard stream.
    Inherit,
    /// Leave the stream unconnected: no pipe is created for it.
    Null,
    /// Create a pipe and hand the local end back to the caller.
    Piped,
}

impl Stdio {
    /// The stream is relayed to or from this process's matching standard
    /// stream.
    pub fn inherit() -> Self {
        Self(StdioInner::Inherit)
    }

    /// The stream is not connected to anything.
    pub fn null() -> Self {
        Self(StdioInner::Null)
    }

    /// The stream is connected to a pipe whose local end is exposed on the
    /// spawned child handle.
    pub fn piped() -> Self {
        Self(StdioInner::Piped)
    }
}
220
/// Identifies which standard stream a set of pipes is being created for.
enum StdioFd {
    /// Standard input.
    Stdin,
    /// Standard output.
    Stdout,
    /// Standard error.
    Stderr,
}
226
227impl StdioInner {
228 fn pipes(&self, fd: StdioFd) -> (Option<ReadPipe>, Option<WritePipe>) {
229 match self {
230 StdioInner::Null => (None, None),
231 StdioInner::Piped => {
232 let (read, write) = mesh::pipe::pipe();
233 (Some(read), Some(write))
234 }
235 StdioInner::Inherit => {
236 let (read, mut write) = mesh::pipe::pipe();
237 match fd {
238 StdioFd::Stdin => {
239 std::thread::Builder::new()
240 .name("stdin-relay".to_owned())
241 .spawn({
242 move || {
243 block_on(futures::io::copy(
244 AllowStdIo::new(std::io::stdin()),
245 &mut write,
246 ))
247 }
248 })
249 .unwrap();
250 (Some(read), None)
251 }
252 StdioFd::Stdout => {
253 std::thread::Builder::new()
254 .name("stdout-relay".to_owned())
255 .spawn({
256 move || {
257 block_on(futures::io::copy(
258 read,
259 &mut AllowStdIo::new(std::io::stdout()),
260 ))
261 }
262 })
263 .unwrap();
264 (None, Some(write))
265 }
266 StdioFd::Stderr => {
267 std::thread::Builder::new()
268 .name("stderr-relay".to_owned())
269 .spawn({
270 move || {
271 block_on(futures::io::copy(
272 read,
273 &mut AllowStdIo::new(std::io::stderr()),
274 ))
275 }
276 })
277 .unwrap();
278 (None, Some(write))
279 }
280 }
281 }
282 }
283 }
284}
285
286pub struct Child {
288 pub stdin: Option<WritePipe>,
290 pub stdout: Option<ReadPipe>,
292 pub stderr: Option<ReadPipe>,
294 pid: u32,
295 result: Result<mesh::OneshotReceiver<pipette_protocol::ExitStatus>, ExitStatus>,
296}
297
298impl Child {
299 pub fn id(&self) -> u32 {
301 self.pid
302 }
303
304 pub async fn wait(&mut self) -> Result<ExitStatus, mesh::RecvError> {
306 match &mut self.result {
307 Ok(recv) => {
308 let status = ExitStatus(recv.await?);
309 self.result = Err(status.clone());
310 Ok(status)
311 }
312 Err(status) => Ok(status.clone()),
313 }
314 }
315
316 pub async fn wait_with_output(mut self) -> anyhow::Result<Output> {
319 self.stdin = None;
320 let mut stdout = Vec::new();
321 let mut stderr = Vec::new();
322 let stdout_pipe = self.stdout.take();
323 let stderr_pipe = self.stderr.take();
324 let stdout_task = async {
325 if let Some(mut pipe) = stdout_pipe {
326 let _ = pipe.read_to_end(&mut stdout).await;
327 }
328 };
329 let stderr_task = async {
330 if let Some(mut pipe) = stderr_pipe {
331 let _ = pipe.read_to_end(&mut stderr).await;
332 }
333 };
334 let wait_task = self.wait();
335 let (status, (), ()) = (wait_task, stdout_task, stderr_task).join().await;
336 let status = status?;
337 Ok(Output {
338 status,
339 stdout,
340 stderr,
341 })
342 }
343}
344
345#[derive(Debug, Clone)]
347pub struct ExitStatus(pipette_protocol::ExitStatus);
348
349impl ExitStatus {
350 pub fn success(&self) -> bool {
352 matches!(self.0, pipette_protocol::ExitStatus::Normal(0))
353 }
354
355 pub fn code(&self) -> Option<i32> {
357 match self.0 {
358 pipette_protocol::ExitStatus::Normal(code) => Some(code),
359 pipette_protocol::ExitStatus::Signal(_) | pipette_protocol::ExitStatus::Unknown => None,
360 }
361 }
362
363 pub fn signal(&self) -> Option<i32> {
366 match self.0 {
367 pipette_protocol::ExitStatus::Signal(signal) => Some(signal),
368 pipette_protocol::ExitStatus::Normal(_) | pipette_protocol::ExitStatus::Unknown => None,
369 }
370 }
371}
372
373impl fmt::Display for ExitStatus {
374 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
375 match self.0 {
376 pipette_protocol::ExitStatus::Normal(code) if code >= 0 => {
377 write!(f, "exit code {}", code)
378 }
379 pipette_protocol::ExitStatus::Normal(code) => write!(f, "exit code {:#x}", code as u32),
380 pipette_protocol::ExitStatus::Signal(signal) => {
381 write!(f, "terminated by signal {}", signal)
382 }
383 pipette_protocol::ExitStatus::Unknown => write!(f, "unknown exit status"),
384 }
385 }
386}
387
388pub struct Output {
390 pub status: ExitStatus,
392 pub stdout: Vec<u8>,
394 pub stderr: Vec<u8>,
396}