Skip to main content

pipette_client/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! The client for `pipette`.
5
6#![forbid(unsafe_code)]
7
8pub mod process;
9mod send;
10pub mod shell;
11
12pub use pipette_protocol::PIPETTE_PORT;
13pub use pipette_protocol::PIPETTE_READY_MARKER;
14
15use crate::send::PipetteSender;
16use anyhow::Context;
17use futures::AsyncBufReadExt;
18use futures::AsyncRead;
19use futures::AsyncWrite;
20use futures::AsyncWriteExt;
21use futures::FutureExt as _;
22use futures::StreamExt;
23use futures::io::BufReader;
24use futures_concurrency::future::TryJoin;
25use mesh::error::RemoteError;
26use mesh::payload::Timestamp;
27use mesh::rpc::RpcError;
28use mesh_remote::PointToPointMesh;
29use pal_async::task::Spawn;
30use pal_async::task::Task;
31use pipette_protocol::DiagnosticFile;
32use pipette_protocol::PipetteBootstrap;
33use pipette_protocol::PipetteRequest;
34use pipette_protocol::ReadFileRequest;
35use pipette_protocol::WriteFileRequest;
36use shell::UnixShell;
37use shell::WindowsShell;
38use std::path::Path;
39use std::path::PathBuf;
40
/// A client to a running `pipette` instance inside a VM.
///
/// Constructed with [`PipetteClient::new`], which also spawns the background
/// tasks whose handles are stored in the underscore-prefixed fields.
pub struct PipetteClient {
    /// Sender built from the request channel received in the bootstrap
    /// message; the request methods on this type go through it.
    send: PipetteSender,
    /// Receiver awaited by [`PipetteClient::wait`].
    watch: mesh::OneshotReceiver<()>,
    /// The point-to-point mesh created over the connection. Never read after
    /// construction; presumably held only to keep the mesh alive.
    _mesh: PointToPointMesh,
    /// Handle to the task running `replay_logs`.
    _log_task: Task<()>,
    /// Handle to the task running `recv_diag_files`.
    _diag_task: Task<()>,
}
49
50impl PipetteClient {
51    /// Connects to a `pipette` instance inside a VM.
52    ///
53    /// `conn` must be an established connection over some byte stream (e.g., a
54    /// socket).
55    pub async fn new(
56        spawner: impl Spawn,
57        conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
58        output_dir: &Path,
59    ) -> Result<Self, mesh::RecvError> {
60        let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
61        let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
62        let bootstrap = bootstrap_recv.await?;
63
64        let PipetteBootstrap {
65            requests,
66            diag_file_recv,
67            watch,
68            log,
69        } = bootstrap;
70
71        let log_task = spawner.spawn("pipette-log", replay_logs(log));
72        let diag_task = spawner.spawn(
73            "diagnostics-recv",
74            recv_diag_files(output_dir.to_owned(), diag_file_recv),
75        );
76
77        Ok(Self {
78            send: PipetteSender::new(requests),
79            watch,
80            _mesh: mesh,
81            _log_task: log_task,
82            _diag_task: diag_task,
83        })
84    }
85
86    /// Pings the agent to check if it's alive.
87    pub async fn ping(&self) -> Result<(), RpcError> {
88        self.send.call(PipetteRequest::Ping, ()).await
89    }
90
91    /// Return a shell object to interact with a Windows guest.
92    pub fn windows_shell(&self) -> WindowsShell<'_> {
93        WindowsShell::new(self)
94    }
95
96    /// Return a shell object to interact with a Linux guest.
97    pub fn unix_shell(&self) -> UnixShell<'_> {
98        UnixShell::new(self)
99    }
100
101    /// Mounts a filesystem inside the guest (Linux only).
102    pub async fn mount(
103        &self,
104        source: &str,
105        target: &str,
106        fstype: &str,
107        flags: u64,
108        mkdir_target: bool,
109    ) -> anyhow::Result<()> {
110        self.send
111            .call_failable(
112                PipetteRequest::Mount,
113                pipette_protocol::MountRequest {
114                    source: source.to_owned(),
115                    target: target.to_owned(),
116                    fstype: fstype.to_owned(),
117                    flags,
118                    mkdir_target,
119                },
120            )
121            .await
122            .context("failed to send mount request")?;
123        Ok(())
124    }
125
126    /// Prepares a chroot by bind-mounting `/proc`, `/dev`, and `/sys` into it,
127    /// and mounting a writable tmpfs at `/tmp`.
128    pub async fn prepare_chroot(&self, target: &str) -> anyhow::Result<()> {
129        // MS_BIND = 0x1000
130        const MS_BIND: u64 = 0x1000;
131        for dir in ["/proc", "/dev", "/sys"] {
132            let mount_target = format!("{target}{dir}");
133            self.mount(dir, &mount_target, "", MS_BIND, true).await?;
134        }
135        // Mount a writable tmpfs so tools like iperf3 can create temp files.
136        let tmp_target = format!("{target}/tmp");
137        self.mount("tmpfs", &tmp_target, "tmpfs", 0, true).await?;
138        Ok(())
139    }
140
141    /// Returns an object used to launch a command inside the guest.
142    ///
143    /// TODO: this is a low-level interface. Make a high-level interface like
144    /// `xshell::Shell` for manipulating the environment and launching
145    /// processes.
146    pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
147        process::Command::new(self, program)
148    }
149
150    /// Sends a request to the guest to power off.
151    pub async fn power_off(&self) -> anyhow::Result<()> {
152        self.shutdown(pipette_protocol::ShutdownType::PowerOff)
153            .await
154    }
155
156    /// Sends a request to the guest to reboot.
157    pub async fn reboot(&self) -> anyhow::Result<()> {
158        self.shutdown(pipette_protocol::ShutdownType::Reboot).await
159    }
160
161    async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
162        tracing::debug!(?shutdown_type, "sending shutdown request to guest");
163        let r = self.send.call(
164            PipetteRequest::Shutdown,
165            pipette_protocol::ShutdownRequest { shutdown_type },
166        );
167        match r.await {
168            Ok(r) => r
169                .map_err(anyhow::Error::from)
170                .context("failed to shut down")?,
171            Err(_) => {
172                // Presumably this is an expected error due to the agent exiting
173                // or the guest powering off.
174            }
175        }
176        Ok(())
177    }
178
179    /// Reads the full contents of a file.
180    pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
181        let (recv_pipe, send_pipe) = mesh::pipe::pipe();
182        let req = ReadFileRequest {
183            path: path.as_ref().to_string(),
184            sender: send_pipe,
185        };
186
187        let request_future = self.send.call_failable(PipetteRequest::ReadFile, req);
188
189        let mut contents = Vec::new();
190        let transfer_future = async { futures::io::copy(recv_pipe, &mut contents).await };
191
192        tracing::debug!(path = path.as_ref(), "beginning file read transfer");
193        let (bytes_read, io_result) = (request_future, transfer_future.map(Ok))
194            .try_join()
195            .await
196            .context("failed to read file")?;
197
198        io_result.context("io failure")?;
199        if bytes_read != contents.len() as u64 {
200            anyhow::bail!("file truncated");
201        }
202
203        tracing::debug!("file read complete");
204        Ok(contents)
205    }
206
207    /// Writes a file to the guest.
208    /// Note: This may transfer the file in chunks. It is likely not suitable
209    /// for writing to files that require all content to be written at once,
210    /// e.g. files in /proc or /sys.
211    pub async fn write_file(
212        &self,
213        path: impl AsRef<str>,
214        contents: impl AsyncRead,
215    ) -> anyhow::Result<()> {
216        let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
217        let req = WriteFileRequest {
218            path: path.as_ref().to_string(),
219            receiver: recv_pipe,
220        };
221
222        let request_future = self.send.call_failable(PipetteRequest::WriteFile, req);
223
224        let transfer_future = async {
225            let copy_result = futures::io::copy(contents, &mut send_pipe).await;
226            send_pipe.close().await?;
227            copy_result
228        };
229
230        tracing::debug!(path = path.as_ref(), "beginning file wurite transfer");
231        let (bytes_written, io_result) = (request_future, transfer_future.map(Ok))
232            .try_join()
233            .await
234            .context("failed to write file")?;
235        if bytes_written != io_result.context("io failure")? {
236            anyhow::bail!("file truncated");
237        }
238
239        tracing::debug!("file write complete");
240        Ok(())
241    }
242
243    /// Waits for the agent to exit.
244    pub async fn wait(self) -> Result<(), mesh::RecvError> {
245        self.watch.await
246    }
247
248    /// Returns the current time in the guest.
249    pub async fn get_time(&self) -> anyhow::Result<Timestamp> {
250        self.send
251            .call(PipetteRequest::GetTime, ())
252            .await
253            .context("failed to get time")
254    }
255
256    /// Tell the agent to crash itself.
257    pub async fn crash(&self) -> Result<(), RemoteError> {
258        Self::handle_crash_result(self.send.call_failable(PipetteRequest::Crash, ()).await)
259    }
260
261    /// Tell the agent to crash the kernel.
262    pub async fn kernel_crash(&self) -> Result<(), RemoteError> {
263        Self::handle_crash_result(
264            self.send
265                .call_failable(PipetteRequest::KernelCrash, ())
266                .await,
267        )
268    }
269
270    fn handle_crash_result(r: Result<(), RpcError<RemoteError>>) -> Result<(), RemoteError> {
271        match r {
272            Ok(()) => unreachable!(),
273            Err(RpcError::Call(err)) => Err(err),
274            Err(RpcError::Channel(_)) => {
275                // Presumably this is an expected error due to the agent exiting
276                // or the guest crashing.
277                Ok(())
278            }
279        }
280    }
281}
282
283async fn replay_logs(log: mesh::pipe::ReadPipe) {
284    let mut lines = BufReader::new(log).lines();
285    while let Some(line) = lines.next().await {
286        match line {
287            Ok(line) => tracing::debug!(target: "pipette", "{}", line),
288            Err(err) => {
289                tracing::error!(
290                    error = &err as &dyn std::error::Error,
291                    "pipette log failure"
292                );
293                break;
294            }
295        }
296    }
297}
298
299async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
300    while let Some(diag_file) = diag_file_recv.next().await {
301        let DiagnosticFile { name, mut receiver } = diag_file;
302        tracing::debug!(name, "receiving diagnostic file");
303        let path = output_dir.join(&name);
304        let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
305        futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
306            .await
307            .expect("failed to write diagnostic file");
308        tracing::debug!(name, "diagnostic file transfer complete");
309
310        #[expect(
311            clippy::disallowed_methods,
312            reason = "ATTACHMENT is most reliable when using true canonicalized paths"
313        )]
314        let canonical_path = path
315            .canonicalize()
316            .expect("failed to canonicalize attachment path");
317        // Use the inline junit syntax to attach the file to the test result.
318        println!("[[ATTACHMENT|{}]]", canonical_path.display());
319    }
320}