1#![forbid(unsafe_code)]
7
8pub mod process;
9mod send;
10pub mod shell;
11
12pub use pipette_protocol::PIPETTE_VSOCK_PORT;
13
14use crate::send::PipetteSender;
15use anyhow::Context;
16use futures::AsyncBufReadExt;
17use futures::AsyncRead;
18use futures::AsyncWrite;
19use futures::AsyncWriteExt;
20use futures::FutureExt as _;
21use futures::StreamExt;
22use futures::io::BufReader;
23use futures_concurrency::future::TryJoin;
24use mesh::error::RemoteError;
25use mesh::payload::Timestamp;
26use mesh::rpc::RpcError;
27use mesh_remote::PointToPointMesh;
28use pal_async::task::Spawn;
29use pal_async::task::Task;
30use pipette_protocol::DiagnosticFile;
31use pipette_protocol::PipetteBootstrap;
32use pipette_protocol::PipetteRequest;
33use pipette_protocol::ReadFileRequest;
34use pipette_protocol::WriteFileRequest;
35use shell::UnixShell;
36use shell::WindowsShell;
37use std::path::Path;
38use std::path::PathBuf;
39
40pub struct PipetteClient {
42 send: PipetteSender,
43 watch: mesh::OneshotReceiver<()>,
44 _mesh: PointToPointMesh,
45 _log_task: Task<()>,
46 _diag_task: Task<()>,
47}
48
49impl PipetteClient {
50 pub async fn new(
55 spawner: impl Spawn,
56 conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
57 output_dir: &Path,
58 ) -> Result<Self, mesh::RecvError> {
59 let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
60 let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
61 let bootstrap = bootstrap_recv.await?;
62
63 let PipetteBootstrap {
64 requests,
65 diag_file_recv,
66 watch,
67 log,
68 } = bootstrap;
69
70 let log_task = spawner.spawn("pipette-log", replay_logs(log));
71 let diag_task = spawner.spawn(
72 "diagnostics-recv",
73 recv_diag_files(output_dir.to_owned(), diag_file_recv),
74 );
75
76 Ok(Self {
77 send: PipetteSender::new(requests),
78 watch,
79 _mesh: mesh,
80 _log_task: log_task,
81 _diag_task: diag_task,
82 })
83 }
84
85 pub async fn ping(&self) -> Result<(), RpcError> {
87 self.send.call(PipetteRequest::Ping, ()).await
88 }
89
90 pub fn windows_shell(&self) -> WindowsShell<'_> {
92 WindowsShell::new(self)
93 }
94
95 pub fn unix_shell(&self) -> UnixShell<'_> {
97 UnixShell::new(self)
98 }
99
100 pub async fn mount(
102 &self,
103 source: &str,
104 target: &str,
105 fstype: &str,
106 flags: u64,
107 mkdir_target: bool,
108 ) -> anyhow::Result<()> {
109 self.send
110 .call_failable(
111 PipetteRequest::Mount,
112 pipette_protocol::MountRequest {
113 source: source.to_owned(),
114 target: target.to_owned(),
115 fstype: fstype.to_owned(),
116 flags,
117 mkdir_target,
118 },
119 )
120 .await
121 .context("failed to send mount request")?;
122 Ok(())
123 }
124
125 pub async fn prepare_chroot(&self, target: &str) -> anyhow::Result<()> {
128 const MS_BIND: u64 = 0x1000;
130 for dir in ["/proc", "/dev", "/sys"] {
131 let mount_target = format!("{target}{dir}");
132 self.mount(dir, &mount_target, "", MS_BIND, true).await?;
133 }
134 let tmp_target = format!("{target}/tmp");
136 self.mount("tmpfs", &tmp_target, "tmpfs", 0, true).await?;
137 Ok(())
138 }
139
140 pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
146 process::Command::new(self, program)
147 }
148
149 pub async fn power_off(&self) -> anyhow::Result<()> {
151 self.shutdown(pipette_protocol::ShutdownType::PowerOff)
152 .await
153 }
154
155 pub async fn reboot(&self) -> anyhow::Result<()> {
157 self.shutdown(pipette_protocol::ShutdownType::Reboot).await
158 }
159
160 async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
161 tracing::debug!(?shutdown_type, "sending shutdown request to guest");
162 let r = self.send.call(
163 PipetteRequest::Shutdown,
164 pipette_protocol::ShutdownRequest { shutdown_type },
165 );
166 match r.await {
167 Ok(r) => r
168 .map_err(anyhow::Error::from)
169 .context("failed to shut down")?,
170 Err(_) => {
171 }
174 }
175 Ok(())
176 }
177
178 pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
180 let (recv_pipe, send_pipe) = mesh::pipe::pipe();
181 let req = ReadFileRequest {
182 path: path.as_ref().to_string(),
183 sender: send_pipe,
184 };
185
186 let request_future = self.send.call_failable(PipetteRequest::ReadFile, req);
187
188 let mut contents = Vec::new();
189 let transfer_future = async { futures::io::copy(recv_pipe, &mut contents).await };
190
191 tracing::debug!(path = path.as_ref(), "beginning file read transfer");
192 let (bytes_read, io_result) = (request_future, transfer_future.map(Ok))
193 .try_join()
194 .await
195 .context("failed to read file")?;
196
197 io_result.context("io failure")?;
198 if bytes_read != contents.len() as u64 {
199 anyhow::bail!("file truncated");
200 }
201
202 tracing::debug!("file read complete");
203 Ok(contents)
204 }
205
206 pub async fn write_file(
211 &self,
212 path: impl AsRef<str>,
213 contents: impl AsyncRead,
214 ) -> anyhow::Result<()> {
215 let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
216 let req = WriteFileRequest {
217 path: path.as_ref().to_string(),
218 receiver: recv_pipe,
219 };
220
221 let request_future = self.send.call_failable(PipetteRequest::WriteFile, req);
222
223 let transfer_future = async {
224 let copy_result = futures::io::copy(contents, &mut send_pipe).await;
225 send_pipe.close().await?;
226 copy_result
227 };
228
229 tracing::debug!(path = path.as_ref(), "beginning file wurite transfer");
230 let (bytes_written, io_result) = (request_future, transfer_future.map(Ok))
231 .try_join()
232 .await
233 .context("failed to write file")?;
234 if bytes_written != io_result.context("io failure")? {
235 anyhow::bail!("file truncated");
236 }
237
238 tracing::debug!("file write complete");
239 Ok(())
240 }
241
242 pub async fn wait(self) -> Result<(), mesh::RecvError> {
244 self.watch.await
245 }
246
247 pub async fn get_time(&self) -> anyhow::Result<Timestamp> {
249 self.send
250 .call(PipetteRequest::GetTime, ())
251 .await
252 .context("failed to get time")
253 }
254
255 pub async fn crash(&self) -> Result<(), RemoteError> {
257 Self::handle_crash_result(self.send.call_failable(PipetteRequest::Crash, ()).await)
258 }
259
260 pub async fn kernel_crash(&self) -> Result<(), RemoteError> {
262 Self::handle_crash_result(
263 self.send
264 .call_failable(PipetteRequest::KernelCrash, ())
265 .await,
266 )
267 }
268
269 fn handle_crash_result(r: Result<(), RpcError<RemoteError>>) -> Result<(), RemoteError> {
270 match r {
271 Ok(()) => unreachable!(),
272 Err(RpcError::Call(err)) => Err(err),
273 Err(RpcError::Channel(_)) => {
274 Ok(())
277 }
278 }
279 }
280}
281
282async fn replay_logs(log: mesh::pipe::ReadPipe) {
283 let mut lines = BufReader::new(log).lines();
284 while let Some(line) = lines.next().await {
285 match line {
286 Ok(line) => tracing::info!(target: "pipette", "{}", line),
287 Err(err) => {
288 tracing::error!(
289 error = &err as &dyn std::error::Error,
290 "pipette log failure"
291 );
292 break;
293 }
294 }
295 }
296}
297
298async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
299 while let Some(diag_file) = diag_file_recv.next().await {
300 let DiagnosticFile { name, mut receiver } = diag_file;
301 tracing::debug!(name, "receiving diagnostic file");
302 let path = output_dir.join(&name);
303 let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
304 futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
305 .await
306 .expect("failed to write diagnostic file");
307 tracing::debug!(name, "diagnostic file transfer complete");
308
309 #[expect(
310 clippy::disallowed_methods,
311 reason = "ATTACHMENT is most reliable when using true canonicalized paths"
312 )]
313 let canonical_path = path
314 .canonicalize()
315 .expect("failed to canonicalize attachment path");
316 println!("[[ATTACHMENT|{}]]", canonical_path.display());
318 }
319}