pipette_client/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! The client for `pipette`.
5
6#![forbid(unsafe_code)]
7
8pub mod process;
9mod send;
10pub mod shell;
11
12pub use pipette_protocol::PIPETTE_VSOCK_PORT;
13
14use crate::send::PipetteSender;
15use anyhow::Context;
16use futures::AsyncBufReadExt;
17use futures::AsyncRead;
18use futures::AsyncWrite;
19use futures::AsyncWriteExt;
20use futures::FutureExt as _;
21use futures::StreamExt;
22use futures::io::BufReader;
23use futures_concurrency::future::TryJoin;
24use mesh::error::RemoteError;
25use mesh::payload::Timestamp;
26use mesh::rpc::RpcError;
27use mesh_remote::PointToPointMesh;
28use pal_async::task::Spawn;
29use pal_async::task::Task;
30use pipette_protocol::DiagnosticFile;
31use pipette_protocol::PipetteBootstrap;
32use pipette_protocol::PipetteRequest;
33use pipette_protocol::ReadFileRequest;
34use pipette_protocol::WriteFileRequest;
35use shell::UnixShell;
36use shell::WindowsShell;
37use std::path::Path;
38use std::path::PathBuf;
39
40/// A client to a running `pipette` instance inside a VM.
41pub struct PipetteClient {
42    send: PipetteSender,
43    watch: mesh::OneshotReceiver<()>,
44    _mesh: PointToPointMesh,
45    _log_task: Task<()>,
46    _diag_task: Task<()>,
47}
48
49impl PipetteClient {
50    /// Connects to a `pipette` instance inside a VM.
51    ///
52    /// `conn` must be an established connection over some byte stream (e.g., a
53    /// socket).
54    pub async fn new(
55        spawner: impl Spawn,
56        conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
57        output_dir: &Path,
58    ) -> Result<Self, mesh::RecvError> {
59        let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
60        let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
61        let bootstrap = bootstrap_recv.await?;
62
63        let PipetteBootstrap {
64            requests,
65            diag_file_recv,
66            watch,
67            log,
68        } = bootstrap;
69
70        let log_task = spawner.spawn("pipette-log", replay_logs(log));
71        let diag_task = spawner.spawn(
72            "diagnostics-recv",
73            recv_diag_files(output_dir.to_owned(), diag_file_recv),
74        );
75
76        Ok(Self {
77            send: PipetteSender::new(requests),
78            watch,
79            _mesh: mesh,
80            _log_task: log_task,
81            _diag_task: diag_task,
82        })
83    }
84
85    /// Pings the agent to check if it's alive.
86    pub async fn ping(&self) -> Result<(), RpcError> {
87        self.send.call(PipetteRequest::Ping, ()).await
88    }
89
90    /// Return a shell object to interact with a Windows guest.
91    pub fn windows_shell(&self) -> WindowsShell<'_> {
92        WindowsShell::new(self)
93    }
94
95    /// Return a shell object to interact with a Linux guest.
96    pub fn unix_shell(&self) -> UnixShell<'_> {
97        UnixShell::new(self)
98    }
99
100    /// Mounts a filesystem inside the guest (Linux only).
101    pub async fn mount(
102        &self,
103        source: &str,
104        target: &str,
105        fstype: &str,
106        flags: u64,
107        mkdir_target: bool,
108    ) -> anyhow::Result<()> {
109        self.send
110            .call_failable(
111                PipetteRequest::Mount,
112                pipette_protocol::MountRequest {
113                    source: source.to_owned(),
114                    target: target.to_owned(),
115                    fstype: fstype.to_owned(),
116                    flags,
117                    mkdir_target,
118                },
119            )
120            .await
121            .context("failed to send mount request")?;
122        Ok(())
123    }
124
125    /// Prepares a chroot by bind-mounting `/proc`, `/dev`, and `/sys` into it,
126    /// and mounting a writable tmpfs at `/tmp`.
127    pub async fn prepare_chroot(&self, target: &str) -> anyhow::Result<()> {
128        // MS_BIND = 0x1000
129        const MS_BIND: u64 = 0x1000;
130        for dir in ["/proc", "/dev", "/sys"] {
131            let mount_target = format!("{target}{dir}");
132            self.mount(dir, &mount_target, "", MS_BIND, true).await?;
133        }
134        // Mount a writable tmpfs so tools like iperf3 can create temp files.
135        let tmp_target = format!("{target}/tmp");
136        self.mount("tmpfs", &tmp_target, "tmpfs", 0, true).await?;
137        Ok(())
138    }
139
140    /// Returns an object used to launch a command inside the guest.
141    ///
142    /// TODO: this is a low-level interface. Make a high-level interface like
143    /// `xshell::Shell` for manipulating the environment and launching
144    /// processes.
145    pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
146        process::Command::new(self, program)
147    }
148
149    /// Sends a request to the guest to power off.
150    pub async fn power_off(&self) -> anyhow::Result<()> {
151        self.shutdown(pipette_protocol::ShutdownType::PowerOff)
152            .await
153    }
154
155    /// Sends a request to the guest to reboot.
156    pub async fn reboot(&self) -> anyhow::Result<()> {
157        self.shutdown(pipette_protocol::ShutdownType::Reboot).await
158    }
159
160    async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
161        tracing::debug!(?shutdown_type, "sending shutdown request to guest");
162        let r = self.send.call(
163            PipetteRequest::Shutdown,
164            pipette_protocol::ShutdownRequest { shutdown_type },
165        );
166        match r.await {
167            Ok(r) => r
168                .map_err(anyhow::Error::from)
169                .context("failed to shut down")?,
170            Err(_) => {
171                // Presumably this is an expected error due to the agent exiting
172                // or the guest powering off.
173            }
174        }
175        Ok(())
176    }
177
178    /// Reads the full contents of a file.
179    pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
180        let (recv_pipe, send_pipe) = mesh::pipe::pipe();
181        let req = ReadFileRequest {
182            path: path.as_ref().to_string(),
183            sender: send_pipe,
184        };
185
186        let request_future = self.send.call_failable(PipetteRequest::ReadFile, req);
187
188        let mut contents = Vec::new();
189        let transfer_future = async { futures::io::copy(recv_pipe, &mut contents).await };
190
191        tracing::debug!(path = path.as_ref(), "beginning file read transfer");
192        let (bytes_read, io_result) = (request_future, transfer_future.map(Ok))
193            .try_join()
194            .await
195            .context("failed to read file")?;
196
197        io_result.context("io failure")?;
198        if bytes_read != contents.len() as u64 {
199            anyhow::bail!("file truncated");
200        }
201
202        tracing::debug!("file read complete");
203        Ok(contents)
204    }
205
206    /// Writes a file to the guest.
207    /// Note: This may transfer the file in chunks. It is likely not suitable
208    /// for writing to files that require all content to be written at once,
209    /// e.g. files in /proc or /sys.
210    pub async fn write_file(
211        &self,
212        path: impl AsRef<str>,
213        contents: impl AsyncRead,
214    ) -> anyhow::Result<()> {
215        let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
216        let req = WriteFileRequest {
217            path: path.as_ref().to_string(),
218            receiver: recv_pipe,
219        };
220
221        let request_future = self.send.call_failable(PipetteRequest::WriteFile, req);
222
223        let transfer_future = async {
224            let copy_result = futures::io::copy(contents, &mut send_pipe).await;
225            send_pipe.close().await?;
226            copy_result
227        };
228
229        tracing::debug!(path = path.as_ref(), "beginning file wurite transfer");
230        let (bytes_written, io_result) = (request_future, transfer_future.map(Ok))
231            .try_join()
232            .await
233            .context("failed to write file")?;
234        if bytes_written != io_result.context("io failure")? {
235            anyhow::bail!("file truncated");
236        }
237
238        tracing::debug!("file write complete");
239        Ok(())
240    }
241
242    /// Waits for the agent to exit.
243    pub async fn wait(self) -> Result<(), mesh::RecvError> {
244        self.watch.await
245    }
246
247    /// Returns the current time in the guest.
248    pub async fn get_time(&self) -> anyhow::Result<Timestamp> {
249        self.send
250            .call(PipetteRequest::GetTime, ())
251            .await
252            .context("failed to get time")
253    }
254
255    /// Tell the agent to crash itself.
256    pub async fn crash(&self) -> Result<(), RemoteError> {
257        Self::handle_crash_result(self.send.call_failable(PipetteRequest::Crash, ()).await)
258    }
259
260    /// Tell the agent to crash the kernel.
261    pub async fn kernel_crash(&self) -> Result<(), RemoteError> {
262        Self::handle_crash_result(
263            self.send
264                .call_failable(PipetteRequest::KernelCrash, ())
265                .await,
266        )
267    }
268
269    fn handle_crash_result(r: Result<(), RpcError<RemoteError>>) -> Result<(), RemoteError> {
270        match r {
271            Ok(()) => unreachable!(),
272            Err(RpcError::Call(err)) => Err(err),
273            Err(RpcError::Channel(_)) => {
274                // Presumably this is an expected error due to the agent exiting
275                // or the guest crashing.
276                Ok(())
277            }
278        }
279    }
280}
281
282async fn replay_logs(log: mesh::pipe::ReadPipe) {
283    let mut lines = BufReader::new(log).lines();
284    while let Some(line) = lines.next().await {
285        match line {
286            Ok(line) => tracing::info!(target: "pipette", "{}", line),
287            Err(err) => {
288                tracing::error!(
289                    error = &err as &dyn std::error::Error,
290                    "pipette log failure"
291                );
292                break;
293            }
294        }
295    }
296}
297
298async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
299    while let Some(diag_file) = diag_file_recv.next().await {
300        let DiagnosticFile { name, mut receiver } = diag_file;
301        tracing::debug!(name, "receiving diagnostic file");
302        let path = output_dir.join(&name);
303        let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
304        futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
305            .await
306            .expect("failed to write diagnostic file");
307        tracing::debug!(name, "diagnostic file transfer complete");
308
309        #[expect(
310            clippy::disallowed_methods,
311            reason = "ATTACHMENT is most reliable when using true canonicalized paths"
312        )]
313        let canonical_path = path
314            .canonicalize()
315            .expect("failed to canonicalize attachment path");
316        // Use the inline junit syntax to attach the file to the test result.
317        println!("[[ATTACHMENT|{}]]", canonical_path.display());
318    }
319}