vmbus_user_channel/
lib.rs1#![cfg(unix)]
33#![forbid(unsafe_code)]
34
35use filepath::FilePath;
36use guid::Guid;
37use pal_async::driver::Driver;
38use pal_async::wait::PolledWait;
39use parking_lot::Mutex;
40use safeatomic::AtomicSliceOps;
41use sparse_mmap::SparseMapping;
42use std::fs::File;
43use std::io::Write;
44use std::path::Path;
45use std::sync::Arc;
46use std::task::ready;
47use thiserror::Error;
48use vmbus_async::pipe::BytePipe;
49use vmbus_async::pipe::MessagePipe;
50use vmbus_channel::RawAsyncChannel;
51use vmbus_channel::SignalVmbusChannel;
52use vmbus_ring::IncomingRing;
53use vmbus_ring::OutgoingRing;
54use vmbus_ring::RingMem;
55use zerocopy::IntoBytes;
56
57#[derive(Debug)]
59pub struct MappedRingMem {
60 mapping: Arc<SparseMapping>,
61 offset: usize,
62 len: usize,
63}
64
/// Size in bytes of the control region that precedes each ring's data area.
const CONTROL_SIZE: usize = 0x1000;
/// Size in bytes of the outgoing ring's data area.
const OUT_RING_SIZE: usize = 0x1_0000;
/// Size in bytes of the incoming ring's data area.
const IN_RING_SIZE: usize = 0x1_0000;
71
72impl RingMem for MappedRingMem {
73 fn control(&self) -> &[std::sync::atomic::AtomicU32; vmbus_ring::CONTROL_WORD_COUNT] {
74 self.mapping
75 .atomic_slice(self.offset, CONTROL_SIZE)
76 .as_atomic_slice()
77 .unwrap()[..vmbus_ring::CONTROL_WORD_COUNT]
78 .try_into()
79 .unwrap()
80 }
81
82 fn read_at(&self, mut addr: usize, data: &mut [u8]) {
83 debug_assert!(addr + data.len() <= 2 * self.len);
84 if addr > self.len() {
85 addr -= self.len();
86 }
87 if addr + data.len() <= self.len() {
88 self.mapping
89 .read_at(self.offset + CONTROL_SIZE + addr, data)
90 .unwrap();
91 } else {
92 let (first, last) = data.split_at_mut(self.len() - addr);
93 self.mapping
94 .read_at(self.offset + CONTROL_SIZE + addr, first)
95 .unwrap();
96 self.mapping
97 .read_at(self.offset + CONTROL_SIZE, last)
98 .unwrap();
99 }
100 }
101
102 fn write_at(&self, mut addr: usize, data: &[u8]) {
103 debug_assert!(addr + data.len() <= 2 * self.len);
104 if addr > self.len() {
105 addr -= self.len();
106 }
107 if addr + data.len() <= self.len() {
108 self.mapping
109 .write_at(self.offset + CONTROL_SIZE + addr, data)
110 .unwrap();
111 } else {
112 let (first, last) = data.split_at(self.len() - addr);
113 self.mapping
114 .write_at(self.offset + CONTROL_SIZE + addr, first)
115 .unwrap();
116 self.mapping
117 .write_at(self.offset + CONTROL_SIZE, last)
118 .unwrap();
119 }
120 }
121
122 fn len(&self) -> usize {
123 self.len
124 }
125}
126
127#[derive(Debug, Error)]
128enum ErrorInner {
129 #[error("couldn't find uio device")]
130 Exist(#[source] std::io::Error),
131 #[error("failed to open file")]
132 Open(#[source] std::io::Error),
133 #[error("failed to mmap")]
134 Mmap(#[source] std::io::Error),
135 #[error("ring buffer error")]
136 Ring(#[source] vmbus_ring::Error),
137 #[error("vmbus pipe error")]
138 Pipe(#[source] std::io::Error),
139 #[error("driver error")]
140 Driver(#[source] std::io::Error),
141}
142
143#[derive(Debug, Error)]
145#[error(transparent)]
146pub struct Error(ErrorInner);
147
148impl<T: Into<ErrorInner>> From<T> for Error {
149 fn from(t: T) -> Self {
150 Self(t.into())
151 }
152}
153
154pub fn open_uio_device(instance_id: &Guid) -> Result<File, Error> {
156 let paths = fs_err::read_dir(format!("/sys/bus/vmbus/devices/{instance_id}/uio"))
157 .map_err(ErrorInner::Exist)?;
158
159 let uio_path = paths
160 .last()
161 .unwrap_or_else(|| Err(std::io::ErrorKind::NotFound.into()))
162 .map_err(ErrorInner::Exist)?;
163
164 let uio_dev_path = Path::new("/dev").join(uio_path.file_name());
165 tracing::debug!(
166 dev_path = %uio_dev_path.display(),
167 %instance_id,
168 "opening device"
169 );
170
171 let file = fs_err::OpenOptions::new()
172 .read(true)
173 .write(true)
174 .open(uio_dev_path)
175 .map_err(ErrorInner::Open)?;
176
177 Ok(file.into())
178}
179
180pub fn channel(
182 driver: &(impl Driver + ?Sized),
183 file: File,
184) -> Result<RawAsyncChannel<MappedRingMem>, Error> {
185 let total_mapping_size = CONTROL_SIZE + IN_RING_SIZE + CONTROL_SIZE + OUT_RING_SIZE;
186
187 let mapping = Arc::new(SparseMapping::new(total_mapping_size).map_err(ErrorInner::Mmap)?);
188
189 let mapping_offset = 0;
192 let len = CONTROL_SIZE + OUT_RING_SIZE + CONTROL_SIZE + IN_RING_SIZE;
193
194 mapping
195 .map_file(mapping_offset, len, &file, 0_u64, true)
196 .map_err(ErrorInner::Mmap)?;
197
198 let file = Arc::new(file);
199 let wait = PolledWait::new_with_size(driver, file.clone(), 4).map_err(ErrorInner::Driver)?;
201 let signal = UioSignal {
202 wait: Mutex::new(wait),
203 file,
204 };
205
206 let out_mem = MappedRingMem {
207 mapping: mapping.clone(),
208 offset: 0,
209 len: OUT_RING_SIZE,
210 };
211 let out_ring = OutgoingRing::new(out_mem).map_err(ErrorInner::Ring)?;
212 let in_mem = MappedRingMem {
213 mapping,
214 offset: CONTROL_SIZE + OUT_RING_SIZE,
215 len: IN_RING_SIZE,
216 };
217 let in_ring = IncomingRing::new(in_mem).map_err(ErrorInner::Ring)?;
218
219 let channel = RawAsyncChannel {
220 in_ring,
221 out_ring,
222 signal: Box::new(signal),
223 };
224
225 Ok(channel)
226}
227
228struct UioSignal {
229 file: Arc<File>,
230 wait: Mutex<PolledWait<Arc<File>>>,
231}
232
233impl UioSignal {
234 fn ids(&self) -> Option<(String, String)> {
236 let path = self.file.path().ok()?;
237 let sysfs = Path::new("/sys/bus/uio").join(path.file_name()?);
238 let interface_id = fs_err::read_to_string(sysfs.join("device/class_id")).ok()?;
239 let instance_id = fs_err::read_to_string(sysfs.join("device/device_id")).ok()?;
240 Some((interface_id, instance_id))
241 }
242}
243
244impl SignalVmbusChannel for UioSignal {
245 fn signal_remote(&self) {
246 let n = self.file.as_ref().write(1u32.as_bytes()).unwrap();
250 assert_eq!(n, 4);
251 }
252
253 fn poll_for_signal(
254 &self,
255 cx: &mut std::task::Context<'_>,
256 ) -> std::task::Poll<Result<(), vmbus_channel::ChannelClosed>> {
257 match ready!(self.wait.lock().poll_wait(cx)) {
258 Ok(()) => Ok(()),
259 Err(err) => {
260 let (interface_id, instance_id) = self.ids().unzip();
261 let interface_id = interface_id.as_ref().map(|s| s.trim_end());
262 let interface_id = interface_id.as_ref().map(|s| s.trim_end());
263 if err.raw_os_error() == Some(libc::EIO) {
264 tracing::info!(interface_id, instance_id, "vmbus channel revoked");
265 } else {
266 tracing::error!(
267 interface_id,
268 instance_id,
269 error = &err as &dyn std::error::Error,
270 "unexpected uio error, treating as revoked channel"
271 )
272 }
273 Err(vmbus_channel::ChannelClosed)
274 }
275 }
276 .into()
277 }
278}
279
280pub fn byte_pipe(
282 driver: &(impl Driver + ?Sized),
283 file: File,
284) -> Result<BytePipe<MappedRingMem>, Error> {
285 let channel = channel(driver, file)?;
286 let pipe = BytePipe::new(channel).map_err(ErrorInner::Pipe)?;
287 Ok(pipe)
288}
289
290pub fn message_pipe(
292 driver: &(impl Driver + ?Sized),
293 file: File,
294) -> Result<MessagePipe<MappedRingMem>, Error> {
295 let channel = channel(driver, file)?;
296 let pipe = MessagePipe::new(channel).map_err(ErrorInner::Pipe)?;
297 Ok(pipe)
298}