#![forbid(unsafe_code)]
pub mod process;
mod send;
pub mod shell;
pub use pipette_protocol::PIPETTE_VSOCK_PORT;
use crate::send::PipetteSender;
use anyhow::Context;
use futures::AsyncBufReadExt;
use futures::AsyncRead;
use futures::AsyncWrite;
use futures::AsyncWriteExt;
use futures::StreamExt;
use futures::TryFutureExt;
use futures::io::BufReader;
use futures_concurrency::future::TryJoin;
use mesh::rpc::RpcError;
use mesh_remote::PointToPointMesh;
use pal_async::task::Spawn;
use pal_async::task::Task;
use pipette_protocol::DiagnosticFile;
use pipette_protocol::PipetteBootstrap;
use pipette_protocol::PipetteRequest;
use pipette_protocol::ReadFileRequest;
use pipette_protocol::WriteFileRequest;
use shell::UnixShell;
use shell::WindowsShell;
use std::path::Path;
use std::path::PathBuf;
pub struct PipetteClient {
send: PipetteSender,
watch: mesh::OneshotReceiver<()>,
_mesh: PointToPointMesh,
_log_task: Task<()>,
_diag_task: Task<()>,
}
impl PipetteClient {
pub async fn new(
spawner: impl Spawn,
conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
output_dir: &Path,
) -> Result<Self, mesh::RecvError> {
let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
let bootstrap = bootstrap_recv.await?;
let PipetteBootstrap {
requests,
diag_file_recv,
watch,
log,
} = bootstrap;
let log_task = spawner.spawn("pipette-log", replay_logs(log));
let diag_task = spawner.spawn(
"diagnostics-recv",
recv_diag_files(output_dir.to_owned(), diag_file_recv),
);
Ok(Self {
send: PipetteSender::new(requests),
watch,
_mesh: mesh,
_log_task: log_task,
_diag_task: diag_task,
})
}
pub async fn ping(&self) -> Result<(), RpcError> {
self.send.call(PipetteRequest::Ping, ()).await
}
pub fn windows_shell(&self) -> WindowsShell<'_> {
WindowsShell::new(self)
}
pub fn unix_shell(&self) -> UnixShell<'_> {
UnixShell::new(self)
}
pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
process::Command::new(self, program)
}
pub async fn power_off(&self) -> anyhow::Result<()> {
self.shutdown(pipette_protocol::ShutdownType::PowerOff)
.await
}
pub async fn reboot(&self) -> anyhow::Result<()> {
self.shutdown(pipette_protocol::ShutdownType::Reboot).await
}
async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
let r = self.send.call(
PipetteRequest::Shutdown,
pipette_protocol::ShutdownRequest { shutdown_type },
);
match r.await {
Ok(r) => r
.map_err(anyhow::Error::from)
.context("failed to shut down")?,
Err(_) => {
}
}
Ok(())
}
pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
let (recv_pipe, send_pipe) = mesh::pipe::pipe();
let req = ReadFileRequest {
path: path.as_ref().to_string(),
sender: send_pipe,
};
let request_future = self
.send
.call(PipetteRequest::ReadFile, req)
.map_err(anyhow::Error::from);
let transfer_future = async {
let mut contents = Vec::new();
let copy_result = futures::io::copy(recv_pipe, &mut contents).await;
copy_result.map_err(anyhow::Error::from)?;
Ok(contents)
};
tracing::debug!(path = path.as_ref(), "beginning file read transfer");
let (request_result, contents) = (request_future, transfer_future).try_join().await?;
tracing::debug!("file read complete");
request_result.map_err(anyhow::Error::from)?;
Ok(contents)
}
pub async fn write_file(
&self,
path: impl AsRef<str>,
contents: impl AsyncRead,
) -> anyhow::Result<()> {
let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
let req = WriteFileRequest {
path: path.as_ref().to_string(),
receiver: recv_pipe,
};
let request_future = self
.send
.call(PipetteRequest::WriteFile, req)
.map_err(anyhow::Error::from);
let transfer_future = async {
let copy_result = futures::io::copy(contents, &mut send_pipe).await;
send_pipe.close().await?;
copy_result.map_err(anyhow::Error::from)
};
tracing::debug!(path = path.as_ref(), "beginning file write transfer");
let (request_result, _bytes_transferred) =
(request_future, transfer_future).try_join().await?;
tracing::debug!("file write complete");
request_result.map_err(anyhow::Error::from)
}
pub async fn wait(self) -> Result<(), mesh::RecvError> {
self.watch.await
}
}
async fn replay_logs(log: mesh::pipe::ReadPipe) {
let mut lines = BufReader::new(log).lines();
while let Some(line) = lines.next().await {
match line {
Ok(line) => tracing::info!(target: "pipette", "{}", line),
Err(err) => {
tracing::error!(
error = &err as &dyn std::error::Error,
"pipette log failure"
);
break;
}
}
}
}
async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
while let Some(diag_file) = diag_file_recv.next().await {
let DiagnosticFile { name, mut receiver } = diag_file;
tracing::debug!(name, "receiving diagnostic file");
let path = output_dir.join(&name);
let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
.await
.expect("failed to write diagnostic file");
tracing::debug!(name, "diagnostic file transfer complete");
#[expect(
clippy::disallowed_methods,
reason = "ATTACHMENT is most reliable when using true canonicalized paths"
)]
let canonical_path = path
.canonicalize()
.expect("failed to canonicalize attachment path");
println!("[[ATTACHMENT|{}]]", canonical_path.display());
}
}