pipette_client/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! The client for `pipette`.

#![forbid(unsafe_code)]

pub mod process;
mod send;
pub mod shell;

pub use pipette_protocol::PIPETTE_VSOCK_PORT;

use crate::send::PipetteSender;
use anyhow::Context;
use futures::AsyncBufReadExt;
use futures::AsyncRead;
use futures::AsyncWrite;
use futures::AsyncWriteExt;
use futures::StreamExt;
use futures::TryFutureExt;
use futures::io::BufReader;
use futures_concurrency::future::TryJoin;
use mesh::rpc::RpcError;
use mesh_remote::PointToPointMesh;
use pal_async::task::Spawn;
use pal_async::task::Task;
use pipette_protocol::DiagnosticFile;
use pipette_protocol::PipetteBootstrap;
use pipette_protocol::PipetteRequest;
use pipette_protocol::ReadFileRequest;
use pipette_protocol::WriteFileRequest;
use shell::UnixShell;
use shell::WindowsShell;
use std::path::Path;
use std::path::PathBuf;

/// A client to a running `pipette` instance inside a VM.
pub struct PipetteClient {
    send: PipetteSender,
    watch: mesh::OneshotReceiver<()>,
    _mesh: PointToPointMesh,
    _log_task: Task<()>,
    _diag_task: Task<()>,
}

impl PipetteClient {
    /// Connects to a `pipette` instance inside a VM.
    ///
    /// `conn` must be an established connection over some byte stream (e.g., a
    /// socket).
    pub async fn new(
        spawner: impl Spawn,
        conn: impl 'static + AsyncRead + AsyncWrite + Send + Unpin,
        output_dir: &Path,
    ) -> Result<Self, mesh::RecvError> {
        let (bootstrap_send, bootstrap_recv) = mesh::oneshot::<PipetteBootstrap>();
        let mesh = PointToPointMesh::new(&spawner, conn, bootstrap_send.into());
        let bootstrap = bootstrap_recv.await?;

        let PipetteBootstrap {
            requests,
            diag_file_recv,
            watch,
            log,
        } = bootstrap;

        let log_task = spawner.spawn("pipette-log", replay_logs(log));
        let diag_task = spawner.spawn(
            "diagnostics-recv",
            recv_diag_files(output_dir.to_owned(), diag_file_recv),
        );

        Ok(Self {
            send: PipetteSender::new(requests),
            watch,
            _mesh: mesh,
            _log_task: log_task,
            _diag_task: diag_task,
        })
    }

    /// Pings the agent to check if it's alive.
    pub async fn ping(&self) -> Result<(), RpcError> {
        self.send.call(PipetteRequest::Ping, ()).await
    }

    /// Return a shell object to interact with a Windows guest.
    pub fn windows_shell(&self) -> WindowsShell<'_> {
        WindowsShell::new(self)
    }

    /// Return a shell object to interact with a Linux guest.
    pub fn unix_shell(&self) -> UnixShell<'_> {
        UnixShell::new(self)
    }

    /// Returns an object used to launch a command inside the guest.
    ///
    /// TODO: this is a low-level interface. Make a high-level interface like
    /// `xshell::Shell` for manipulating the environment and launching
    /// processes.
    pub fn command(&self, program: impl AsRef<str>) -> process::Command<'_> {
        process::Command::new(self, program)
    }

    /// Sends a request to the guest to power off.
    pub async fn power_off(&self) -> anyhow::Result<()> {
        self.shutdown(pipette_protocol::ShutdownType::PowerOff)
            .await
    }

    /// Sends a request to the guest to reboot.
    pub async fn reboot(&self) -> anyhow::Result<()> {
        self.shutdown(pipette_protocol::ShutdownType::Reboot).await
    }

    async fn shutdown(&self, shutdown_type: pipette_protocol::ShutdownType) -> anyhow::Result<()> {
        let r = self.send.call(
            PipetteRequest::Shutdown,
            pipette_protocol::ShutdownRequest { shutdown_type },
        );
        match r.await {
            Ok(r) => r
                .map_err(anyhow::Error::from)
                .context("failed to shut down")?,
            Err(_) => {
                // Presumably this is an expected error due to the agent exiting
                // or the guest powering off.
            }
        }
        Ok(())
    }

    /// Reads the full contents of a file.
    pub async fn read_file(&self, path: impl AsRef<str>) -> anyhow::Result<Vec<u8>> {
        let (recv_pipe, send_pipe) = mesh::pipe::pipe();
        let req = ReadFileRequest {
            path: path.as_ref().to_string(),
            sender: send_pipe,
        };

        let request_future = self
            .send
            .call(PipetteRequest::ReadFile, req)
            .map_err(anyhow::Error::from);

        let transfer_future = async {
            let mut contents = Vec::new();
            let copy_result = futures::io::copy(recv_pipe, &mut contents).await;
            copy_result.map_err(anyhow::Error::from)?;
            Ok(contents)
        };

        tracing::debug!(path = path.as_ref(), "beginning file read transfer");
        let (request_result, contents) = (request_future, transfer_future).try_join().await?;

        tracing::debug!("file read complete");
        request_result.map_err(anyhow::Error::from)?;
        Ok(contents)
    }

    /// Writes a file to the guest.
    /// Note: This may transfer the file in chunks. It is likely not suitable
    /// for writing to files that require all content to be written at once,
    /// e.g. files in /proc or /sys.
    pub async fn write_file(
        &self,
        path: impl AsRef<str>,
        contents: impl AsyncRead,
    ) -> anyhow::Result<()> {
        let (recv_pipe, mut send_pipe) = mesh::pipe::pipe();
        let req = WriteFileRequest {
            path: path.as_ref().to_string(),
            receiver: recv_pipe,
        };

        let request_future = self
            .send
            .call(PipetteRequest::WriteFile, req)
            .map_err(anyhow::Error::from);

        let transfer_future = async {
            let copy_result = futures::io::copy(contents, &mut send_pipe).await;
            send_pipe.close().await?;
            copy_result.map_err(anyhow::Error::from)
        };

        tracing::debug!(path = path.as_ref(), "beginning file write transfer");
        let (request_result, _bytes_transferred) =
            (request_future, transfer_future).try_join().await?;

        tracing::debug!("file write complete");
        request_result.map_err(anyhow::Error::from)
    }

    /// Waits for the agent to exit.
    pub async fn wait(self) -> Result<(), mesh::RecvError> {
        self.watch.await
    }
}

async fn replay_logs(log: mesh::pipe::ReadPipe) {
    let mut lines = BufReader::new(log).lines();
    while let Some(line) = lines.next().await {
        match line {
            Ok(line) => tracing::info!(target: "pipette", "{}", line),
            Err(err) => {
                tracing::error!(
                    error = &err as &dyn std::error::Error,
                    "pipette log failure"
                );
                break;
            }
        }
    }
}

async fn recv_diag_files(output_dir: PathBuf, mut diag_file_recv: mesh::Receiver<DiagnosticFile>) {
    while let Some(diag_file) = diag_file_recv.next().await {
        let DiagnosticFile { name, mut receiver } = diag_file;
        tracing::debug!(name, "receiving diagnostic file");
        let path = output_dir.join(&name);
        let file = fs_err::File::create(&path).expect("failed to create diagnostic file {name}");
        futures::io::copy(&mut receiver, &mut futures::io::AllowStdIo::new(file))
            .await
            .expect("failed to write diagnostic file");
        tracing::debug!(name, "diagnostic file transfer complete");

        #[expect(
            clippy::disallowed_methods,
            reason = "ATTACHMENT is most reliable when using true canonicalized paths"
        )]
        let canonical_path = path
            .canonicalize()
            .expect("failed to canonicalize attachment path");
        // Use the inline junit syntax to attach the file to the test result.
        println!("[[ATTACHMENT|{}]]", canonical_path.display());
    }
}