1use crate::PipetteClient;
7use anyhow::Context;
8use futures::AsyncReadExt;
9use futures::executor::block_on;
10use futures::io::AllowStdIo;
11use futures_concurrency::future::Join;
12use mesh::pipe::ReadPipe;
13use mesh::pipe::WritePipe;
14use pipette_protocol::EnvPair;
15use pipette_protocol::PipetteRequest;
16use std::fmt;
17
18pub struct Command<'a> {
22 client: &'a PipetteClient,
23 program: String,
24 args: Vec<String>,
25 current_dir: Option<String>,
26 stdin: Option<Stdio>,
27 stdout: Option<Stdio>,
28 stderr: Option<Stdio>,
29 env: Vec<EnvPair>,
30 clear_env: bool,
31 chroot: Option<String>,
32 allocate_pty: bool,
33 combine_stderr: bool,
34}
35
36impl<'a> Command<'a> {
37 pub(super) fn new(client: &'a PipetteClient, program: impl AsRef<str>) -> Self {
38 Self {
39 client,
40 program: program.as_ref().to_owned(),
41 args: Vec::new(),
42 current_dir: None,
43 stdin: None,
44 stdout: None,
45 stderr: None,
46 env: Vec::new(),
47 clear_env: false,
48 chroot: None,
49 allocate_pty: false,
50 combine_stderr: false,
51 }
52 }
53
54 pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
56 self.args.push(arg.as_ref().to_owned());
57 self
58 }
59
60 pub fn args<I: IntoIterator>(&mut self, args: I) -> &mut Self
62 where
63 I::Item: AsRef<str>,
64 {
65 self.args
66 .extend(args.into_iter().map(|item| item.as_ref().to_owned()));
67 self
68 }
69
70 pub fn current_dir(&mut self, dir: impl AsRef<str>) -> &mut Self {
72 self.current_dir = Some(dir.as_ref().to_owned());
73 self
74 }
75
76 pub fn env_clear(&mut self) -> &mut Self {
78 self.clear_env = true;
79 self.env.clear();
80 self
81 }
82
83 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
85 self.env.push(EnvPair {
86 name: name.as_ref().to_owned(),
87 value: Some(value.as_ref().to_owned()),
88 });
89 self
90 }
91
92 pub fn env_remove(&mut self, name: impl AsRef<str>) -> &mut Self {
94 self.env.push(EnvPair {
95 name: name.as_ref().to_owned(),
96 value: None,
97 });
98 self
99 }
100
101 pub fn stdin(&mut self, stdin: impl Into<Stdio>) -> &mut Self {
103 self.stdin = Some(stdin.into());
104 self
105 }
106
107 pub fn stdout(&mut self, stdout: impl Into<Stdio>) -> &mut Self {
109 self.stdout = Some(stdout.into());
110 self
111 }
112
113 pub fn stderr(&mut self, stderr: impl Into<Stdio>) -> &mut Self {
115 self.stderr = Some(stderr.into());
116 self
117 }
118
119 pub fn chroot(&mut self, root: impl AsRef<str>) -> &mut Self {
121 self.chroot = Some(root.as_ref().to_owned());
122 self
123 }
124
125 pub fn pty(&mut self, allocate: bool) -> &mut Self {
131 self.allocate_pty = allocate;
132 self
133 }
134
135 pub fn combine_stderr(&mut self, combine: bool) -> &mut Self {
140 self.combine_stderr = combine;
141 self
142 }
143
144 pub async fn spawn(&self) -> anyhow::Result<Child> {
147 self.spawn_inner(&StdioInner::Inherit, true).await
148 }
149
150 pub async fn output(&self) -> anyhow::Result<Output> {
153 let child = self.spawn_inner(&StdioInner::Piped, false).await?;
154 child.wait_with_output().await
155 }
156
157 async fn spawn_inner(
158 &self,
159 default_stdio: &StdioInner,
160 default_stdin: bool,
161 ) -> anyhow::Result<Child> {
162 let (stdin_read, stdin_write) = self
163 .stdin
164 .as_ref()
165 .map_or(
166 if default_stdin {
167 default_stdio
168 } else {
169 &StdioInner::Null
170 },
171 |x| &x.0,
172 )
173 .pipes(StdioFd::Stdin);
174
175 let (stdout_read, stdout_write) = self
176 .stdout
177 .as_ref()
178 .map_or(default_stdio, |x| &x.0)
179 .pipes(StdioFd::Stdout);
180 let (stderr_read, stderr_write) = if self.combine_stderr {
181 (None, None)
182 } else {
183 self.stderr
184 .as_ref()
185 .map_or(default_stdio, |x| &x.0)
186 .pipes(StdioFd::Stderr)
187 };
188
189 let request = pipette_protocol::ExecuteRequest {
190 program: self.program.clone(),
191 args: self.args.clone(),
192 current_dir: self.current_dir.clone(),
193 stdin: stdin_read,
194 stdout: stdout_write,
195 stderr: stderr_write,
196 env: self.env.clone(),
197 clear_env: self.clear_env,
198 chroot: self.chroot.clone(),
199 allocate_pty: self.allocate_pty,
200 combine_stderr: self.combine_stderr,
201 };
202
203 let response = self
204 .client
205 .send
206 .call_failable(PipetteRequest::Execute, request)
207 .await
208 .with_context(|| format!("failed to execute {}", self.program))?;
209
210 Ok(Child {
211 stdin: stdin_write,
212 stdout: stdout_read,
213 stderr: stderr_read,
214 pid: response.pid,
215 result: Ok(response.result),
216 })
217 }
218}
219
220pub struct Stdio(StdioInner);
222
/// The stream configurations that a [`Stdio`] value can hold.
enum StdioInner {
    /// Relay the stream to or from this process's own stdio on a background
    /// thread.
    Inherit,
    /// Create no pipe for the stream.
    Null,
    /// Create a pipe and return both of its ends.
    Piped,
}
228
229impl Stdio {
230 pub fn inherit() -> Self {
235 Self(StdioInner::Inherit)
236 }
237
238 pub fn null() -> Self {
240 Self(StdioInner::Null)
241 }
242
243 pub fn piped() -> Self {
245 Self(StdioInner::Piped)
246 }
247}
248
/// Identifies which standard stream a pipe is being set up for.
enum StdioFd {
    /// The process's standard input.
    Stdin,
    /// The process's standard output.
    Stdout,
    /// The process's standard error.
    Stderr,
}
254
255impl StdioInner {
256 fn pipes(&self, fd: StdioFd) -> (Option<ReadPipe>, Option<WritePipe>) {
257 match self {
258 StdioInner::Null => (None, None),
259 StdioInner::Piped => {
260 let (read, write) = mesh::pipe::pipe();
261 (Some(read), Some(write))
262 }
263 StdioInner::Inherit => {
264 let (read, mut write) = mesh::pipe::pipe();
265 match fd {
266 StdioFd::Stdin => {
267 std::thread::Builder::new()
268 .name("stdin-relay".to_owned())
269 .spawn({
270 move || {
271 block_on(futures::io::copy(
272 AllowStdIo::new(std::io::stdin()),
273 &mut write,
274 ))
275 }
276 })
277 .unwrap();
278 (Some(read), None)
279 }
280 StdioFd::Stdout => {
281 std::thread::Builder::new()
282 .name("stdout-relay".to_owned())
283 .spawn({
284 move || {
285 block_on(futures::io::copy(
286 read,
287 &mut AllowStdIo::new(term::raw_stdout()),
288 ))
289 }
290 })
291 .unwrap();
292 (None, Some(write))
293 }
294 StdioFd::Stderr => {
295 std::thread::Builder::new()
296 .name("stderr-relay".to_owned())
297 .spawn({
298 move || {
299 block_on(futures::io::copy(
300 read,
301 &mut AllowStdIo::new(term::raw_stderr()),
302 ))
303 }
304 })
305 .unwrap();
306 (None, Some(write))
307 }
308 }
309 }
310 }
311 }
312}
313
314pub struct Child {
316 pub stdin: Option<WritePipe>,
318 pub stdout: Option<ReadPipe>,
320 pub stderr: Option<ReadPipe>,
322 pid: u32,
323 result: Result<mesh::OneshotReceiver<pipette_protocol::ExitStatus>, ExitStatus>,
324}
325
326impl Child {
327 pub fn id(&self) -> u32 {
329 self.pid
330 }
331
332 pub async fn wait(&mut self) -> Result<ExitStatus, mesh::RecvError> {
334 match &mut self.result {
335 Ok(recv) => {
336 let status = ExitStatus(recv.await?);
337 self.result = Err(status.clone());
338 Ok(status)
339 }
340 Err(status) => Ok(status.clone()),
341 }
342 }
343
344 pub async fn wait_with_output(mut self) -> anyhow::Result<Output> {
347 self.stdin = None;
348 let mut stdout = Vec::new();
349 let mut stderr = Vec::new();
350 let stdout_pipe = self.stdout.take();
351 let stderr_pipe = self.stderr.take();
352 let stdout_task = async {
353 if let Some(mut pipe) = stdout_pipe {
354 let _ = pipe.read_to_end(&mut stdout).await;
355 }
356 };
357 let stderr_task = async {
358 if let Some(mut pipe) = stderr_pipe {
359 let _ = pipe.read_to_end(&mut stderr).await;
360 }
361 };
362 let wait_task = self.wait();
363 let (status, (), ()) = (wait_task, stdout_task, stderr_task).join().await;
364 let status = status?;
365 Ok(Output {
366 status,
367 stdout,
368 stderr,
369 })
370 }
371}
372
373#[derive(Debug, Clone)]
375pub struct ExitStatus(pipette_protocol::ExitStatus);
376
377impl ExitStatus {
378 pub fn success(&self) -> bool {
380 matches!(self.0, pipette_protocol::ExitStatus::Normal(0))
381 }
382
383 pub fn code(&self) -> Option<i32> {
385 match self.0 {
386 pipette_protocol::ExitStatus::Normal(code) => Some(code),
387 pipette_protocol::ExitStatus::Signal(_) | pipette_protocol::ExitStatus::Unknown => None,
388 }
389 }
390
391 pub fn signal(&self) -> Option<i32> {
394 match self.0 {
395 pipette_protocol::ExitStatus::Signal(signal) => Some(signal),
396 pipette_protocol::ExitStatus::Normal(_) | pipette_protocol::ExitStatus::Unknown => None,
397 }
398 }
399}
400
401impl fmt::Display for ExitStatus {
402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403 match self.0 {
404 pipette_protocol::ExitStatus::Normal(code) if code >= 0 => {
405 write!(f, "exit code {}", code)
406 }
407 pipette_protocol::ExitStatus::Normal(code) => write!(f, "exit code {:#x}", code as u32),
408 pipette_protocol::ExitStatus::Signal(signal) => {
409 write!(f, "terminated by signal {}", signal)
410 }
411 pipette_protocol::ExitStatus::Unknown => write!(f, "unknown exit status"),
412 }
413 }
414}
415
416pub struct Output {
418 pub status: ExitStatus,
420 pub stdout: Vec<u8>,
422 pub stderr: Vec<u8>,
424}