1#![forbid(unsafe_code)]
7
8pub mod process;
9mod send;
10pub mod shell;
11
12pub use pipette_protocol::PIPETTE_VSOCK_PORT;
13
14use crate::send::PipetteSender;
15use anyhow::Context;
16use futures::AsyncBufReadExt;
17use futures::AsyncRead;
18use futures::AsyncWrite;
19use futures::AsyncWriteExt;
20use futures::FutureExt as _;
21use futures::StreamExt;
22use futures::io::BufReader;
23use futures_concurrency::future::TryJoin;
24use mesh::payload::Timestamp;
25use mesh::rpc::RpcError;
26use mesh_remote::PointToPointMesh;
27use pal_async::task::Spawn;
28use pal_async::task::Task;
29use pipette_protocol::DiagnosticFile;
30use pipette_protocol::PipetteBootstrap;
31use pipette_protocol::PipetteRequest;
32use pipette_protocol::ReadFileRequest;
33use pipette_protocol::WriteFileRequest;
34use shell::UnixShell;
35use shell::WindowsShell;
36use std::path::Path;
37use std::path::PathBuf;
38
/// Client handle for talking to a pipette agent over a mesh connection.
///
/// Created by [`PipetteClient::new`]. The underscore-prefixed fields are never
/// read by the methods in this file; they are stored so that they live exactly
/// as long as the client does.
pub struct PipetteClient {
    /// Sender used to issue `PipetteRequest`s; built from the bootstrap
    /// message's `requests` channel.
    send: PipetteSender,
    /// Oneshot receiver taken from the bootstrap message; awaited by `wait`.
    watch: mesh::OneshotReceiver<()>,
    /// The point-to-point mesh created over the caller-supplied connection.
    // NOTE(review): presumably dropping this tears the connection down — confirm.
    _mesh: PointToPointMesh,
    /// Handle of the spawned "pipette-log" task.
    _log_task: Task<()>,
    /// Handle of the spawned "diagnostics-recv" task.
    _diag_task: Task<()>,
}
47
48impl PipetteClient {
49 pub async fn new(
54 spawner: impl Spawn,
55 conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
56 output_dir: &Path,
57 ) -> Result<Self, mesh::RecvError> {
58 let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
59 let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
60 let bootstrap = bootstrap_recv.await?;
61
62 let PipetteBootstrap {
63 requests,
64 diag_file_recv,
65 watch,
66 log,
67 } = bootstrap;
68
69 let log_task = spawner.spawn("pipette-log", replay_logs(log));
70 let diag_task = spawner.spawn(
71 "diagnostics-recv",
72 recv_diag_files(output_dir.to_owned(), diag_file_recv),
73 );
74
75 Ok(Self {
76 send: PipetteSender::new(requests),
77 watch,
78 _mesh: mesh,
79 _log_task: log_task,
80 _diag_task: diag_task,
81 })
82 }
83
84 pub async fn ping(&self) -> Result<(), RpcError> {
86 self.send.call(PipetteRequest::Ping, ()).await
87 }
88
89 pub fn windows_shell(&self) -> WindowsShell<'_> {
91 WindowsShell::new(self)
92 }
93
94 pub fn unix_shell(&self) -> UnixShell<'_> {
96 UnixShell::new(self)
97 }
98
99 pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
105 process::Command::new(self, program)
106 }
107
108 pub async fn power_off(&self) -> anyhow::Result<()> {
110 self.shutdown(pipette_protocol::ShutdownType::PowerOff)
111 .await
112 }
113
114 pub async fn reboot(&self) -> anyhow::Result<()> {
116 self.shutdown(pipette_protocol::ShutdownType::Reboot).await
117 }
118
119 async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
120 let r = self.send.call(
121 PipetteRequest::Shutdown,
122 pipette_protocol::ShutdownRequest { shutdown_type },
123 );
124 match r.await {
125 Ok(r) => r
126 .map_err(anyhow::Error::from)
127 .context("failed to shut down")?,
128 Err(_) => {
129 }
132 }
133 Ok(())
134 }
135
136 pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
138 let (recv_pipe, send_pipe) = mesh::pipe::pipe();
139 let req = ReadFileRequest {
140 path: path.as_ref().to_string(),
141 sender: send_pipe,
142 };
143
144 let request_future = self.send.call_failable(PipetteRequest::ReadFile, req);
145
146 let mut contents = Vec::new();
147 let transfer_future = async { futures::io::copy(recv_pipe, &mut contents).await };
148
149 tracing::debug!(path = path.as_ref(), "beginning file read transfer");
150 let (bytes_read, io_result) = (request_future, transfer_future.map(Ok))
151 .try_join()
152 .await
153 .context("failed to read file")?;
154
155 io_result.context("io failure")?;
156 if bytes_read != contents.len() as u64 {
157 anyhow::bail!("file truncated");
158 }
159
160 tracing::debug!("file read complete");
161 Ok(contents)
162 }
163
164 pub async fn write_file(
169 &self,
170 path: impl AsRef<str>,
171 contents: impl AsyncRead,
172 ) -> anyhow::Result<()> {
173 let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
174 let req = WriteFileRequest {
175 path: path.as_ref().to_string(),
176 receiver: recv_pipe,
177 };
178
179 let request_future = self.send.call_failable(PipetteRequest::WriteFile, req);
180
181 let transfer_future = async {
182 let copy_result = futures::io::copy(contents, &mut send_pipe).await;
183 send_pipe.close().await?;
184 copy_result
185 };
186
187 tracing::debug!(path = path.as_ref(), "beginning file wurite transfer");
188 let (bytes_written, io_result) = (request_future, transfer_future.map(Ok))
189 .try_join()
190 .await
191 .context("failed to write file")?;
192 if bytes_written != io_result.context("io failure")? {
193 anyhow::bail!("file truncated");
194 }
195
196 tracing::debug!("file write complete");
197 Ok(())
198 }
199
200 pub async fn wait(self) -> Result<(), mesh::RecvError> {
202 self.watch.await
203 }
204
205 pub async fn get_time(&self) -> anyhow::Result<Timestamp> {
207 self.send
208 .call(PipetteRequest::GetTime, ())
209 .await
210 .context("failed to get time")
211 }
212}
213
214async fn replay_logs(log: mesh::pipe::ReadPipe) {
215 let mut lines = BufReader::new(log).lines();
216 while let Some(line) = lines.next().await {
217 match line {
218 Ok(line) => tracing::info!(target: "pipette", "{}", line),
219 Err(err) => {
220 tracing::error!(
221 error = &err as &dyn std::error::Error,
222 "pipette log failure"
223 );
224 break;
225 }
226 }
227 }
228}
229
230async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
231 while let Some(diag_file) = diag_file_recv.next().await {
232 let DiagnosticFile { name, mut receiver } = diag_file;
233 tracing::debug!(name, "receiving diagnostic file");
234 let path = output_dir.join(&name);
235 let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
236 futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
237 .await
238 .expect("failed to write diagnostic file");
239 tracing::debug!(name, "diagnostic file transfer complete");
240
241 #[expect(
242 clippy::disallowed_methods,
243 reason = "ATTACHMENT is most reliable when using true canonicalized paths"
244 )]
245 let canonical_path = path
246 .canonicalize()
247 .expect("failed to canonicalize attachment path");
248 println!("[[ATTACHMENT|{}]]", canonical_path.display());
250 }
251}