1#![forbid(unsafe_code)]
7
8pub mod process;
9mod send;
10pub mod shell;
11
12pub use pipette_protocol::PIPETTE_VSOCK_PORT;
13
14use crate::send::PipetteSender;
15use anyhow::Context;
16use futures::AsyncBufReadExt;
17use futures::AsyncRead;
18use futures::AsyncWrite;
19use futures::AsyncWriteExt;
20use futures::FutureExt as _;
21use futures::StreamExt;
22use futures::io::BufReader;
23use futures_concurrency::future::TryJoin;
24use mesh::payload::Timestamp;
25use mesh::rpc::RpcError;
26use mesh_remote::PointToPointMesh;
27use pal_async::task::Spawn;
28use pal_async::task::Task;
29use pipette_protocol::DiagnosticFile;
30use pipette_protocol::PipetteBootstrap;
31use pipette_protocol::PipetteRequest;
32use pipette_protocol::ReadFileRequest;
33use pipette_protocol::WriteFileRequest;
34use shell::UnixShell;
35use shell::WindowsShell;
36use std::path::Path;
37use std::path::PathBuf;
38
/// Client side of a connection to a pipette agent.
///
/// Created by [`PipetteClient::new`], which performs the bootstrap handshake
/// and spawns the background tasks stored here.
pub struct PipetteClient {
    /// Sender used to issue [`PipetteRequest`]s to the agent.
    send: PipetteSender,
    /// Receiver taken from the bootstrap message; awaited by [`PipetteClient::wait`].
    watch: mesh::OneshotReceiver<()>,
    /// Point-to-point mesh built over the connection. Never read; presumably
    /// held only so the mesh stays alive as long as the client does.
    _mesh: PointToPointMesh,
    /// Task running `replay_logs` on the agent's log pipe.
    _log_task: Task<()>,
    /// Task running `recv_diag_files` on the diagnostic file channel.
    _diag_task: Task<()>,
}
47
48impl PipetteClient {
49 pub async fn new(
54 spawner: impl Spawn,
55 conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
56 output_dir: &Path,
57 ) -> Result<Self, mesh::RecvError> {
58 let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
59 let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
60 let bootstrap = bootstrap_recv.await?;
61
62 let PipetteBootstrap {
63 requests,
64 diag_file_recv,
65 watch,
66 log,
67 } = bootstrap;
68
69 let log_task = spawner.spawn("pipette-log", replay_logs(log));
70 let diag_task = spawner.spawn(
71 "diagnostics-recv",
72 recv_diag_files(output_dir.to_owned(), diag_file_recv),
73 );
74
75 Ok(Self {
76 send: PipetteSender::new(requests),
77 watch,
78 _mesh: mesh,
79 _log_task: log_task,
80 _diag_task: diag_task,
81 })
82 }
83
84 pub async fn ping(&self) -> Result<(), RpcError> {
86 self.send.call(PipetteRequest::Ping, ()).await
87 }
88
89 pub fn windows_shell(&self) -> WindowsShell<'_> {
91 WindowsShell::new(self)
92 }
93
94 pub fn unix_shell(&self) -> UnixShell<'_> {
96 UnixShell::new(self)
97 }
98
99 pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
105 process::Command::new(self, program)
106 }
107
108 pub async fn power_off(&self) -> anyhow::Result<()> {
110 self.shutdown(pipette_protocol::ShutdownType::PowerOff)
111 .await
112 }
113
114 pub async fn reboot(&self) -> anyhow::Result<()> {
116 self.shutdown(pipette_protocol::ShutdownType::Reboot).await
117 }
118
119 async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
120 tracing::debug!(?shutdown_type, "sending shutdown request to guest");
121 let r = self.send.call(
122 PipetteRequest::Shutdown,
123 pipette_protocol::ShutdownRequest { shutdown_type },
124 );
125 match r.await {
126 Ok(r) => r
127 .map_err(anyhow::Error::from)
128 .context("failed to shut down")?,
129 Err(_) => {
130 }
133 }
134 Ok(())
135 }
136
137 pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
139 let (recv_pipe, send_pipe) = mesh::pipe::pipe();
140 let req = ReadFileRequest {
141 path: path.as_ref().to_string(),
142 sender: send_pipe,
143 };
144
145 let request_future = self.send.call_failable(PipetteRequest::ReadFile, req);
146
147 let mut contents = Vec::new();
148 let transfer_future = async { futures::io::copy(recv_pipe, &mut contents).await };
149
150 tracing::debug!(path = path.as_ref(), "beginning file read transfer");
151 let (bytes_read, io_result) = (request_future, transfer_future.map(Ok))
152 .try_join()
153 .await
154 .context("failed to read file")?;
155
156 io_result.context("io failure")?;
157 if bytes_read != contents.len() as u64 {
158 anyhow::bail!("file truncated");
159 }
160
161 tracing::debug!("file read complete");
162 Ok(contents)
163 }
164
165 pub async fn write_file(
170 &self,
171 path: impl AsRef<str>,
172 contents: impl AsyncRead,
173 ) -> anyhow::Result<()> {
174 let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
175 let req = WriteFileRequest {
176 path: path.as_ref().to_string(),
177 receiver: recv_pipe,
178 };
179
180 let request_future = self.send.call_failable(PipetteRequest::WriteFile, req);
181
182 let transfer_future = async {
183 let copy_result = futures::io::copy(contents, &mut send_pipe).await;
184 send_pipe.close().await?;
185 copy_result
186 };
187
188 tracing::debug!(path = path.as_ref(), "beginning file wurite transfer");
189 let (bytes_written, io_result) = (request_future, transfer_future.map(Ok))
190 .try_join()
191 .await
192 .context("failed to write file")?;
193 if bytes_written != io_result.context("io failure")? {
194 anyhow::bail!("file truncated");
195 }
196
197 tracing::debug!("file write complete");
198 Ok(())
199 }
200
201 pub async fn wait(self) -> Result<(), mesh::RecvError> {
203 self.watch.await
204 }
205
206 pub async fn get_time(&self) -> anyhow::Result<Timestamp> {
208 self.send
209 .call(PipetteRequest::GetTime, ())
210 .await
211 .context("failed to get time")
212 }
213}
214
215async fn replay_logs(log: mesh::pipe::ReadPipe) {
216 let mut lines = BufReader::new(log).lines();
217 while let Some(line) = lines.next().await {
218 match line {
219 Ok(line) => tracing::info!(target: "pipette", "{}", line),
220 Err(err) => {
221 tracing::error!(
222 error = &err as &dyn std::error::Error,
223 "pipette log failure"
224 );
225 break;
226 }
227 }
228 }
229}
230
231async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
232 while let Some(diag_file) = diag_file_recv.next().await {
233 let DiagnosticFile { name, mut receiver } = diag_file;
234 tracing::debug!(name, "receiving diagnostic file");
235 let path = output_dir.join(&name);
236 let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
237 futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
238 .await
239 .expect("failed to write diagnostic file");
240 tracing::debug!(name, "diagnostic file transfer complete");
241
242 #[expect(
243 clippy::disallowed_methods,
244 reason = "ATTACHMENT is most reliable when using true canonicalized paths"
245 )]
246 let canonical_path = path
247 .canonicalize()
248 .expect("failed to canonicalize attachment path");
249 println!("[[ATTACHMENT|{}]]", canonical_path.display());
251 }
252}