pipette_client/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! The client for `pipette`.
5
6#![forbid(unsafe_code)]
7
8pub mod process;
9mod send;
10pub mod shell;
11
12pub use pipette_protocol::PIPETTE_VSOCK_PORT;
13
14use crate::send::PipetteSender;
15use anyhow::Context;
16use futures::AsyncBufReadExt;
17use futures::AsyncRead;
18use futures::AsyncWrite;
19use futures::AsyncWriteExt;
20use futures::FutureExt as _;
21use futures::StreamExt;
22use futures::io::BufReader;
23use futures_concurrency::future::TryJoin;
24use mesh::payload::Timestamp;
25use mesh::rpc::RpcError;
26use mesh_remote::PointToPointMesh;
27use pal_async::task::Spawn;
28use pal_async::task::Task;
29use pipette_protocol::DiagnosticFile;
30use pipette_protocol::PipetteBootstrap;
31use pipette_protocol::PipetteRequest;
32use pipette_protocol::ReadFileRequest;
33use pipette_protocol::WriteFileRequest;
34use shell::UnixShell;
35use shell::WindowsShell;
36use std::path::Path;
37use std::path::PathBuf;
38
/// A client to a running `pipette` instance inside a VM.
///
/// Constructed with [`PipetteClient::new`] over an established byte stream.
pub struct PipetteClient {
    /// Sender used to issue requests to the agent.
    send: PipetteSender,
    /// Awaited by [`PipetteClient::wait`] to wait for the agent to exit.
    watch: mesh::OneshotReceiver<()>,
    /// The point-to-point mesh created over the connection. Never read; held
    /// only so that it lives as long as the client.
    _mesh: PointToPointMesh,
    /// Background task running `replay_logs`. Never read; held only so that
    /// it lives as long as the client (presumably dropping a `Task` stops
    /// it -- confirm against `pal_async`).
    _log_task: Task<()>,
    /// Background task running `recv_diag_files`. Never read; held only so
    /// that it lives as long as the client.
    _diag_task: Task<()>,
}
47
48impl PipetteClient {
49    /// Connects to a `pipette` instance inside a VM.
50    ///
51    /// `conn` must be an established connection over some byte stream (e.g., a
52    /// socket).
53    pub async fn new(
54        spawner: impl Spawn,
55        conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
56        output_dir: &Path,
57    ) -> Result<Self, mesh::RecvError> {
58        let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
59        let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
60        let bootstrap = bootstrap_recv.await?;
61
62        let PipetteBootstrap {
63            requests,
64            diag_file_recv,
65            watch,
66            log,
67        } = bootstrap;
68
69        let log_task = spawner.spawn("pipette-log", replay_logs(log));
70        let diag_task = spawner.spawn(
71            "diagnostics-recv",
72            recv_diag_files(output_dir.to_owned(), diag_file_recv),
73        );
74
75        Ok(Self {
76            send: PipetteSender::new(requests),
77            watch,
78            _mesh: mesh,
79            _log_task: log_task,
80            _diag_task: diag_task,
81        })
82    }
83
84    /// Pings the agent to check if it's alive.
85    pub async fn ping(&self) -> Result<(), RpcError> {
86        self.send.call(PipetteRequest::Ping, ()).await
87    }
88
89    /// Return a shell object to interact with a Windows guest.
90    pub fn windows_shell(&self) -> WindowsShell<'_> {
91        WindowsShell::new(self)
92    }
93
94    /// Return a shell object to interact with a Linux guest.
95    pub fn unix_shell(&self) -> UnixShell<'_> {
96        UnixShell::new(self)
97    }
98
99    /// Returns an object used to launch a command inside the guest.
100    ///
101    /// TODO: this is a low-level interface. Make a high-level interface like
102    /// `xshell::Shell` for manipulating the environment and launching
103    /// processes.
104    pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
105        process::Command::new(self, program)
106    }
107
108    /// Sends a request to the guest to power off.
109    pub async fn power_off(&self) -> anyhow::Result<()> {
110        self.shutdown(pipette_protocol::ShutdownType::PowerOff)
111            .await
112    }
113
114    /// Sends a request to the guest to reboot.
115    pub async fn reboot(&self) -> anyhow::Result<()> {
116        self.shutdown(pipette_protocol::ShutdownType::Reboot).await
117    }
118
119    async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
120        tracing::debug!(?shutdown_type, "sending shutdown request to guest");
121        let r = self.send.call(
122            PipetteRequest::Shutdown,
123            pipette_protocol::ShutdownRequest { shutdown_type },
124        );
125        match r.await {
126            Ok(r) => r
127                .map_err(anyhow::Error::from)
128                .context("failed to shut down")?,
129            Err(_) => {
130                // Presumably this is an expected error due to the agent exiting
131                // or the guest powering off.
132            }
133        }
134        Ok(())
135    }
136
137    /// Reads the full contents of a file.
138    pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
139        let (recv_pipe, send_pipe) = mesh::pipe::pipe();
140        let req = ReadFileRequest {
141            path: path.as_ref().to_string(),
142            sender: send_pipe,
143        };
144
145        let request_future = self.send.call_failable(PipetteRequest::ReadFile, req);
146
147        let mut contents = Vec::new();
148        let transfer_future = async { futures::io::copy(recv_pipe, &mut contents).await };
149
150        tracing::debug!(path = path.as_ref(), "beginning file read transfer");
151        let (bytes_read, io_result) = (request_future, transfer_future.map(Ok))
152            .try_join()
153            .await
154            .context("failed to read file")?;
155
156        io_result.context("io failure")?;
157        if bytes_read != contents.len() as u64 {
158            anyhow::bail!("file truncated");
159        }
160
161        tracing::debug!("file read complete");
162        Ok(contents)
163    }
164
165    /// Writes a file to the guest.
166    /// Note: This may transfer the file in chunks. It is likely not suitable
167    /// for writing to files that require all content to be written at once,
168    /// e.g. files in /proc or /sys.
169    pub async fn write_file(
170        &self,
171        path: impl AsRef<str>,
172        contents: impl AsyncRead,
173    ) -> anyhow::Result<()> {
174        let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
175        let req = WriteFileRequest {
176            path: path.as_ref().to_string(),
177            receiver: recv_pipe,
178        };
179
180        let request_future = self.send.call_failable(PipetteRequest::WriteFile, req);
181
182        let transfer_future = async {
183            let copy_result = futures::io::copy(contents, &mut send_pipe).await;
184            send_pipe.close().await?;
185            copy_result
186        };
187
188        tracing::debug!(path = path.as_ref(), "beginning file wurite transfer");
189        let (bytes_written, io_result) = (request_future, transfer_future.map(Ok))
190            .try_join()
191            .await
192            .context("failed to write file")?;
193        if bytes_written != io_result.context("io failure")? {
194            anyhow::bail!("file truncated");
195        }
196
197        tracing::debug!("file write complete");
198        Ok(())
199    }
200
201    /// Waits for the agent to exit.
202    pub async fn wait(self) -> Result<(), mesh::RecvError> {
203        self.watch.await
204    }
205
206    /// Returns the current time in the guest.
207    pub async fn get_time(&self) -> anyhow::Result<Timestamp> {
208        self.send
209            .call(PipetteRequest::GetTime, ())
210            .await
211            .context("failed to get time")
212    }
213}
214
215async fn replay_logs(log: mesh::pipe::ReadPipe) {
216    let mut lines = BufReader::new(log).lines();
217    while let Some(line) = lines.next().await {
218        match line {
219            Ok(line) => tracing::info!(target: "pipette", "{}", line),
220            Err(err) => {
221                tracing::error!(
222                    error = &err as &dyn std::error::Error,
223                    "pipette log failure"
224                );
225                break;
226            }
227        }
228    }
229}
230
231async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
232    while let Some(diag_file) = diag_file_recv.next().await {
233        let DiagnosticFile { name, mut receiver } = diag_file;
234        tracing::debug!(name, "receiving diagnostic file");
235        let path = output_dir.join(&name);
236        let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
237        futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
238            .await
239            .expect("failed to write diagnostic file");
240        tracing::debug!(name, "diagnostic file transfer complete");
241
242        #[expect(
243            clippy::disallowed_methods,
244            reason = "ATTACHMENT is most reliable when using true canonicalized paths"
245        )]
246        let canonical_path = path
247            .canonicalize()
248            .expect("failed to canonicalize attachment path");
249        // Use the inline junit syntax to attach the file to the test result.
250        println!("[[ATTACHMENT|{}]]", canonical_path.display());
251    }
252}