1use crate::PipetteClient;
7use anyhow::Context;
8use futures::AsyncReadExt;
9use futures::executor::block_on;
10use futures::io::AllowStdIo;
11use futures_concurrency::future::Join;
12use mesh::pipe::ReadPipe;
13use mesh::pipe::WritePipe;
14use pipette_protocol::EnvPair;
15use pipette_protocol::PipetteRequest;
16use std::fmt;
17
18pub struct Command<'a> {
22 client: &'a PipetteClient,
23 program: String,
24 args: Vec<String>,
25 current_dir: Option<String>,
26 stdin: Option<Stdio>,
27 stdout: Option<Stdio>,
28 stderr: Option<Stdio>,
29 env: Vec<EnvPair>,
30 clear_env: bool,
31}
32
33impl<'a> Command<'a> {
34 pub(super) fn new(client: &'a PipetteClient, program: impl AsRef<str>) -> Self {
35 Self {
36 client,
37 program: program.as_ref().to_owned(),
38 args: Vec::new(),
39 current_dir: None,
40 stdin: None,
41 stdout: None,
42 stderr: None,
43 env: Vec::new(),
44 clear_env: false,
45 }
46 }
47
48 pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
50 self.args.push(arg.as_ref().to_owned());
51 self
52 }
53
54 pub fn args<I: IntoIterator>(&mut self, args: I) -> &mut Self
56 where
57 I::Item: AsRef<str>,
58 {
59 self.args
60 .extend(args.into_iter().map(|item| item.as_ref().to_owned()));
61 self
62 }
63
64 pub fn current_dir(&mut self, dir: impl AsRef<str>) -> &mut Self {
66 self.current_dir = Some(dir.as_ref().to_owned());
67 self
68 }
69
70 pub fn env_clear(&mut self) -> &mut Self {
72 self.clear_env = true;
73 self.env.clear();
74 self
75 }
76
77 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
79 self.env.push(EnvPair {
80 name: name.as_ref().to_owned(),
81 value: Some(value.as_ref().to_owned()),
82 });
83 self
84 }
85
86 pub fn env_remove(&mut self, name: impl AsRef<str>) -> &mut Self {
88 self.env.push(EnvPair {
89 name: name.as_ref().to_owned(),
90 value: None,
91 });
92 self
93 }
94
95 pub fn stdin(&mut self, stdin: impl Into<Stdio>) -> &mut Self {
97 self.stdin = Some(stdin.into());
98 self
99 }
100
101 pub fn stdout(&mut self, stdout: impl Into<Stdio>) -> &mut Self {
103 self.stdout = Some(stdout.into());
104 self
105 }
106
107 pub fn stderr(&mut self, stderr: impl Into<Stdio>) -> &mut Self {
109 self.stderr = Some(stderr.into());
110 self
111 }
112
113 pub async fn spawn(&self) -> anyhow::Result<Child> {
116 self.spawn_inner(&StdioInner::Inherit, true).await
117 }
118
119 pub async fn output(&self) -> anyhow::Result<Output> {
122 let child = self.spawn_inner(&StdioInner::Piped, false).await?;
123 child.wait_with_output().await
124 }
125
126 async fn spawn_inner(
127 &self,
128 default_stdio: &StdioInner,
129 default_stdin: bool,
130 ) -> anyhow::Result<Child> {
131 let (stdin_read, stdin_write) = self
132 .stdin
133 .as_ref()
134 .map_or(
135 if default_stdin {
136 default_stdio
137 } else {
138 &StdioInner::Null
139 },
140 |x| &x.0,
141 )
142 .pipes(StdioFd::Stdin);
143
144 let (stdout_read, stdout_write) = self
145 .stdout
146 .as_ref()
147 .map_or(default_stdio, |x| &x.0)
148 .pipes(StdioFd::Stdout);
149 let (stderr_read, stderr_write) = self
150 .stderr
151 .as_ref()
152 .map_or(default_stdio, |x| &x.0)
153 .pipes(StdioFd::Stderr);
154
155 let request = pipette_protocol::ExecuteRequest {
156 program: self.program.clone(),
157 args: self.args.clone(),
158 current_dir: self.current_dir.clone(),
159 stdin: stdin_read,
160 stdout: stdout_write,
161 stderr: stderr_write,
162 env: self.env.clone(),
163 clear_env: self.clear_env,
164 };
165
166 let response = self
167 .client
168 .send
169 .call_failable(PipetteRequest::Execute, request)
170 .await
171 .with_context(|| format!("failed to execute {}", self.program))?;
172
173 Ok(Child {
174 stdin: stdin_write,
175 stdout: stdout_read,
176 stderr: stderr_read,
177 pid: response.pid,
178 result: Ok(response.result),
179 })
180 }
181}
182
183pub struct Stdio(StdioInner);
185
186enum StdioInner {
187 Inherit,
188 Null,
189 Piped,
190}
191
192impl Stdio {
193 pub fn inherit() -> Self {
198 Self(StdioInner::Inherit)
199 }
200
201 pub fn null() -> Self {
203 Self(StdioInner::Null)
204 }
205
206 pub fn piped() -> Self {
208 Self(StdioInner::Piped)
209 }
210}
211
212enum StdioFd {
213 Stdin,
214 Stdout,
215 Stderr,
216}
217
218impl StdioInner {
219 fn pipes(&self, fd: StdioFd) -> (Option<ReadPipe>, Option<WritePipe>) {
220 match self {
221 StdioInner::Null => (None, None),
222 StdioInner::Piped => {
223 let (read, write) = mesh::pipe::pipe();
224 (Some(read), Some(write))
225 }
226 StdioInner::Inherit => {
227 let (read, mut write) = mesh::pipe::pipe();
228 match fd {
229 StdioFd::Stdin => {
230 std::thread::Builder::new()
231 .name("stdin-relay".to_owned())
232 .spawn({
233 move || {
234 block_on(futures::io::copy(
235 AllowStdIo::new(std::io::stdin()),
236 &mut write,
237 ))
238 }
239 })
240 .unwrap();
241 (Some(read), None)
242 }
243 StdioFd::Stdout => {
244 std::thread::Builder::new()
245 .name("stdout-relay".to_owned())
246 .spawn({
247 move || {
248 block_on(futures::io::copy(
249 read,
250 &mut AllowStdIo::new(std::io::stdout()),
251 ))
252 }
253 })
254 .unwrap();
255 (None, Some(write))
256 }
257 StdioFd::Stderr => {
258 std::thread::Builder::new()
259 .name("stderr-relay".to_owned())
260 .spawn({
261 move || {
262 block_on(futures::io::copy(
263 read,
264 &mut AllowStdIo::new(std::io::stderr()),
265 ))
266 }
267 })
268 .unwrap();
269 (None, Some(write))
270 }
271 }
272 }
273 }
274 }
275}
276
277pub struct Child {
279 pub stdin: Option<WritePipe>,
281 pub stdout: Option<ReadPipe>,
283 pub stderr: Option<ReadPipe>,
285 pid: u32,
286 result: Result<mesh::OneshotReceiver<pipette_protocol::ExitStatus>, ExitStatus>,
287}
288
289impl Child {
290 pub fn id(&self) -> u32 {
292 self.pid
293 }
294
295 pub async fn wait(&mut self) -> Result<ExitStatus, mesh::RecvError> {
297 match &mut self.result {
298 Ok(recv) => {
299 let status = ExitStatus(recv.await?);
300 self.result = Err(status.clone());
301 Ok(status)
302 }
303 Err(status) => Ok(status.clone()),
304 }
305 }
306
307 pub async fn wait_with_output(mut self) -> anyhow::Result<Output> {
310 self.stdin = None;
311 let mut stdout = Vec::new();
312 let mut stderr = Vec::new();
313 let stdout_pipe = self.stdout.take();
314 let stderr_pipe = self.stderr.take();
315 let stdout_task = async {
316 if let Some(mut pipe) = stdout_pipe {
317 let _ = pipe.read_to_end(&mut stdout).await;
318 }
319 };
320 let stderr_task = async {
321 if let Some(mut pipe) = stderr_pipe {
322 let _ = pipe.read_to_end(&mut stderr).await;
323 }
324 };
325 let wait_task = self.wait();
326 let (status, (), ()) = (wait_task, stdout_task, stderr_task).join().await;
327 let status = status?;
328 Ok(Output {
329 status,
330 stdout,
331 stderr,
332 })
333 }
334}
335
336#[derive(Debug, Clone)]
338pub struct ExitStatus(pipette_protocol::ExitStatus);
339
340impl ExitStatus {
341 pub fn success(&self) -> bool {
343 matches!(self.0, pipette_protocol::ExitStatus::Normal(0))
344 }
345
346 pub fn code(&self) -> Option<i32> {
348 match self.0 {
349 pipette_protocol::ExitStatus::Normal(code) => Some(code),
350 pipette_protocol::ExitStatus::Signal(_) | pipette_protocol::ExitStatus::Unknown => None,
351 }
352 }
353
354 pub fn signal(&self) -> Option<i32> {
357 match self.0 {
358 pipette_protocol::ExitStatus::Signal(signal) => Some(signal),
359 pipette_protocol::ExitStatus::Normal(_) | pipette_protocol::ExitStatus::Unknown => None,
360 }
361 }
362}
363
364impl fmt::Display for ExitStatus {
365 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
366 match self.0 {
367 pipette_protocol::ExitStatus::Normal(code) if code >= 0 => {
368 write!(f, "exit code {}", code)
369 }
370 pipette_protocol::ExitStatus::Normal(code) => write!(f, "exit code {:#x}", code as u32),
371 pipette_protocol::ExitStatus::Signal(signal) => {
372 write!(f, "terminated by signal {}", signal)
373 }
374 pipette_protocol::ExitStatus::Unknown => write!(f, "unknown exit status"),
375 }
376 }
377}
378
379pub struct Output {
381 pub status: ExitStatus,
383 pub stdout: Vec<u8>,
385 pub stderr: Vec<u8>,
387}