diag_client/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! The client for connecting to the Underhill diagnostics server.
5
6#![forbid(unsafe_code)]
7
8pub mod kmsg_stream;
9
10use anyhow::Context;
11use diag_proto::ExecRequest;
12use diag_proto::WaitRequest;
13use diag_proto::WaitResponse;
14use diag_proto::network_packet_capture_request::OpData;
15use diag_proto::network_packet_capture_request::Operation;
16use futures::AsyncReadExt;
17use futures::AsyncWrite;
18use futures::AsyncWriteExt;
19use inspect::Node;
20use inspect::ValueKind;
21use kmsg_stream::KmsgStream;
22use mesh_rpc::service::Status;
23use pal_async::driver::Driver;
24use pal_async::socket::PolledSocket;
25use pal_async::task::Spawn;
26use std::io::ErrorKind;
27use std::path::Path;
28use std::path::PathBuf;
29use std::time::Duration;
30use thiserror::Error;
31
#[cfg(windows)]
/// Functions for Hyper-V
pub mod hyperv {
    use super::ConnectError;
    use anyhow::Context;
    use guid::Guid;
    use pal_async::driver::Driver;
    use pal_async::socket::PolledSocket;
    use pal_async::timer::PolledTimer;
    use std::fs::File;
    use std::io::Write;
    use std::process::Command;
    use std::time::Duration;
    use vmsocket::VmAddress;
    use vmsocket::VmSocket;
    use vmsocket::VmStream;

    /// Defines how to access the serial port
    pub enum ComPortAccessInfo<'a> {
        /// Access by number
        NameAndPortNumber(&'a str, u32),
        /// Access through a named pipe
        PortPipePath(&'a str),
    }

    /// Get ID from name
    ///
    /// Shells out to `hvc.exe id <name>` and parses the GUID it prints on
    /// stdout. On failure, hvc's stderr is surfaced verbatim as the error.
    pub fn vm_id_from_name(name: &str) -> anyhow::Result<Guid> {
        let output = Command::new("hvc.exe")
            .arg("id")
            .arg(name)
            .output()
            .context("failed to launch hvc")?;

        if output.status.success() {
            // hvc prints the VM ID GUID followed by a newline; trim it.
            let stdout = std::str::from_utf8(&output.stdout)
                .context("failed to parse hvc output")?
                .trim();
            Ok(stdout
                .parse()
                .with_context(|| format!("failed to parse VM ID '{}'", &stdout))?)
        } else {
            // Propagate hvc's own error text as the failure message.
            anyhow::bail!(
                "{}",
                std::str::from_utf8(&output.stderr).context("failed to parse hvc error output")?
            )
        }
    }

    /// Connect to Hyper-V socket
    ///
    /// Connects to `port` on the VM `vm_id` over AF_HYPERV with a one-second
    /// connect timeout, targeting the high VTL (VTL2) where the diagnostics
    /// server runs.
    pub async fn connect_vsock(
        driver: &(impl Driver + ?Sized),
        vm_id: Guid,
        port: u32,
    ) -> Result<VmStream, ConnectError> {
        let socket = VmSocket::new()
            .context("failed to create AF_HYPERV socket")
            .map_err(ConnectError::other)?;

        socket
            .set_connect_timeout(Duration::from_secs(1))
            .context("failed to set connect timeout")
            .map_err(ConnectError::other)?;

        // The diagnostics server listens in VTL2, so target the high VTL.
        socket
            .set_high_vtl(true)
            .context("failed to set socket for VTL2")
            .map_err(ConnectError::other)?;

        let mut socket: PolledSocket<socket2::Socket> = PolledSocket::new(driver, socket.into())
            .context("failed to create polled socket")
            .map_err(ConnectError::other)?;

        // Classified via ConnectError::connect so callers can decide to retry.
        socket
            .connect(&VmAddress::hyperv_vsock(vm_id, port).into())
            .await
            .map_err(ConnectError::connect)?;

        Ok(socket.convert().into_inner())
    }

    /// Opens a serial port on a Hyper-V VM.
    ///
    /// If the VM is not running, it will periodically try to connect to the
    /// pipe until the VM starts running. In theory, we could instead create a
    /// named pipe server, which Hyper-V would connect to when the VM starts.
    /// However, in this mode, once the named pipe is disconnected, Hyper-V
    /// stops trying to reconnect until the VM is powered off and powered on
    /// again, so don't do that.
    pub async fn open_serial_port(
        driver: &(impl Driver + ?Sized),
        port: ComPortAccessInfo<'_>,
    ) -> anyhow::Result<File> {
        // Resolve the named pipe path: either given directly, or queried from
        // the VM's COM port configuration via PowerShell.
        let path = match port {
            ComPortAccessInfo::NameAndPortNumber(vm, num) => {
                let output = Command::new("powershell.exe")
                    .arg("-NoProfile")
                    .arg(format!(
                        r#"$x = Get-VMComPort "{vm}" -Number {num} -ErrorAction Stop; $x.Path"#,
                    ))
                    .output()
                    .context("failed to query VM com port")?;

                if !output.status.success() {
                    // Best-effort: forward PowerShell's stderr to ours.
                    let _ = std::io::stderr().write_all(&output.stderr);
                    anyhow::bail!(
                        "failed to query VM com port: exit status {}",
                        output.status.code().unwrap()
                    );
                }
                &String::from_utf8(output.stdout)?
            }
            ComPortAccessInfo::PortPipePath(path) => path,
        };

        let path = path.trim();
        if path.is_empty() {
            anyhow::bail!("Requested VM COM port is not configured");
        }

        // Timer is created lazily; it is only needed if the pipe is absent.
        let mut timer = None;
        let pipe = loop {
            match fs_err::OpenOptions::new().read(true).write(true).open(path) {
                Ok(pipe) => break pipe.into(),
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                    // The VM is not running. Wait a bit and try again.
                    timer
                        .get_or_insert_with(|| PolledTimer::new(driver))
                        .sleep(Duration::from_millis(100))
                        .await;
                }
                Err(err) => Err(err)?,
            }
        };

        Ok(pipe)
    }
}
169
170/// Connect to a vsock with port and path
171pub async fn connect_hybrid_vsock(
172    driver: &(impl Driver + ?Sized),
173    path: &Path,
174    port: u32,
175) -> Result<PolledSocket<socket2::Socket>, ConnectError> {
176    let socket = unix_socket::UnixStream::connect(path).map_err(ConnectError::connect)?;
177    let mut socket = PolledSocket::new(driver, socket).map_err(ConnectError::other)?;
178    socket
179        .write_all(format!("CONNECT {port}\n").as_bytes())
180        .await
181        .map_err(ConnectError::other)?;
182
183    let mut ok = [0; 3];
184    socket
185        .read_exact(&mut ok)
186        .await
187        .map_err(ConnectError::other)?;
188    if &ok != b"OK " {
189        // FUTURE: consider returning an error that can be retried. This may
190        // require some changes to the hybrid vsock protocol, unclear.
191        return Err(ConnectError::other(anyhow::anyhow!(
192            "missing hybrid vsock response"
193        )));
194    }
195
196    for _ in 0.."4294967295\n".len() {
197        let mut b = [0];
198        socket
199            .read_exact(&mut b)
200            .await
201            .map_err(ConnectError::other)?;
202        if b[0] == b'\n' {
203            // Don't need to parse the host port number.
204            return Ok(socket.convert());
205        }
206    }
207    Err(ConnectError::other(anyhow::anyhow!(
208        "invalid hybrid vsock response"
209    )))
210}
211
/// Addressing information for a new data-plane connection.
enum SocketType<'a> {
    /// Connect over AF_HYPERV using the VM's ID (Windows only).
    #[cfg(windows)]
    VmId {
        vm_id: guid::Guid,
        port: u32,
    },
    /// Connect through a hybrid vsock Unix socket at `path`.
    HybridVsock {
        path: &'a Path,
        port: u32,
    },
}
223
224async fn new_data_connection(
225    driver: &(impl Driver + ?Sized),
226    typ: SocketType<'_>,
227) -> anyhow::Result<(u64, PolledSocket<socket2::Socket>)> {
228    let mut socket = match typ {
229        #[cfg(windows)]
230        SocketType::VmId { vm_id, port } => {
231            let socket = hyperv::connect_vsock(driver, vm_id, port).await?;
232            PolledSocket::new(driver, socket2::Socket::from(socket))?
233        }
234        SocketType::HybridVsock { path, port } => connect_hybrid_vsock(driver, path, port).await?,
235    };
236
237    // Read the 8 byte connection id which is always sent first on the connection.
238    let mut id = [0; 8];
239    socket
240        .read_exact(&mut id)
241        .await
242        .context("reading connection id")?;
243    let id = u64::from_ne_bytes(id);
244    Ok((id, socket))
245}
246
/// Represents different VM types.
#[derive(Clone)]
enum VmType {
    /// A Hyper-V VM represented by a VM ID GUID, which uses a VmSocket to connect.
    #[cfg(windows)]
    HyperV(guid::Guid),
    /// A VM which uses hybrid vsock over Unix sockets.
    HybridVsock(PathBuf),
    /// A VM that cannot be used for data connections.
    ///
    /// Used for clients built from a raw dialer; operations that need extra
    /// data connections fail for this type.
    None,
}
258
/// The diagnostics client.
pub struct DiagClient {
    /// How this client targets the VM; determines how additional data
    /// connections are established.
    vm: VmType,
    /// The ttrpc control-plane client used to issue all RPCs.
    ttrpc: mesh_rpc::Client,
    /// Driver used to create polled sockets for data connections.
    driver: Box<dyn Driver>,
}
265
/// Defines packet capture operations.
//
// `Debug` aids diagnostics output; `Copy`/`Clone`/`Eq` are natural for a
// fieldless enum and keep `PartialEq` consistent (clippy:
// derive_partial_eq_without_eq). All additions are backward-compatible.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PacketCaptureOperation {
    /// Query details.
    Query,
    /// Start packet capture.
    Start,
    /// Stop packet capture.
    Stop,
}
276
/// An error connecting to the diagnostics server.
#[derive(Debug, Error)]
#[error("failed to connect")]
pub struct ConnectError {
    /// The underlying failure, exposed via `Error::source()`.
    #[source]
    err: anyhow::Error,
    /// Classification used by [`ConnectError::retry_timeout`] to decide
    /// whether and when to retry.
    kind: ConnectErrorKind,
}
285
/// Classification of a connection failure, driving retry behavior.
#[derive(Debug)]
enum ConnectErrorKind {
    /// Any other failure; not retried.
    Other,
    /// The VM is not up yet; retried after a delay.
    VmNotStarted,
    /// The connect attempt timed out; retried immediately (the socket layer
    /// already enforces its own timeout).
    ServerTimedOut,
}
292
293impl ConnectError {
294    /// Returns the time to wait before retrying the connection. If `None`, the
295    /// connection should not be retried.
296    pub fn retry_timeout(&self) -> Option<Duration> {
297        match self.kind {
298            ConnectErrorKind::VmNotStarted => Some(Duration::from_secs(1)),
299            ConnectErrorKind::ServerTimedOut => {
300                // The socket infrastructure has an internal timeout.
301                Some(Duration::ZERO)
302            }
303            _ => None,
304        }
305    }
306
307    fn other(err: impl Into<anyhow::Error>) -> Self {
308        Self {
309            err: err.into(),
310            kind: ConnectErrorKind::Other,
311        }
312    }
313
314    fn connect(err: std::io::Error) -> Self {
315        let kind = match err.kind() {
316            ErrorKind::AddrNotAvailable => ConnectErrorKind::VmNotStarted,
317            ErrorKind::TimedOut => ConnectErrorKind::ServerTimedOut,
318            _ => match err.raw_os_error() {
319                #[cfg(windows)]
320                Some(windows_sys::Win32::Networking::WinSock::WSAENETUNREACH) => {
321                    ConnectErrorKind::VmNotStarted
322                }
323                _ => ConnectErrorKind::Other,
324            },
325        };
326        Self {
327            err: anyhow::Error::from(err).context("failed to connect"),
328            kind,
329        }
330    }
331}
332
/// Dialer that (re)connects the control-plane ttrpc client to the VM's
/// diagnostics control port.
struct VmConnector {
    /// Target VM; `VmType::None` is never used with this connector.
    vm: VmType,
    /// Driver used to create polled sockets.
    driver: Box<dyn Driver>,
}
337
impl mesh_rpc::client::Dial for VmConnector {
    type Stream = PolledSocket<socket2::Socket>;

    /// Connects to `VSOCK_CONTROL_PORT` using the transport appropriate for
    /// the VM type.
    async fn dial(&mut self) -> std::io::Result<Self::Stream> {
        match &self.vm {
            #[cfg(windows)]
            VmType::HyperV(guid) => {
                let socket = hyperv::connect_vsock(
                    self.driver.as_ref(),
                    *guid,
                    diag_proto::VSOCK_CONTROL_PORT,
                )
                .await
                .map_err(std::io::Error::other)?;
                Ok(PolledSocket::new(&self.driver, socket.into())?)
            }
            VmType::HybridVsock(path) => {
                let socket = connect_hybrid_vsock(
                    self.driver.as_ref(),
                    path,
                    diag_proto::VSOCK_CONTROL_PORT,
                )
                .await
                .map_err(std::io::Error::other)?;
                Ok(socket)
            }
            // `from_dialer` clients supply their own dialer, so a
            // `VmConnector` is never constructed with `VmType::None`.
            VmType::None => unreachable!(),
        }
    }
}
368
369impl DiagClient {
370    /// Creates a client from Hyper-V VM name.
371    #[cfg(windows)]
372    pub fn from_hyperv_name(
373        driver: impl Driver + Spawn + Clone,
374        name: &str,
375    ) -> anyhow::Result<Self> {
376        Ok(Self::from_hyperv_id(
377            driver,
378            hyperv::vm_id_from_name(name).map_err(ConnectError::other)?,
379        ))
380    }
381
382    /// Creates a client from a Hyper-V or HCS VM ID.
383    #[cfg(windows)]
384    pub fn from_hyperv_id(driver: impl Driver + Spawn + Clone, vm_id: guid::Guid) -> Self {
385        let vm = VmType::HyperV(vm_id);
386        Self::new(
387            driver.clone(),
388            vm.clone(),
389            VmConnector {
390                vm,
391                driver: Box::new(driver),
392            },
393        )
394    }
395
396    /// Creates a client from a hybrid vsock Unix socket path.
397    pub fn from_hybrid_vsock(driver: impl Driver + Spawn + Clone, path: &Path) -> Self {
398        let vm = VmType::HybridVsock(path.into());
399        Self::new(
400            driver.clone(),
401            vm.clone(),
402            VmConnector {
403                vm,
404                driver: Box::new(driver.clone()),
405            },
406        )
407    }
408
409    /// Creates a client from a dialer.
410    ///
411    /// This client won't be usable with operations that require additional connections.
412    pub fn from_dialer(driver: impl Driver + Spawn, conn: impl mesh_rpc::client::Dial) -> Self {
413        Self::new(driver, VmType::None, conn)
414    }
415
416    fn new(driver: impl Driver + Spawn, vm: VmType, conn: impl mesh_rpc::client::Dial) -> Self {
417        Self {
418            vm,
419            ttrpc: mesh_rpc::client::ClientBuilder::new()
420                // Use a short reconnect timeout (compared to the normal 20
421                // seconds) since the VM may start at any time.
422                .retry_timeout(Duration::from_secs(1))
423                .build(&driver, conn),
424            driver: Box::new(driver),
425        }
426    }
427
428    /// Waits for the paravisor to be ready for RPCs.
429    pub async fn wait_for_server(&self) -> anyhow::Result<()> {
430        match self
431            .ttrpc
432            .call()
433            .wait_ready(true)
434            .start(diag_proto::OpenhclDiag::Ping, ())
435            .await
436        {
437            Ok(()) => {}
438            Err(Status { code, .. }) if code == mesh_rpc::service::Code::Unimplemented as i32 => {
439                // Older versions of the diag server don't support the ping
440                // RPC, but an unimplemented failure is good enough to know
441                // the server is ready.
442            }
443            Err(status) => return Err(grpc_status(status)),
444        }
445        Ok(())
446    }
447
448    /// Creates a builder for execing a command.
449    pub fn exec(&self, command: impl AsRef<str>) -> ExecBuilder<'_> {
450        ExecBuilder {
451            client: self,
452            with_stdin: false,
453            with_stdout: false,
454            with_stderr: false,
455            request: ExecRequest {
456                command: command.as_ref().to_owned(),
457                ..Default::default()
458            },
459        }
460    }
461
462    /// Creates a new data connection socket.
463    ///
464    /// This can be used with [`DiagClient::custom_call`].
465    pub async fn connect_data(&self) -> anyhow::Result<(u64, PolledSocket<socket2::Socket>)> {
466        let socket_type = match &self.vm {
467            #[cfg(windows)]
468            VmType::HyperV(guid) => SocketType::VmId {
469                vm_id: *guid,
470                port: diag_proto::VSOCK_DATA_PORT,
471            },
472            VmType::HybridVsock(path) => SocketType::HybridVsock {
473                path,
474                port: diag_proto::VSOCK_DATA_PORT,
475            },
476            VmType::None => {
477                anyhow::bail!("cannot make additional connections with this client")
478            }
479        };
480        new_data_connection(self.driver.as_ref(), socket_type).await
481    }
482
483    /// Sends an inspection request to the server.
484    pub async fn inspect(
485        &self,
486        path: impl Into<String>,
487        depth: Option<usize>,
488        timeout: Option<Duration>,
489    ) -> anyhow::Result<Node> {
490        let response = self.ttrpc.call().timeout(timeout).start(
491            inspect_proto::InspectService::Inspect,
492            inspect_proto::InspectRequest {
493                path: path.into(),
494                // It would be better to pass an Option<u32> in the proto, but that would break backcompat.
495                depth: depth.unwrap_or(u32::MAX as usize) as u32,
496            },
497        );
498
499        let response = response.await.map_err(grpc_status)?;
500        Ok(response.result)
501    }
502
503    /// Updates an inspectable value.
504    pub async fn update(
505        &self,
506        path: impl Into<String>,
507        value: impl Into<String>,
508    ) -> anyhow::Result<inspect::Value> {
509        let response = self.ttrpc.call().start(
510            inspect_proto::InspectService::Update,
511            inspect_proto::UpdateRequest {
512                path: path.into(),
513                value: value.into(),
514            },
515        );
516
517        let response = response.await.map_err(grpc_status)?;
518
519        Ok(response.new_value)
520    }
521
522    /// Get PID of a given process
523    pub async fn get_pid(&self, name: &str) -> anyhow::Result<i32> {
524        let hosts = self.inspect("mesh/hosts", Some(1), None).await?;
525        let mut plist = Vec::new();
526
527        let Node::Dir(processes) = hosts else {
528            anyhow::bail!("Hosts node is not a dir");
529        };
530        for process in processes {
531            let Node::Dir(pnode) = process.node else {
532                anyhow::bail!("Process node is not a dir");
533            };
534            for entry in pnode {
535                if entry.name == "name" {
536                    let Node::Value(value) = entry.node else {
537                        anyhow::bail!("Name node is not a value");
538                    };
539                    let ValueKind::String(strval) = value.kind else {
540                        anyhow::bail!("Name node is not a string");
541                    };
542                    if strval == name {
543                        return Ok(process.name.parse()?);
544                    }
545                    plist.push(strval);
546                }
547            }
548        }
549
550        anyhow::bail!("PID of {name} not found. Processes: {:?}", plist)
551    }
552
553    /// Starts the VM.
554    pub async fn start(
555        &self,
556        env: impl IntoIterator<Item = (String, Option<String>)>,
557        args: impl IntoIterator<Item = String>,
558    ) -> anyhow::Result<()> {
559        let request = diag_proto::StartRequest {
560            env: env
561                .into_iter()
562                .map(|(name, value)| diag_proto::EnvPair { name, value })
563                .collect(),
564            args: args.into_iter().collect(),
565        };
566        self.ttrpc
567            .call()
568            .start(diag_proto::UnderhillDiag::Start, request)
569            .await
570            .map_err(grpc_status)?;
571
572        Ok(())
573    }
574
575    /// Gets the contents of /dev/kmsg
576    pub async fn kmsg(&self, follow: bool) -> anyhow::Result<KmsgStream> {
577        let (conn, socket) = self.connect_data().await?;
578
579        self.ttrpc
580            .call()
581            .start(
582                diag_proto::UnderhillDiag::Kmsg,
583                diag_proto::KmsgRequest { follow, conn },
584            )
585            .await
586            .map_err(grpc_status)?;
587
588        Ok(KmsgStream::new(socket))
589    }
590
591    /// Gets the contents of the file
592    pub async fn read_file(
593        &self,
594        follow: bool,
595        file_path: String,
596    ) -> anyhow::Result<PolledSocket<socket2::Socket>> {
597        let (conn, socket) = self.connect_data().await?;
598
599        self.ttrpc
600            .call()
601            .start(
602                diag_proto::UnderhillDiag::ReadFile,
603                diag_proto::FileRequest {
604                    follow,
605                    conn,
606                    file_path,
607                },
608            )
609            .await
610            .map_err(grpc_status)?;
611
612        Ok(socket)
613    }
614
    /// Issues a call to the server using a custom RPC.
    ///
    /// This can be used to support extension RPCs that are not part of the main
    /// diagnostics service. Pair with [`DiagClient::connect_data`] for RPCs
    /// that need a data connection.
    pub fn custom_call(&self) -> mesh_rpc::client::CallBuilder<'_> {
        self.ttrpc.call()
    }
622
623    /// Crashes the VM.
624    pub async fn crash(&self, pid: i32) -> anyhow::Result<()> {
625        self.ttrpc
626            .call()
627            .start(
628                diag_proto::UnderhillDiag::Crash,
629                diag_proto::CrashRequest { pid },
630            )
631            .await
632            .map_err(grpc_status)?;
633
634        Ok(())
635    }
636
637    /// Sets up network packet capture trace.
638    pub async fn packet_capture(
639        &self,
640        op: PacketCaptureOperation,
641        num_streams: u32,
642        snaplen: u16,
643    ) -> anyhow::Result<(Vec<PolledSocket<socket2::Socket>>, u32)> {
644        let mut sockets = Vec::new();
645        let op_data = match op {
646            PacketCaptureOperation::Start => {
647                let mut conns = Vec::new();
648                for _ in 0..num_streams {
649                    let (conn, socket) = self.connect_data().await?;
650                    conns.push(conn);
651                    sockets.push(socket);
652                }
653                Some(OpData::StartData(diag_proto::StartPacketCaptureData {
654                    snaplen: snaplen.into(),
655                    conns,
656                }))
657            }
658            _ => None,
659        };
660
661        let operation = match op {
662            PacketCaptureOperation::Query => Operation::Query,
663            PacketCaptureOperation::Start => Operation::Start,
664            PacketCaptureOperation::Stop => Operation::Stop,
665        };
666
667        let response = self
668            .ttrpc
669            .call()
670            .start(
671                diag_proto::UnderhillDiag::PacketCapture,
672                diag_proto::NetworkPacketCaptureRequest {
673                    operation: operation.into(),
674                    op_data,
675                },
676            )
677            .await
678            .map_err(grpc_status)?;
679
680        Ok((sockets, response.num_streams))
681    }
682
    /// Saves a core dump file being streamed from Underhill
    ///
    /// Runs `/bin/underhill-dump <pid>` in VTL2, copying its stdout (the dump
    /// contents) to `writer` and its stderr to `stderr` concurrently, then
    /// fails if the process exited unsuccessfully.
    pub async fn core_dump(
        &self,
        pid: i32,
        mut writer: impl AsyncWrite + Unpin,
        mut stderr: impl AsyncWrite + Unpin,
        verbose: bool,
    ) -> anyhow::Result<()> {
        // Launch underhill-dump to dump the target process. Use raw_socket_io
        // so that the diagnostics process does not have to be running during
        // the core dump process; this ensures that we can dump the
        // diagnostics process, too.
        let mut process = self.exec("/bin/underhill-dump");
        if verbose {
            process.args(["-v"]);
        }
        let mut process = process
            .args([pid.to_string()])
            .stdin(false)
            .stdout(true)
            .stderr(true)
            .raw_socket_io(true)
            .spawn()
            .await
            .context("failed to launch underhill-dump")?;

        // Wrap the raw stdio sockets for async copying.
        let process_stdout = PolledSocket::new(&self.driver, process.stdout.take().unwrap())?;
        let process_stderr = PolledSocket::new(&self.driver, process.stderr.take().unwrap())?;

        // Drain both streams concurrently; a failure on either aborts the dump.
        let out = futures::io::copy(process_stdout, &mut writer);
        let err = futures::io::copy(process_stderr, &mut stderr);

        futures::try_join!(out, err)?;

        let status = process
            .wait()
            .await
            .context("failed to wait for underhill-dump")?;

        if !status.success() {
            anyhow::bail!(
                "underhill-dump failed with exit code {}",
                status.exit_code()
            );
        }
        Ok(())
    }
730
731    /// Restarts the Underhill worker.
732    pub async fn restart(&self) -> anyhow::Result<()> {
733        self.ttrpc
734            .call()
735            .start(diag_proto::UnderhillDiag::Restart, ())
736            .await
737            .map_err(grpc_status)?;
738
739        Ok(())
740    }
741
742    /// Pause the VM (including all devices).
743    pub async fn pause(&self) -> anyhow::Result<()> {
744        self.ttrpc
745            .call()
746            .start(diag_proto::UnderhillDiag::Pause, ())
747            .await
748            .map_err(grpc_status)?;
749
750        Ok(())
751    }
752
753    /// Resume the VM.
754    pub async fn resume(&self) -> anyhow::Result<()> {
755        self.ttrpc
756            .call()
757            .start(diag_proto::UnderhillDiag::Resume, ())
758            .await
759            .map_err(grpc_status)?;
760
761        Ok(())
762    }
763
764    /// Dumps the VM's VTL2 saved state.
765    pub async fn dump_saved_state(&self) -> anyhow::Result<Vec<u8>> {
766        let state = self
767            .ttrpc
768            .call()
769            .start(diag_proto::UnderhillDiag::DumpSavedState, ())
770            .await
771            .map_err(grpc_status)?;
772
773        Ok(state.data)
774    }
775}
776
777fn grpc_status(status: Status) -> anyhow::Error {
778    anyhow::anyhow!(status.message)
779}
780
/// A builder for launching a command in VTL2.
pub struct ExecBuilder<'a> {
    /// Client used for the Exec/Wait RPCs and stdio data connections.
    client: &'a DiagClient,
    /// Whether to open a stdin data connection at spawn time.
    with_stdin: bool,
    /// Whether to open a stdout data connection at spawn time.
    with_stdout: bool,
    /// Whether to open a stderr data connection at spawn time.
    with_stderr: bool,
    /// The accumulated Exec request sent at spawn time.
    request: ExecRequest,
}
789
790impl ExecBuilder<'_> {
791    /// Adds `args` to the argument list.
792    pub fn args<T: AsRef<str>>(&mut self, args: impl IntoIterator<Item = T>) -> &mut Self {
793        self.request
794            .args
795            .extend(args.into_iter().map(|s| s.as_ref().to_owned()));
796        self
797    }
798
    /// Sets whether the process is spawned with a TTY.
    ///
    /// Defaults to the `ExecRequest` default (disabled).
    pub fn tty(&mut self, tty: bool) -> &mut Self {
        self.request.tty = tty;
        self
    }
804
    /// Specifies whether a stdin socket should be opened.
    ///
    /// Defaults to `false` (see [`DiagClient::exec`]).
    pub fn stdin(&mut self, stdin: bool) -> &mut Self {
        self.with_stdin = stdin;
        self
    }
810
    /// Specifies whether a stdout socket should be opened.
    ///
    /// Defaults to `false` (see [`DiagClient::exec`]).
    pub fn stdout(&mut self, stdout: bool) -> &mut Self {
        self.with_stdout = stdout;
        self
    }
816
    /// Specifies whether a stderr socket should be opened.
    ///
    /// Defaults to `false` (see [`DiagClient::exec`]).
    pub fn stderr(&mut self, stderr: bool) -> &mut Self {
        self.with_stderr = stderr;
        self
    }
822
    /// Specifies whether the process's stdout and stderr should be combined
    /// into a single stream (the stdout socket).
    pub fn combine_stderr(&mut self, combine_stderr: bool) -> &mut Self {
        self.request.combine_stderr = combine_stderr;
        self
    }
829
    /// Specifies whether the vsock sockets used for stdio should be passed
    /// directly to the launched process instead of going through relays.
    ///
    /// With relays bypassed, stdio keeps working even if the diagnostics
    /// process itself goes away (used by [`DiagClient::core_dump`]).
    pub fn raw_socket_io(&mut self, raw_socket_io: bool) -> &mut Self {
        self.request.raw_socket_io = raw_socket_io;
        self
    }
836
    /// Clears the default environment.
    ///
    /// Variables added via [`Self::env`] afterwards still apply.
    pub fn env_clear(&mut self) -> &mut Self {
        self.request.clear_env = true;
        self
    }
842
843    /// Removes an environment variable.
844    pub fn env_remove(&mut self, name: impl AsRef<str>) -> &mut Self {
845        self.request.env.push(diag_proto::EnvPair {
846            name: name.as_ref().to_owned(),
847            value: None,
848        });
849        self
850    }
851
852    /// Sets an environment variable.
853    pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
854        self.request.env.push(diag_proto::EnvPair {
855            name: name.as_ref().to_owned(),
856            value: Some(value.as_ref().to_owned()),
857        });
858        self
859    }
860
861    /// Spawns the process.
862    pub async fn spawn(&self) -> anyhow::Result<Process> {
863        let mut request = self.request.clone();
864
865        let stdin = if self.with_stdin {
866            let (id, stdin) = self
867                .client
868                .connect_data()
869                .await
870                .context("failed to connect stdin")?;
871            request.stdin = id;
872
873            Some(stdin.into_inner())
874        } else {
875            None
876        };
877
878        let stdout = if self.with_stdout {
879            let (id, stdout) = self
880                .client
881                .connect_data()
882                .await
883                .context("failed to connect stdout")?;
884            request.stdout = id;
885
886            Some(stdout.into_inner())
887        } else {
888            None
889        };
890
891        let stderr = if self.with_stdout {
892            let (id, stderr) = self
893                .client
894                .connect_data()
895                .await
896                .context("failed to connect stderr")?;
897            request.stderr = id;
898
899            Some(stderr.into_inner())
900        } else {
901            None
902        };
903
904        let response = self
905            .client
906            .ttrpc
907            .call()
908            .start(diag_proto::UnderhillDiag::Exec, request)
909            .await
910            .map_err(grpc_status)?;
911
912        let wait = self.client.ttrpc.call().start(
913            diag_proto::UnderhillDiag::Wait,
914            WaitRequest { pid: response.pid },
915        );
916
917        Ok(Process {
918            stdin,
919            stdout,
920            stderr,
921            wait,
922            pid: response.pid,
923        })
924    }
925}
926
/// A process running in VTL2.
#[derive(Debug)]
pub struct Process {
    /// The standard input stream.
    pub stdin: Option<socket2::Socket>,
    /// The standard output stream.
    pub stdout: Option<socket2::Socket>,
    /// The standard error stream.
    pub stderr: Option<socket2::Socket>,
    /// Process id reported by the Exec RPC.
    pid: i32,
    /// Pending Wait RPC, started at spawn time and consumed by `wait()`.
    wait: mesh_rpc::client::Call<WaitResponse>,
}
939
940impl Process {
941    /// Returns the process ID.
942    pub fn id(&self) -> i32 {
943        self.pid
944    }
945
946    /// Waits for the process to exit.
947    pub async fn wait(self) -> anyhow::Result<ExitStatus> {
948        let response = self
949            .wait
950            .await
951            .map_err(|err| anyhow::anyhow!("{}", err.message))?;
952
953        Ok(ExitStatus { response })
954    }
955}
956
/// Process exit status.
#[derive(Debug)]
pub struct ExitStatus {
    /// The raw response from the Wait RPC.
    response: WaitResponse,
}
962
963impl ExitStatus {
964    /// The exit code.
965    pub fn exit_code(&self) -> i32 {
966        self.response.exit_code
967    }
968
969    /// Whether the process successfully terminated.
970    pub fn success(&self) -> bool {
971        self.response.exit_code == 0
972    }
973}