1#![forbid(unsafe_code)]
7
8pub mod process;
9mod send;
10pub mod shell;
11
12pub use pipette_protocol::PIPETTE_PORT;
13pub use pipette_protocol::PIPETTE_READY_MARKER;
14
15use crate::send::PipetteSender;
16use anyhow::Context;
17use futures::AsyncBufReadExt;
18use futures::AsyncRead;
19use futures::AsyncWrite;
20use futures::AsyncWriteExt;
21use futures::FutureExt as _;
22use futures::StreamExt;
23use futures::io::BufReader;
24use futures_concurrency::future::TryJoin;
25use mesh::error::RemoteError;
26use mesh::payload::Timestamp;
27use mesh::rpc::RpcError;
28use mesh_remote::PointToPointMesh;
29use pal_async::task::Spawn;
30use pal_async::task::Task;
31use pipette_protocol::DiagnosticFile;
32use pipette_protocol::PipetteBootstrap;
33use pipette_protocol::PipetteRequest;
34use pipette_protocol::ReadFileRequest;
35use pipette_protocol::WriteFileRequest;
36use shell::UnixShell;
37use shell::WindowsShell;
38use std::path::Path;
39use std::path::PathBuf;
40
41pub struct PipetteClient {
43 send: PipetteSender,
44 watch: mesh::OneshotReceiver<()>,
45 _mesh: PointToPointMesh,
46 _log_task: Task<()>,
47 _diag_task: Task<()>,
48}
49
50impl PipetteClient {
51 pub async fn new(
56 spawner: impl Spawn,
57 conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
58 output_dir: &Path,
59 ) -> Result<Self, mesh::RecvError> {
60 let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
61 let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
62 let bootstrap = bootstrap_recv.await?;
63
64 let PipetteBootstrap {
65 requests,
66 diag_file_recv,
67 watch,
68 log,
69 } = bootstrap;
70
71 let log_task = spawner.spawn("pipette-log", replay_logs(log));
72 let diag_task = spawner.spawn(
73 "diagnostics-recv",
74 recv_diag_files(output_dir.to_owned(), diag_file_recv),
75 );
76
77 Ok(Self {
78 send: PipetteSender::new(requests),
79 watch,
80 _mesh: mesh,
81 _log_task: log_task,
82 _diag_task: diag_task,
83 })
84 }
85
86 pub async fn ping(&self) -> Result<(), RpcError> {
88 self.send.call(PipetteRequest::Ping, ()).await
89 }
90
91 pub fn windows_shell(&self) -> WindowsShell<'_> {
93 WindowsShell::new(self)
94 }
95
96 pub fn unix_shell(&self) -> UnixShell<'_> {
98 UnixShell::new(self)
99 }
100
101 pub async fn mount(
103 &self,
104 source: &str,
105 target: &str,
106 fstype: &str,
107 flags: u64,
108 mkdir_target: bool,
109 ) -> anyhow::Result<()> {
110 self.send
111 .call_failable(
112 PipetteRequest::Mount,
113 pipette_protocol::MountRequest {
114 source: source.to_owned(),
115 target: target.to_owned(),
116 fstype: fstype.to_owned(),
117 flags,
118 mkdir_target,
119 },
120 )
121 .await
122 .context("failed to send mount request")?;
123 Ok(())
124 }
125
126 pub async fn prepare_chroot(&self, target: &str) -> anyhow::Result<()> {
129 const MS_BIND: u64 = 0x1000;
131 for dir in ["/proc", "/dev", "/sys"] {
132 let mount_target = format!("{target}{dir}");
133 self.mount(dir, &mount_target, "", MS_BIND, true).await?;
134 }
135 let tmp_target = format!("{target}/tmp");
137 self.mount("tmpfs", &tmp_target, "tmpfs", 0, true).await?;
138 Ok(())
139 }
140
141 pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
147 process::Command::new(self, program)
148 }
149
150 pub async fn power_off(&self) -> anyhow::Result<()> {
152 self.shutdown(pipette_protocol::ShutdownType::PowerOff)
153 .await
154 }
155
156 pub async fn reboot(&self) -> anyhow::Result<()> {
158 self.shutdown(pipette_protocol::ShutdownType::Reboot).await
159 }
160
161 async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
162 tracing::debug!(?shutdown_type, "sending shutdown request to guest");
163 let r = self.send.call(
164 PipetteRequest::Shutdown,
165 pipette_protocol::ShutdownRequest { shutdown_type },
166 );
167 match r.await {
168 Ok(r) => r
169 .map_err(anyhow::Error::from)
170 .context("failed to shut down")?,
171 Err(_) => {
172 }
175 }
176 Ok(())
177 }
178
179 pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
181 let (recv_pipe, send_pipe) = mesh::pipe::pipe();
182 let req = ReadFileRequest {
183 path: path.as_ref().to_string(),
184 sender: send_pipe,
185 };
186
187 let request_future = self.send.call_failable(PipetteRequest::ReadFile, req);
188
189 let mut contents = Vec::new();
190 let transfer_future = async { futures::io::copy(recv_pipe, &mut contents).await };
191
192 tracing::debug!(path = path.as_ref(), "beginning file read transfer");
193 let (bytes_read, io_result) = (request_future, transfer_future.map(Ok))
194 .try_join()
195 .await
196 .context("failed to read file")?;
197
198 io_result.context("io failure")?;
199 if bytes_read != contents.len() as u64 {
200 anyhow::bail!("file truncated");
201 }
202
203 tracing::debug!("file read complete");
204 Ok(contents)
205 }
206
207 pub async fn write_file(
212 &self,
213 path: impl AsRef<str>,
214 contents: impl AsyncRead,
215 ) -> anyhow::Result<()> {
216 let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
217 let req = WriteFileRequest {
218 path: path.as_ref().to_string(),
219 receiver: recv_pipe,
220 };
221
222 let request_future = self.send.call_failable(PipetteRequest::WriteFile, req);
223
224 let transfer_future = async {
225 let copy_result = futures::io::copy(contents, &mut send_pipe).await;
226 send_pipe.close().await?;
227 copy_result
228 };
229
230 tracing::debug!(path = path.as_ref(), "beginning file wurite transfer");
231 let (bytes_written, io_result) = (request_future, transfer_future.map(Ok))
232 .try_join()
233 .await
234 .context("failed to write file")?;
235 if bytes_written != io_result.context("io failure")? {
236 anyhow::bail!("file truncated");
237 }
238
239 tracing::debug!("file write complete");
240 Ok(())
241 }
242
243 pub async fn wait(self) -> Result<(), mesh::RecvError> {
245 self.watch.await
246 }
247
248 pub async fn get_time(&self) -> anyhow::Result<Timestamp> {
250 self.send
251 .call(PipetteRequest::GetTime, ())
252 .await
253 .context("failed to get time")
254 }
255
256 pub async fn crash(&self) -> Result<(), RemoteError> {
258 Self::handle_crash_result(self.send.call_failable(PipetteRequest::Crash, ()).await)
259 }
260
261 pub async fn kernel_crash(&self) -> Result<(), RemoteError> {
263 Self::handle_crash_result(
264 self.send
265 .call_failable(PipetteRequest::KernelCrash, ())
266 .await,
267 )
268 }
269
270 fn handle_crash_result(r: Result<(), RpcError<RemoteError>>) -> Result<(), RemoteError> {
271 match r {
272 Ok(()) => unreachable!(),
273 Err(RpcError::Call(err)) => Err(err),
274 Err(RpcError::Channel(_)) => {
275 Ok(())
278 }
279 }
280 }
281}
282
283async fn replay_logs(log: mesh::pipe::ReadPipe) {
284 let mut lines = BufReader::new(log).lines();
285 while let Some(line) = lines.next().await {
286 match line {
287 Ok(line) => tracing::debug!(target: "pipette", "{}", line),
288 Err(err) => {
289 tracing::error!(
290 error = &err as &dyn std::error::Error,
291 "pipette log failure"
292 );
293 break;
294 }
295 }
296 }
297}
298
299async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
300 while let Some(diag_file) = diag_file_recv.next().await {
301 let DiagnosticFile { name, mut receiver } = diag_file;
302 tracing::debug!(name, "receiving diagnostic file");
303 let path = output_dir.join(&name);
304 let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
305 futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
306 .await
307 .expect("failed to write diagnostic file");
308 tracing::debug!(name, "diagnostic file transfer complete");
309
310 #[expect(
311 clippy::disallowed_methods,
312 reason = "ATTACHMENT is most reliable when using true canonicalized paths"
313 )]
314 let canonical_path = path
315 .canonicalize()
316 .expect("failed to canonicalize attachment path");
317 println!("[[ATTACHMENT|{}]]", canonical_path.display());
319 }
320}