openvmm_entry/
serial_io.rs1use crate::cleanup_socket;
5use anyhow::Context;
6use futures::StreamExt;
7use futures::stream;
8use futures_concurrency::prelude::*;
9use hvlite_defs::config::SerialPipes;
10use io::ErrorKind;
11use io::Read;
12use pal_async::driver::Driver;
13use pal_async::driver::SpawnDriver;
14use pal_async::pipe::PolledPipe;
15use pal_async::task::Task;
16use serial_socket::net::OpenSocketSerialConfig;
17use std::fs::File;
18use std::io;
19use std::io::Write;
20use std::net::SocketAddr;
21use std::path::Path;
22use std::thread;
23use unix_socket::UnixListener;
24use vm_resource::IntoResource;
25use vm_resource::Resource;
26use vm_resource::kind::SerialBackendHandle;
27
28pub struct SerialIo {
29 pub input: Option<File>,
30 pub output: Option<File>,
31 pub config: SerialPipes,
32}
33
34impl SerialIo {
35 pub fn new() -> io::Result<Self> {
36 let (op, oc) = PolledPipe::file_pair()?;
37 let (ic, ip) = PolledPipe::file_pair()?;
38 Ok(Self {
39 input: Some(ip),
40 output: Some(op),
41 config: SerialPipes {
42 input: Some(ic),
43 output: Some(oc),
44 },
45 })
46 }
47
48 pub fn spawn_copy_out(&mut self, name: &str, mut f: impl Write + Send + 'static) {
49 if let Some(mut output) = self.output.take() {
50 thread::Builder::new()
51 .name(format!("{} copy out", name))
52 .spawn(move || {
53 loop {
54 let mut buf = [0; 256];
55 let n = output.read(&mut buf).unwrap_or(0);
56 if n == 0 {
57 break;
58 }
59 f.write_all(&buf[..n]).expect("BUGBUG");
60 f.flush().expect("BUGBUG");
61 }
62 })
63 .unwrap();
64 }
65 }
66
67 pub fn spawn_copy_listener(
68 &mut self,
69 driver: impl SpawnDriver + Clone,
70 name: &str,
71 path: &Path,
72 ) -> anyhow::Result<Task<()>> {
73 #[cfg(unix)]
74 {
75 use std::os::unix::fs::FileTypeExt;
76 if let Ok(meta) = path.metadata() {
79 if meta.file_type().is_socket() {
80 let _ = std::fs::remove_file(path);
81 }
82 }
83 }
84
85 let mut listener;
86 #[cfg(windows)]
87 {
88 listener = pal_async::windows::pipe::NamedPipeServer::create(path)?;
89 }
90
91 #[cfg(unix)]
92 {
93 listener = pal_async::socket::PolledSocket::new(&driver, UnixListener::bind(path)?)
94 .context("failed to create polled socket for listener")?;
95 }
96
97 let input = self.input.take().unwrap();
98 let output = self.output.take().unwrap();
99 let path = path.to_owned();
100 let mut output =
101 PolledPipe::new(&driver, output).context("failed to create polled pipe")?;
102 let mut input = PolledPipe::new(&driver, input).context("failed to create polled pipe")?;
103
104 let task = driver.spawn(format!("{} copy listener", name), {
105 let driver = driver.clone();
106 async move {
107 loop {
108 if let Err(err) =
109 relay_pipes(&driver, &mut listener, &mut output, &mut input).await
110 {
111 tracing::error!(
112 path = %path.display(),
113 error = err.as_ref() as &dyn std::error::Error,
114 "pipe relay failed"
115 );
116 } else {
117 tracing::debug!(path = %path.display(), "pipe relay done");
118 }
119 }
120 }
121 });
122 Ok(task)
123 }
124}
125
126#[cfg(windows)]
128type SerialListener = pal_async::windows::pipe::NamedPipeServer;
129
130#[cfg(unix)]
132type SerialListener = pal_async::socket::PolledSocket<UnixListener>;
133
134async fn relay_pipes(
135 driver: &impl Driver,
136 left_listener: &mut SerialListener,
137 right_read: &mut PolledPipe,
138 right_write: &mut PolledPipe,
139) -> anyhow::Result<()> {
140 loop {
141 let left_connection;
142 let (left_read, mut left_write);
143
144 #[cfg(windows)]
145 {
146 let pipe = left_listener.accept(driver)?.await?;
147 left_connection = PolledPipe::new(driver, pipe)?;
148 (left_read, left_write) = futures::AsyncReadExt::split(left_connection);
149 }
150
151 #[cfg(unix)]
152 {
153 let (conn, _) = left_listener
154 .accept()
155 .await
156 .context("failed to accept socket")?;
157 left_connection = pal_async::socket::PolledSocket::new(driver, conn)
158 .context("failed to create polled socket for connection")?;
159
160 (left_read, left_write) = left_connection.split();
161 }
162
163 enum Event {
164 LeftToRight(io::Result<u64>),
165 RightToLeft(io::Result<u64>),
166 }
167
168 let a = stream::once(futures::io::copy(&mut *right_read, &mut left_write))
169 .map(Event::LeftToRight);
170 let b = stream::once(futures::io::copy(left_read, right_write)).map(Event::RightToLeft);
171 let mut s = (a, b).merge();
172
173 while let Some(event) = s.next().await {
174 match event {
175 Event::LeftToRight(r) => {
176 let _ = r.context("failed to copy to serial port")?;
177 break;
180 }
181 Event::RightToLeft(r) => {
182 match r {
183 Ok(_) => {
184 return Ok(());
187 }
188 Err(err) if err.kind() == ErrorKind::BrokenPipe => {
189 }
193 Err(err) => {
194 return Err(err).context("failed to copy from serial port");
195 }
196 }
197 }
198 }
199 }
200 }
201}
202
203#[cfg(unix)]
204pub fn anonymous_serial_pair(
205 driver: &(impl Driver + ?Sized),
206) -> io::Result<(
207 Resource<SerialBackendHandle>,
208 pal_async::socket::PolledSocket<unix_socket::UnixStream>,
209)> {
210 let (left, right) = unix_socket::UnixStream::pair()?;
211 let right = pal_async::socket::PolledSocket::new(driver, right)?;
212 Ok((OpenSocketSerialConfig::from(left).into_resource(), right))
213}
214
/// Creates a connected, unnamed bidirectional pipe pair for a serial port.
///
/// The client end is wrapped as a serial backend resource; the server end is
/// registered with `driver` and returned as a [`PolledPipe`] for the caller to
/// read and write directly.
///
/// # Errors
///
/// Fails if the pipe pair cannot be created or the server end cannot be
/// registered with `driver`.
#[cfg(windows)]
pub fn anonymous_serial_pair(
    driver: &(impl Driver + ?Sized),
) -> io::Result<(Resource<SerialBackendHandle>, PolledPipe)> {
    use serial_socket::windows::OpenWindowsPipeSerialConfig;

    // NOTE(review): the meaning of the `false` flag is defined by
    // `bidirectional_pair`, which is outside this file.
    let (server_end, client_end) = pal::windows::pipe::bidirectional_pair(false)?;
    let polled_server = PolledPipe::new(driver, server_end)?;
    let backend: Resource<SerialBackendHandle> =
        OpenWindowsPipeSerialConfig::from(client_end).into_resource();
    Ok((backend, polled_server))
}
233
234pub fn bind_serial(path: &Path) -> io::Result<Resource<SerialBackendHandle>> {
235 #[cfg(windows)]
236 {
237 use serial_socket::windows::OpenWindowsPipeSerialConfig;
238
239 if path.starts_with("//./pipe") {
240 let pipe = pal::windows::pipe::new_named_pipe(
241 path,
242 winapi::um::winnt::GENERIC_READ | winapi::um::winnt::GENERIC_WRITE,
243 pal::windows::pipe::Disposition::Create,
244 pal::windows::pipe::PipeMode::Byte,
245 )?;
246 return Ok(OpenWindowsPipeSerialConfig::from(pipe).into_resource());
247 }
248 }
249
250 cleanup_socket(path);
251 Ok(OpenSocketSerialConfig::from(UnixListener::bind(path)?).into_resource())
252}
253
254pub fn bind_tcp_serial(addr: &SocketAddr) -> anyhow::Result<Resource<SerialBackendHandle>> {
255 let listener = std::net::TcpListener::bind(addr)
256 .with_context(|| format!("failed to bind tcp address {addr}"))?;
257 Ok(OpenSocketSerialConfig::from(listener).into_resource())
258}