pipette_client/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! The client for `pipette`.
5
6#![forbid(unsafe_code)]
7
8pub mod process;
9mod send;
10pub mod shell;
11
12pub use pipette_protocol::PIPETTE_VSOCK_PORT;
13
14use crate::send::PipetteSender;
15use anyhow::Context;
16use futures::AsyncBufReadExt;
17use futures::AsyncRead;
18use futures::AsyncWrite;
19use futures::AsyncWriteExt;
20use futures::FutureExt as _;
21use futures::StreamExt;
22use futures::io::BufReader;
23use futures_concurrency::future::TryJoin;
24use mesh::payload::Timestamp;
25use mesh::rpc::RpcError;
26use mesh_remote::PointToPointMesh;
27use pal_async::task::Spawn;
28use pal_async::task::Task;
29use pipette_protocol::DiagnosticFile;
30use pipette_protocol::PipetteBootstrap;
31use pipette_protocol::PipetteRequest;
32use pipette_protocol::ReadFileRequest;
33use pipette_protocol::WriteFileRequest;
34use shell::UnixShell;
35use shell::WindowsShell;
36use std::path::Path;
37use std::path::PathBuf;
38
/// A client to a running `pipette` instance inside a VM.
///
/// Constructed by [`PipetteClient::new`], which performs the bootstrap
/// handshake over a caller-supplied byte stream and spawns the background
/// tasks stored here.
pub struct PipetteClient {
    /// Request sender built from the `requests` field of the bootstrap
    /// message; every RPC issued by this client goes through it.
    send: PipetteSender,
    /// Receiver taken from the bootstrap message; awaited by
    /// [`PipetteClient::wait`].
    watch: mesh::OneshotReceiver<()>,
    /// The point-to-point mesh created over the connection passed to `new`.
    /// Never read after construction; presumably held only to keep the
    /// connection alive for the lifetime of the client (TODO confirm).
    _mesh: PointToPointMesh,
    /// Handle to the spawned "pipette-log" task running `replay_logs`.
    /// Never read; presumably held so the task is tied to the client's
    /// lifetime (TODO confirm the drop semantics of `Task`).
    _log_task: Task<()>,
    /// Handle to the spawned "diagnostics-recv" task running
    /// `recv_diag_files`. Never read; see the note on `_log_task`.
    _diag_task: Task<()>,
}
47
48impl PipetteClient {
49    /// Connects to a `pipette` instance inside a VM.
50    ///
51    /// `conn` must be an established connection over some byte stream (e.g., a
52    /// socket).
53    pub async fn new(
54        spawner: impl Spawn,
55        conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
56        output_dir: &Path,
57    ) -> Result<Self, mesh::RecvError> {
58        let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
59        let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
60        let bootstrap = bootstrap_recv.await?;
61
62        let PipetteBootstrap {
63            requests,
64            diag_file_recv,
65            watch,
66            log,
67        } = bootstrap;
68
69        let log_task = spawner.spawn("pipette-log", replay_logs(log));
70        let diag_task = spawner.spawn(
71            "diagnostics-recv",
72            recv_diag_files(output_dir.to_owned(), diag_file_recv),
73        );
74
75        Ok(Self {
76            send: PipetteSender::new(requests),
77            watch,
78            _mesh: mesh,
79            _log_task: log_task,
80            _diag_task: diag_task,
81        })
82    }
83
84    /// Pings the agent to check if it's alive.
85    pub async fn ping(&self) -> Result<(), RpcError> {
86        self.send.call(PipetteRequest::Ping, ()).await
87    }
88
89    /// Return a shell object to interact with a Windows guest.
90    pub fn windows_shell(&self) -> WindowsShell<'_> {
91        WindowsShell::new(self)
92    }
93
94    /// Return a shell object to interact with a Linux guest.
95    pub fn unix_shell(&self) -> UnixShell<'_> {
96        UnixShell::new(self)
97    }
98
99    /// Returns an object used to launch a command inside the guest.
100    ///
101    /// TODO: this is a low-level interface. Make a high-level interface like
102    /// `xshell::Shell` for manipulating the environment and launching
103    /// processes.
104    pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
105        process::Command::new(self, program)
106    }
107
108    /// Sends a request to the guest to power off.
109    pub async fn power_off(&self) -> anyhow::Result<()> {
110        self.shutdown(pipette_protocol::ShutdownType::PowerOff)
111            .await
112    }
113
114    /// Sends a request to the guest to reboot.
115    pub async fn reboot(&self) -> anyhow::Result<()> {
116        self.shutdown(pipette_protocol::ShutdownType::Reboot).await
117    }
118
119    async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
120        let r = self.send.call(
121            PipetteRequest::Shutdown,
122            pipette_protocol::ShutdownRequest { shutdown_type },
123        );
124        match r.await {
125            Ok(r) => r
126                .map_err(anyhow::Error::from)
127                .context("failed to shut down")?,
128            Err(_) => {
129                // Presumably this is an expected error due to the agent exiting
130                // or the guest powering off.
131            }
132        }
133        Ok(())
134    }
135
136    /// Reads the full contents of a file.
137    pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
138        let (recv_pipe, send_pipe) = mesh::pipe::pipe();
139        let req = ReadFileRequest {
140            path: path.as_ref().to_string(),
141            sender: send_pipe,
142        };
143
144        let request_future = self.send.call_failable(PipetteRequest::ReadFile, req);
145
146        let mut contents = Vec::new();
147        let transfer_future = async { futures::io::copy(recv_pipe, &mut contents).await };
148
149        tracing::debug!(path = path.as_ref(), "beginning file read transfer");
150        let (bytes_read, io_result) = (request_future, transfer_future.map(Ok))
151            .try_join()
152            .await
153            .context("failed to read file")?;
154
155        io_result.context("io failure")?;
156        if bytes_read != contents.len() as u64 {
157            anyhow::bail!("file truncated");
158        }
159
160        tracing::debug!("file read complete");
161        Ok(contents)
162    }
163
164    /// Writes a file to the guest.
165    /// Note: This may transfer the file in chunks. It is likely not suitable
166    /// for writing to files that require all content to be written at once,
167    /// e.g. files in /proc or /sys.
168    pub async fn write_file(
169        &self,
170        path: impl AsRef<str>,
171        contents: impl AsyncRead,
172    ) -> anyhow::Result<()> {
173        let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
174        let req = WriteFileRequest {
175            path: path.as_ref().to_string(),
176            receiver: recv_pipe,
177        };
178
179        let request_future = self.send.call_failable(PipetteRequest::WriteFile, req);
180
181        let transfer_future = async {
182            let copy_result = futures::io::copy(contents, &mut send_pipe).await;
183            send_pipe.close().await?;
184            copy_result
185        };
186
187        tracing::debug!(path = path.as_ref(), "beginning file wurite transfer");
188        let (bytes_written, io_result) = (request_future, transfer_future.map(Ok))
189            .try_join()
190            .await
191            .context("failed to write file")?;
192        if bytes_written != io_result.context("io failure")? {
193            anyhow::bail!("file truncated");
194        }
195
196        tracing::debug!("file write complete");
197        Ok(())
198    }
199
200    /// Waits for the agent to exit.
201    pub async fn wait(self) -> Result<(), mesh::RecvError> {
202        self.watch.await
203    }
204
205    /// Returns the current time in the guest.
206    pub async fn get_time(&self) -> anyhow::Result<Timestamp> {
207        self.send
208            .call(PipetteRequest::GetTime, ())
209            .await
210            .context("failed to get time")
211    }
212}
213
214async fn replay_logs(log: mesh::pipe::ReadPipe) {
215    let mut lines = BufReader::new(log).lines();
216    while let Some(line) = lines.next().await {
217        match line {
218            Ok(line) => tracing::info!(target: "pipette", "{}", line),
219            Err(err) => {
220                tracing::error!(
221                    error = &err as &dyn std::error::Error,
222                    "pipette log failure"
223                );
224                break;
225            }
226        }
227    }
228}
229
230async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
231    while let Some(diag_file) = diag_file_recv.next().await {
232        let DiagnosticFile { name, mut receiver } = diag_file;
233        tracing::debug!(name, "receiving diagnostic file");
234        let path = output_dir.join(&name);
235        let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
236        futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
237            .await
238            .expect("failed to write diagnostic file");
239        tracing::debug!(name, "diagnostic file transfer complete");
240
241        #[expect(
242            clippy::disallowed_methods,
243            reason = "ATTACHMENT is most reliable when using true canonicalized paths"
244        )]
245        let canonical_path = path
246            .canonicalize()
247            .expect("failed to canonicalize attachment path");
248        // Use the inline junit syntax to attach the file to the test result.
249        println!("[[ATTACHMENT|{}]]", canonical_path.display());
250    }
251}