1#![forbid(unsafe_code)]
7
8pub mod process;
9mod send;
10pub mod shell;
11
12pub use pipette_protocol::PIPETTE_VSOCK_PORT;
13
14use crate::send::PipetteSender;
15use anyhow::Context;
16use futures::AsyncBufReadExt;
17use futures::AsyncRead;
18use futures::AsyncWrite;
19use futures::AsyncWriteExt;
20use futures::FutureExt as _;
21use futures::StreamExt;
22use futures::io::BufReader;
23use futures_concurrency::future::TryJoin;
24use mesh::error::RemoteError;
25use mesh::payload::Timestamp;
26use mesh::rpc::RpcError;
27use mesh_remote::PointToPointMesh;
28use pal_async::task::Spawn;
29use pal_async::task::Task;
30use pipette_protocol::DiagnosticFile;
31use pipette_protocol::PipetteBootstrap;
32use pipette_protocol::PipetteRequest;
33use pipette_protocol::ReadFileRequest;
34use pipette_protocol::WriteFileRequest;
35use shell::UnixShell;
36use shell::WindowsShell;
37use std::path::Path;
38use std::path::PathBuf;
39
40pub struct PipetteClient {
42 send: PipetteSender,
43 watch: mesh::OneshotReceiver<()>,
44 _mesh: PointToPointMesh,
45 _log_task: Task<()>,
46 _diag_task: Task<()>,
47}
48
49impl PipetteClient {
50 pub async fn new(
55 spawner: impl Spawn,
56 conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
57 output_dir: &Path,
58 ) -> Result<Self, mesh::RecvError> {
59 let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
60 let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
61 let bootstrap = bootstrap_recv.await?;
62
63 let PipetteBootstrap {
64 requests,
65 diag_file_recv,
66 watch,
67 log,
68 } = bootstrap;
69
70 let log_task = spawner.spawn("pipette-log", replay_logs(log));
71 let diag_task = spawner.spawn(
72 "diagnostics-recv",
73 recv_diag_files(output_dir.to_owned(), diag_file_recv),
74 );
75
76 Ok(Self {
77 send: PipetteSender::new(requests),
78 watch,
79 _mesh: mesh,
80 _log_task: log_task,
81 _diag_task: diag_task,
82 })
83 }
84
85 pub async fn ping(&self) -> Result<(), RpcError> {
87 self.send.call(PipetteRequest::Ping, ()).await
88 }
89
90 pub fn windows_shell(&self) -> WindowsShell<'_> {
92 WindowsShell::new(self)
93 }
94
95 pub fn unix_shell(&self) -> UnixShell<'_> {
97 UnixShell::new(self)
98 }
99
100 pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
106 process::Command::new(self, program)
107 }
108
109 pub async fn power_off(&self) -> anyhow::Result<()> {
111 self.shutdown(pipette_protocol::ShutdownType::PowerOff)
112 .await
113 }
114
115 pub async fn reboot(&self) -> anyhow::Result<()> {
117 self.shutdown(pipette_protocol::ShutdownType::Reboot).await
118 }
119
120 async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
121 tracing::debug!(?shutdown_type, "sending shutdown request to guest");
122 let r = self.send.call(
123 PipetteRequest::Shutdown,
124 pipette_protocol::ShutdownRequest { shutdown_type },
125 );
126 match r.await {
127 Ok(r) => r
128 .map_err(anyhow::Error::from)
129 .context("failed to shut down")?,
130 Err(_) => {
131 }
134 }
135 Ok(())
136 }
137
138 pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
140 let (recv_pipe, send_pipe) = mesh::pipe::pipe();
141 let req = ReadFileRequest {
142 path: path.as_ref().to_string(),
143 sender: send_pipe,
144 };
145
146 let request_future = self.send.call_failable(PipetteRequest::ReadFile, req);
147
148 let mut contents = Vec::new();
149 let transfer_future = async { futures::io::copy(recv_pipe, &mut contents).await };
150
151 tracing::debug!(path = path.as_ref(), "beginning file read transfer");
152 let (bytes_read, io_result) = (request_future, transfer_future.map(Ok))
153 .try_join()
154 .await
155 .context("failed to read file")?;
156
157 io_result.context("io failure")?;
158 if bytes_read != contents.len() as u64 {
159 anyhow::bail!("file truncated");
160 }
161
162 tracing::debug!("file read complete");
163 Ok(contents)
164 }
165
166 pub async fn write_file(
171 &self,
172 path: impl AsRef<str>,
173 contents: impl AsyncRead,
174 ) -> anyhow::Result<()> {
175 let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
176 let req = WriteFileRequest {
177 path: path.as_ref().to_string(),
178 receiver: recv_pipe,
179 };
180
181 let request_future = self.send.call_failable(PipetteRequest::WriteFile, req);
182
183 let transfer_future = async {
184 let copy_result = futures::io::copy(contents, &mut send_pipe).await;
185 send_pipe.close().await?;
186 copy_result
187 };
188
189 tracing::debug!(path = path.as_ref(), "beginning file wurite transfer");
190 let (bytes_written, io_result) = (request_future, transfer_future.map(Ok))
191 .try_join()
192 .await
193 .context("failed to write file")?;
194 if bytes_written != io_result.context("io failure")? {
195 anyhow::bail!("file truncated");
196 }
197
198 tracing::debug!("file write complete");
199 Ok(())
200 }
201
202 pub async fn wait(self) -> Result<(), mesh::RecvError> {
204 self.watch.await
205 }
206
207 pub async fn get_time(&self) -> anyhow::Result<Timestamp> {
209 self.send
210 .call(PipetteRequest::GetTime, ())
211 .await
212 .context("failed to get time")
213 }
214
215 pub async fn crash(&self) -> Result<(), RemoteError> {
217 Self::handle_crash_result(self.send.call_failable(PipetteRequest::Crash, ()).await)
218 }
219
220 pub async fn kernel_crash(&self) -> Result<(), RemoteError> {
222 Self::handle_crash_result(
223 self.send
224 .call_failable(PipetteRequest::KernelCrash, ())
225 .await,
226 )
227 }
228
229 fn handle_crash_result(r: Result<(), RpcError<RemoteError>>) -> Result<(), RemoteError> {
230 match r {
231 Ok(()) => unreachable!(),
232 Err(RpcError::Call(err)) => Err(err),
233 Err(RpcError::Channel(_)) => {
234 Ok(())
237 }
238 }
239 }
240}
241
242async fn replay_logs(log: mesh::pipe::ReadPipe) {
243 let mut lines = BufReader::new(log).lines();
244 while let Some(line) = lines.next().await {
245 match line {
246 Ok(line) => tracing::info!(target: "pipette", "{}", line),
247 Err(err) => {
248 tracing::error!(
249 error = &err as &dyn std::error::Error,
250 "pipette log failure"
251 );
252 break;
253 }
254 }
255 }
256}
257
258async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
259 while let Some(diag_file) = diag_file_recv.next().await {
260 let DiagnosticFile { name, mut receiver } = diag_file;
261 tracing::debug!(name, "receiving diagnostic file");
262 let path = output_dir.join(&name);
263 let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
264 futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
265 .await
266 .expect("failed to write diagnostic file");
267 tracing::debug!(name, "diagnostic file transfer complete");
268
269 #[expect(
270 clippy::disallowed_methods,
271 reason = "ATTACHMENT is most reliable when using true canonicalized paths"
272 )]
273 let canonical_path = path
274 .canonicalize()
275 .expect("failed to canonicalize attachment path");
276 println!("[[ATTACHMENT|{}]]", canonical_path.display());
278 }
279}