openvmm_entry/
serial_io.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use crate::cleanup_socket;
use anyhow::Context;
use futures::StreamExt;
use futures::stream;
use futures_concurrency::prelude::*;
use hvlite_defs::config::SerialPipes;
use io::ErrorKind;
use io::Read;
use pal_async::driver::Driver;
use pal_async::driver::SpawnDriver;
use pal_async::pipe::PolledPipe;
use pal_async::task::Task;
use serial_socket::net::OpenSocketSerialConfig;
use std::fs::File;
use std::io;
use std::io::Write;
use std::net::SocketAddr;
use std::path::Path;
use std::thread;
use unix_socket::UnixListener;
use vm_resource::IntoResource;
use vm_resource::Resource;
use vm_resource::kind::SerialBackendHandle;

pub struct SerialIo {
    pub input: Option<File>,
    pub output: Option<File>,
    pub config: SerialPipes,
}

impl SerialIo {
    pub fn new() -> io::Result<Self> {
        let (op, oc) = PolledPipe::file_pair()?;
        let (ic, ip) = PolledPipe::file_pair()?;
        Ok(Self {
            input: Some(ip),
            output: Some(op),
            config: SerialPipes {
                input: Some(ic),
                output: Some(oc),
            },
        })
    }

    pub fn spawn_copy_out(&mut self, name: &str, mut f: impl Write + Send + 'static) {
        if let Some(mut output) = self.output.take() {
            thread::Builder::new()
                .name(format!("{} copy out", name))
                .spawn(move || {
                    loop {
                        let mut buf = [0; 256];
                        let n = output.read(&mut buf).unwrap_or(0);
                        if n == 0 {
                            break;
                        }
                        f.write_all(&buf[..n]).expect("BUGBUG");
                        f.flush().expect("BUGBUG");
                    }
                })
                .unwrap();
        }
    }

    pub fn spawn_copy_listener(
        &mut self,
        driver: impl SpawnDriver + Clone,
        name: &str,
        path: &Path,
    ) -> anyhow::Result<Task<()>> {
        #[cfg(unix)]
        {
            use std::os::unix::fs::FileTypeExt;
            // Delete the specified path if it's a socket so that we can rebind
            // to the same path.
            if let Ok(meta) = path.metadata() {
                if meta.file_type().is_socket() {
                    let _ = std::fs::remove_file(path);
                }
            }
        }

        let mut listener;
        #[cfg(windows)]
        {
            listener = pal_async::windows::pipe::NamedPipeServer::create(path)?;
        }

        #[cfg(unix)]
        {
            listener = pal_async::socket::PolledSocket::new(&driver, UnixListener::bind(path)?)
                .context("failed to create polled socket for listener")?;
        }

        let input = self.input.take().unwrap();
        let output = self.output.take().unwrap();
        let path = path.to_owned();
        let mut output =
            PolledPipe::new(&driver, output).context("failed to create polled pipe")?;
        let mut input = PolledPipe::new(&driver, input).context("failed to create polled pipe")?;

        let task = driver.spawn(format!("{} copy listener", name), {
            let driver = driver.clone();
            async move {
                loop {
                    if let Err(err) =
                        relay_pipes(&driver, &mut listener, &mut output, &mut input).await
                    {
                        tracing::error!(
                            path = %path.display(),
                            error = err.as_ref() as &dyn std::error::Error,
                            "pipe relay failed"
                        );
                    } else {
                        tracing::debug!(path = %path.display(), "pipe relay done");
                    }
                }
            }
        });
        Ok(task)
    }
}

// On Windows, serial listeners are backed by named pipes.
#[cfg(windows)]
type SerialListener = pal_async::windows::pipe::NamedPipeServer;

// On Unix, serial listeners are backed by Unix sockets.
#[cfg(unix)]
type SerialListener = pal_async::socket::PolledSocket<UnixListener>;

async fn relay_pipes(
    driver: &impl Driver,
    left_listener: &mut SerialListener,
    right_read: &mut PolledPipe,
    right_write: &mut PolledPipe,
) -> anyhow::Result<()> {
    loop {
        let left_connection;
        let (left_read, mut left_write);

        #[cfg(windows)]
        {
            let pipe = left_listener.accept(driver)?.await?;
            left_connection = PolledPipe::new(driver, pipe)?;
            (left_read, left_write) = futures::AsyncReadExt::split(left_connection);
        }

        #[cfg(unix)]
        {
            let (conn, _) = left_listener
                .accept()
                .await
                .context("failed to accept socket")?;
            left_connection = pal_async::socket::PolledSocket::new(driver, conn)
                .context("failed to create polled socket for connection")?;

            (left_read, left_write) = left_connection.split();
        }

        enum Event {
            LeftToRight(io::Result<u64>),
            RightToLeft(io::Result<u64>),
        }

        let a = stream::once(futures::io::copy(&mut *right_read, &mut left_write))
            .map(Event::LeftToRight);
        let b = stream::once(futures::io::copy(left_read, right_write)).map(Event::RightToLeft);
        let mut s = (a, b).merge();

        while let Some(event) = s.next().await {
            match event {
                Event::LeftToRight(r) => {
                    let _ = r.context("failed to copy to serial port")?;
                    // The client disconnected, so break out of this loop to
                    // wait for another connection.
                    break;
                }
                Event::RightToLeft(r) => {
                    match r {
                        Ok(_) => {
                            // The VM disconnected, so it is not waiting for any
                            // more data. Break out.
                            return Ok(());
                        }
                        Err(err) if err.kind() == ErrorKind::BrokenPipe => {
                            // The client disconnected. Continue in this loop to
                            // drain anything in the client's buffer before
                            // accepting a new connection.
                        }
                        Err(err) => {
                            return Err(err).context("failed to copy from serial port");
                        }
                    }
                }
            }
        }
    }
}

#[cfg(unix)]
pub fn anonymous_serial_pair(
    driver: &(impl Driver + ?Sized),
) -> io::Result<(
    Resource<SerialBackendHandle>,
    pal_async::socket::PolledSocket<unix_socket::UnixStream>,
)> {
    let (left, right) = unix_socket::UnixStream::pair()?;
    let right = pal_async::socket::PolledSocket::new(driver, right)?;
    Ok((OpenSocketSerialConfig::from(left).into_resource(), right))
}

#[cfg(windows)]
pub fn anonymous_serial_pair(
    driver: &(impl Driver + ?Sized),
) -> io::Result<(Resource<SerialBackendHandle>, PolledPipe)> {
    use serial_socket::windows::OpenWindowsPipeSerialConfig;

    // Use named pipes on Windows even though we also support Unix sockets
    // there. This avoids an unnecessary winsock dependency.
    let (server, client) = pal::windows::pipe::bidirectional_pair(false)?;
    let server = PolledPipe::new(driver, server)?;
    // Use the client for the VM side so that it does not try to reconnect
    // (which isn't possible via pal_async for pipes opened in non-overlapped
    // mode, anyway).
    Ok((
        OpenWindowsPipeSerialConfig::from(client).into_resource(),
        server,
    ))
}

pub fn bind_serial(path: &Path) -> io::Result<Resource<SerialBackendHandle>> {
    #[cfg(windows)]
    {
        use serial_socket::windows::OpenWindowsPipeSerialConfig;

        if path.starts_with("//./pipe") {
            let pipe = pal::windows::pipe::new_named_pipe(
                path,
                winapi::um::winnt::GENERIC_READ | winapi::um::winnt::GENERIC_WRITE,
                pal::windows::pipe::Disposition::Create,
                pal::windows::pipe::PipeMode::Byte,
            )?;
            return Ok(OpenWindowsPipeSerialConfig::from(pipe).into_resource());
        }
    }

    cleanup_socket(path);
    Ok(OpenSocketSerialConfig::from(UnixListener::bind(path)?).into_resource())
}

pub fn bind_tcp_serial(addr: &SocketAddr) -> anyhow::Result<Resource<SerialBackendHandle>> {
    let listener = std::net::TcpListener::bind(addr)
        .with_context(|| format!("failed to bind tcp address {addr}"))?;
    Ok(OpenSocketSerialConfig::from(listener).into_resource())
}