pipette_client/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! The client for `pipette`.
5
6#![forbid(unsafe_code)]
7
8pub mod process;
9mod send;
10pub mod shell;
11
12pub use pipette_protocol::PIPETTE_VSOCK_PORT;
13
14use crate::send::PipetteSender;
15use anyhow::Context;
16use futures::AsyncBufReadExt;
17use futures::AsyncRead;
18use futures::AsyncWrite;
19use futures::AsyncWriteExt;
20use futures::FutureExt as _;
21use futures::StreamExt;
22use futures::io::BufReader;
23use futures_concurrency::future::TryJoin;
24use mesh::error::RemoteError;
25use mesh::payload::Timestamp;
26use mesh::rpc::RpcError;
27use mesh_remote::PointToPointMesh;
28use pal_async::task::Spawn;
29use pal_async::task::Task;
30use pipette_protocol::DiagnosticFile;
31use pipette_protocol::PipetteBootstrap;
32use pipette_protocol::PipetteRequest;
33use pipette_protocol::ReadFileRequest;
34use pipette_protocol::WriteFileRequest;
35use shell::UnixShell;
36use shell::WindowsShell;
37use std::path::Path;
38use std::path::PathBuf;
39
/// A client to a running `pipette` instance inside a VM.
///
/// Created with [`PipetteClient::new`] from an already-established byte
/// stream connection.
pub struct PipetteClient {
    /// Sender used to issue `PipetteRequest`s; built from the `requests`
    /// channel carried by the bootstrap message.
    send: PipetteSender,
    /// Receiver awaited by [`PipetteClient::wait`]. Presumably it completes
    /// (or fails) once the agent goes away.
    watch: mesh::OneshotReceiver<()>,
    /// The point-to-point mesh built over the connection. Never read;
    /// presumably held only so the mesh lives as long as the client.
    _mesh: PointToPointMesh,
    /// Task running `replay_logs` on the log pipe from the bootstrap message.
    _log_task: Task<()>,
    /// Task running `recv_diag_files`, which stores received diagnostic
    /// files in the output directory passed to [`PipetteClient::new`].
    _diag_task: Task<()>,
}
48
49impl PipetteClient {
50    /// Connects to a `pipette` instance inside a VM.
51    ///
52    /// `conn` must be an established connection over some byte stream (e.g., a
53    /// socket).
54    pub async fn new(
55        spawner: impl Spawn,
56        conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
57        output_dir: &Path,
58    ) -> Result<Self, mesh::RecvError> {
59        let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
60        let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
61        let bootstrap = bootstrap_recv.await?;
62
63        let PipetteBootstrap {
64            requests,
65            diag_file_recv,
66            watch,
67            log,
68        } = bootstrap;
69
70        let log_task = spawner.spawn("pipette-log", replay_logs(log));
71        let diag_task = spawner.spawn(
72            "diagnostics-recv",
73            recv_diag_files(output_dir.to_owned(), diag_file_recv),
74        );
75
76        Ok(Self {
77            send: PipetteSender::new(requests),
78            watch,
79            _mesh: mesh,
80            _log_task: log_task,
81            _diag_task: diag_task,
82        })
83    }
84
85    /// Pings the agent to check if it's alive.
86    pub async fn ping(&self) -> Result<(), RpcError> {
87        self.send.call(PipetteRequest::Ping, ()).await
88    }
89
90    /// Return a shell object to interact with a Windows guest.
91    pub fn windows_shell(&self) -> WindowsShell<'_> {
92        WindowsShell::new(self)
93    }
94
95    /// Return a shell object to interact with a Linux guest.
96    pub fn unix_shell(&self) -> UnixShell<'_> {
97        UnixShell::new(self)
98    }
99
100    /// Returns an object used to launch a command inside the guest.
101    ///
102    /// TODO: this is a low-level interface. Make a high-level interface like
103    /// `xshell::Shell` for manipulating the environment and launching
104    /// processes.
105    pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
106        process::Command::new(self, program)
107    }
108
109    /// Sends a request to the guest to power off.
110    pub async fn power_off(&self) -> anyhow::Result<()> {
111        self.shutdown(pipette_protocol::ShutdownType::PowerOff)
112            .await
113    }
114
115    /// Sends a request to the guest to reboot.
116    pub async fn reboot(&self) -> anyhow::Result<()> {
117        self.shutdown(pipette_protocol::ShutdownType::Reboot).await
118    }
119
120    async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
121        tracing::debug!(?shutdown_type, "sending shutdown request to guest");
122        let r = self.send.call(
123            PipetteRequest::Shutdown,
124            pipette_protocol::ShutdownRequest { shutdown_type },
125        );
126        match r.await {
127            Ok(r) => r
128                .map_err(anyhow::Error::from)
129                .context("failed to shut down")?,
130            Err(_) => {
131                // Presumably this is an expected error due to the agent exiting
132                // or the guest powering off.
133            }
134        }
135        Ok(())
136    }
137
138    /// Reads the full contents of a file.
139    pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
140        let (recv_pipe, send_pipe) = mesh::pipe::pipe();
141        let req = ReadFileRequest {
142            path: path.as_ref().to_string(),
143            sender: send_pipe,
144        };
145
146        let request_future = self.send.call_failable(PipetteRequest::ReadFile, req);
147
148        let mut contents = Vec::new();
149        let transfer_future = async { futures::io::copy(recv_pipe, &mut contents).await };
150
151        tracing::debug!(path = path.as_ref(), "beginning file read transfer");
152        let (bytes_read, io_result) = (request_future, transfer_future.map(Ok))
153            .try_join()
154            .await
155            .context("failed to read file")?;
156
157        io_result.context("io failure")?;
158        if bytes_read != contents.len() as u64 {
159            anyhow::bail!("file truncated");
160        }
161
162        tracing::debug!("file read complete");
163        Ok(contents)
164    }
165
166    /// Writes a file to the guest.
167    /// Note: This may transfer the file in chunks. It is likely not suitable
168    /// for writing to files that require all content to be written at once,
169    /// e.g. files in /proc or /sys.
170    pub async fn write_file(
171        &self,
172        path: impl AsRef<str>,
173        contents: impl AsyncRead,
174    ) -> anyhow::Result<()> {
175        let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
176        let req = WriteFileRequest {
177            path: path.as_ref().to_string(),
178            receiver: recv_pipe,
179        };
180
181        let request_future = self.send.call_failable(PipetteRequest::WriteFile, req);
182
183        let transfer_future = async {
184            let copy_result = futures::io::copy(contents, &mut send_pipe).await;
185            send_pipe.close().await?;
186            copy_result
187        };
188
189        tracing::debug!(path = path.as_ref(), "beginning file wurite transfer");
190        let (bytes_written, io_result) = (request_future, transfer_future.map(Ok))
191            .try_join()
192            .await
193            .context("failed to write file")?;
194        if bytes_written != io_result.context("io failure")? {
195            anyhow::bail!("file truncated");
196        }
197
198        tracing::debug!("file write complete");
199        Ok(())
200    }
201
202    /// Waits for the agent to exit.
203    pub async fn wait(self) -> Result<(), mesh::RecvError> {
204        self.watch.await
205    }
206
207    /// Returns the current time in the guest.
208    pub async fn get_time(&self) -> anyhow::Result<Timestamp> {
209        self.send
210            .call(PipetteRequest::GetTime, ())
211            .await
212            .context("failed to get time")
213    }
214
215    /// Tell the agent to crash itself.
216    pub async fn crash(&self) -> Result<(), RemoteError> {
217        Self::handle_crash_result(self.send.call_failable(PipetteRequest::Crash, ()).await)
218    }
219
220    /// Tell the agent to crash the kernel.
221    pub async fn kernel_crash(&self) -> Result<(), RemoteError> {
222        Self::handle_crash_result(
223            self.send
224                .call_failable(PipetteRequest::KernelCrash, ())
225                .await,
226        )
227    }
228
229    fn handle_crash_result(r: Result<(), RpcError<RemoteError>>) -> Result<(), RemoteError> {
230        match r {
231            Ok(()) => unreachable!(),
232            Err(RpcError::Call(err)) => Err(err),
233            Err(RpcError::Channel(_)) => {
234                // Presumably this is an expected error due to the agent exiting
235                // or the guest crashing.
236                Ok(())
237            }
238        }
239    }
240}
241
242async fn replay_logs(log: mesh::pipe::ReadPipe) {
243    let mut lines = BufReader::new(log).lines();
244    while let Some(line) = lines.next().await {
245        match line {
246            Ok(line) => tracing::info!(target: "pipette", "{}", line),
247            Err(err) => {
248                tracing::error!(
249                    error = &err as &dyn std::error::Error,
250                    "pipette log failure"
251                );
252                break;
253            }
254        }
255    }
256}
257
258async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
259    while let Some(diag_file) = diag_file_recv.next().await {
260        let DiagnosticFile { name, mut receiver } = diag_file;
261        tracing::debug!(name, "receiving diagnostic file");
262        let path = output_dir.join(&name);
263        let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
264        futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
265            .await
266            .expect("failed to write diagnostic file");
267        tracing::debug!(name, "diagnostic file transfer complete");
268
269        #[expect(
270            clippy::disallowed_methods,
271            reason = "ATTACHMENT is most reliable when using true canonicalized paths"
272        )]
273        let canonical_path = path
274            .canonicalize()
275            .expect("failed to canonicalize attachment path");
276        // Use the inline junit syntax to attach the file to the test result.
277        println!("[[ATTACHMENT|{}]]", canonical_path.display());
278    }
279}