//! Low-level io_uring support: sqe submission, completion processing, and
//! tracking of memory owned by the kernel while IOs are in flight.

use ::smallbox::SmallBox;
use ::smallbox::space::S4;
use io_uring::IoUring;
use io_uring::squeue;
use pal::unix::SyscallResult;
use pal::unix::affinity::CpuSet;
use pal::unix::while_eintr;
use parking_lot::Mutex;
use slab::Slab;
use smallbox::smallbox;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::io;
use std::os::unix::prelude::*;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;

/// Memory owned by the kernel for the duration of an IO.
///
/// Small values are stored inline (up to four pointers, per `S4`); larger
/// values spill to the heap.
pub struct IoMemory(SmallBox<dyn Any + Sync + Send + Unpin, S4>);

impl IoMemory {
    /// Boxes `v` so it can be kept alive while an IO references it.
    pub fn new(v: impl 'static + Sync + Send + Unpin) -> Self {
        Self(smallbox!(v))
    }

    /// Recovers the value passed to `new`.
    ///
    /// Panics if the stored value is not a `T`.
    pub fn downcast<T: Any>(self) -> T {
        // SAFETY: this transmute only removes the `Sync + Send + Unpin`
        // auto-trait bounds from the trait object; the data pointer and the
        // `dyn Any` vtable are unchanged.
        let inner: SmallBox<dyn Any, S4> = unsafe { std::mem::transmute(self.0) };
        let inner: SmallBox<T, _> = inner.downcast().unwrap();
        inner.into_inner()
    }
}
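
// Illustrative only: a round trip through `IoMemory`, assuming the caller
// knows the stored type at `downcast` time (a mismatched type panics).
#[cfg(test)]
#[test]
fn io_memory_roundtrip() {
    let mem = IoMemory::new(vec![0u8; 64]);
    let buf: Vec<u8> = mem.downcast();
    assert_eq!(buf.len(), 64);
}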

impl Debug for IoMemory {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad("IoMemory")
    }
}

/// The submission half of an io_uring, sharing per-IO state with the
/// corresponding [`IoCompletionRing`].
pub struct IoRing {
    inner: Arc<RingInner>,
}

struct RingInner {
    ring: IoUring,
    state: Mutex<RingState>,
    pending_io_count: AtomicUsize,
}

struct RingState {
    /// Control blocks for in-flight IOs, keyed by sqe user data.
    iocbs: Slab<Iocb>,
    /// Entries waiting to be pushed because the submission queue was full.
    queue: VecDeque<QueueEntry>,
}

impl Debug for IoRing {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IoRing").finish()
    }
}

impl AsFd for IoRing {
    fn as_fd(&self) -> BorrowedFd<'_> {
        // SAFETY: the fd is owned by `self.inner.ring`, which outlives the
        // returned borrow.
        unsafe { BorrowedFd::borrow_raw(self.inner.ring.as_raw_fd()) }
    }
}

/// An sqe plus the index of its iocb, if any.
struct QueueEntry {
    sqe: squeue::Entry,
    iocb_idx: Option<usize>,
}

/// The completion half of an io_uring.
pub struct IoCompletionRing {
    inner: Arc<RingInner>,
    /// Wakers to wake (`Ok`) and iocbs to free (`Err`), deferred until the
    /// state lock has been released.
    results: Vec<Result<Waker, Iocb>>,
}

impl IoCompletionRing {
    /// Drains the completion queue, waking tasks whose IOs have finished.
    pub fn process(&mut self) {
        // Gather results under the lock, deferring wakes (and iocb drops)
        // until the lock is released.
        {
            let mut state = self.inner.state.lock();
            // SAFETY: this object is the only accessor of the completion
            // queue.
            while let Some(cqe) = unsafe { self.inner.ring.completion_shared().next() } {
                let result = cqe.result();

                // User data of !0 marks an sqe with no associated iocb.
                if cqe.user_data() != !0 {
                    let idx = cqe.user_data() as usize;
                    let iocb = &mut state.iocbs[idx];
                    match std::mem::replace(&mut iocb.state, IoState::Completed(result)) {
                        IoState::Waiting(waker) => {
                            self.results.push(Ok(waker));
                        }
                        IoState::Completed(_) => panic!("io double completed"),
                        IoState::Dropped => {
                            // The issuing future is gone; free the iocb (and
                            // its IO memory) once the lock is dropped.
                            self.results.push(Err(state.iocbs.remove(idx)));
                            self.inner.pending_io_count.fetch_sub(1, Ordering::Relaxed);
                        }
                    }
                }
            }
        }

        for result in self.results.drain(..) {
            if let Ok(waker) = result {
                waker.wake();
            }
        }
    }

    /// Returns true if the completion queue is empty.
    pub fn is_empty(&self) -> bool {
        // SAFETY: this object is the only accessor of the completion queue.
        unsafe { self.inner.ring.completion_shared().is_empty() }
    }
}
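
// Illustrative only: a hypothetical single-threaded executor turn, assuming
// the completion ring is drained on the same thread that waits on the ring:
//
//     let (ring, mut cring) = IoRing::new(64)?;
//     loop {
//         ring.submit_and_wait(); // flush sqes, block for >=1 cqe
//         cring.process(); // wake tasks whose IOs completed
//     }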

impl IoRing {
    /// Creates an io_uring with `size` submission queue entries, returning
    /// the submission and completion halves.
    pub fn new(size: u32) -> Result<(IoRing, IoCompletionRing), io::Error> {
        let inner = Arc::new(RingInner {
            ring: IoUring::builder().build(size)?,
            state: Mutex::new(RingState {
                iocbs: Slab::new(),
                queue: VecDeque::with_capacity(size as usize),
            }),
            pending_io_count: AtomicUsize::new(0),
        });

        let this = IoRing {
            inner: inner.clone(),
        };
        let cring = IoCompletionRing {
            inner,
            results: Vec::new(),
        };
        Ok((this, cring))
    }

    /// Sets the maximum number of bounded and unbounded kernel IO workers
    /// for this ring.
    pub fn set_iowq_max_workers(
        &self,
        bounded: Option<u32>,
        unbounded: Option<u32>,
    ) -> io::Result<()> {
        self.inner
            .ring
            .submitter()
            .register_iowq_max_workers(&mut [bounded.unwrap_or(0), unbounded.unwrap_or(0)])
    }

    /// Sets the CPU affinity of this ring's kernel IO workers by issuing
    /// the `io_uring_register` syscall directly.
    pub fn set_iowq_affinity(&self, cpu_set: &CpuSet) -> io::Result<()> {
        // SAFETY: passing a valid cpu set pointer and length.
        unsafe {
            let ret = libc::syscall(
                libc::SYS_io_uring_register,
                self.inner.ring.as_raw_fd(),
                17, // IORING_REGISTER_IOWQ_AFF
                cpu_set.as_ptr(),
                cpu_set.buffer_len(),
            );

            if ret < 0 {
                return Err(io::Error::last_os_error());
            }
        }
        Ok(())
    }

    /// Submits pushed sqes to the kernel, tolerating a full completion
    /// queue (the sqes stay pushed and will be submitted later).
    fn try_submit(&self) {
        if let Err(e) = self.inner.ring.submit() {
            if e.raw_os_error() == Some(libc::EBUSY) {
                tracing::trace!("completion queue is full");
            } else {
                panic!("iouring submit failed: {}", e);
            }
        }
    }

    /// Builds the final sqe for an entry, encoding the iocb index (or !0 if
    /// there is none) as user data.
    fn sqe(entry: &QueueEntry) -> squeue::Entry {
        entry
            .sqe
            .clone()
            .user_data(entry.iocb_idx.unwrap_or(!0) as u64)
    }

    /// Pushes queued entries (plus `new_entry`, if any) onto the submission
    /// queue, returning true if sqes are pending submission.
    fn flush_queue(&self, state: &mut RingState, new_entry: Option<QueueEntry>) -> bool {
        // `state` must be the state guarded by this ring's mutex, so that
        // holding it gives exclusive access to the submission queue.
        assert_eq!(std::ptr::from_mut(state), self.inner.state.data_ptr());
        // SAFETY: the state lock is held (verified above), so nothing else
        // is accessing the submission queue.
        unsafe {
            let mut ring = self.inner.ring.submission_shared();
            if ring.is_full() {
                tracing::trace!("submission queue is full");
                drop(ring);
                self.try_submit();
                ring = self.inner.ring.submission_shared();
            }

            // Push previously queued entries first to preserve ordering.
            while let Some(entry) = state.queue.front() {
                if ring.push(&Self::sqe(entry)).is_err() {
                    break;
                }
                state.queue.pop_front();
            }

            if let Some(entry) = new_entry {
                if ring.push(&Self::sqe(&entry)).is_err() {
                    state.queue.push_back(entry);
                }
            }

            !ring.is_empty()
        }
    }

    /// Pushes any queued sqes and submits them to the kernel.
    pub fn submit(&self) {
        if self.flush_queue(&mut self.inner.state.lock(), None) {
            self.try_submit();
        }
    }

    /// Pushes any queued sqes and waits for at least one completion.
    pub fn submit_and_wait(&self) {
        self.flush_queue(&mut self.inner.state.lock(), None);

        while_eintr(|| self.inner.ring.submit_and_wait(1)).unwrap_or_else(|e| {
            assert_eq!(e.raw_os_error(), Some(libc::EBUSY));
            tracing::trace!("completion queue is full");
            // The kernel won't accept submissions while the completion queue
            // is full, so wait for a completion by polling the ring fd
            // instead.
            let mut pollfd = libc::pollfd {
                fd: self.inner.ring.as_raw_fd(),
                events: libc::POLLIN,
                revents: 0,
            };
            // SAFETY: polling a single valid, initialized pollfd.
            unsafe {
                while_eintr(|| libc::poll(&mut pollfd, 1, -1).syscall_result()).unwrap();
            };
            0
        });
    }

    /// Queues an IO whose sqe references memory owned by `io_mem`, returning
    /// an index for use with `poll_io` and `drop_io`. `waker` is woken when
    /// the IO completes.
    ///
    /// If `submit` is set, sqes are submitted to the kernel immediately.
    ///
    /// # Safety
    /// The caller must ensure that any buffers or other resources referenced
    /// by `sqe` are owned by `io_mem` (or otherwise remain valid until the
    /// IO completes or is dropped via `drop_io`).
    pub unsafe fn new_io(
        &self,
        sqe: squeue::Entry,
        io_mem: IoMemory,
        waker: Waker,
        submit: bool,
    ) -> usize {
        let iocb_idx;
        {
            let mut state = self.inner.state.lock();
            iocb_idx = state.iocbs.insert(Iocb {
                state: IoState::Waiting(waker),
                io_mem,
            });
            self.inner.pending_io_count.fetch_add(1, Ordering::Relaxed);
            self.flush_queue(
                &mut state,
                Some(QueueEntry {
                    sqe,
                    iocb_idx: Some(iocb_idx),
                }),
            );
        }

        if submit {
            self.try_submit();
        }

        iocb_idx
    }

    /// Queues an sqe with no associated iocb; its completion is ignored.
    ///
    /// # Safety
    /// The caller must ensure that any memory referenced by `sqe` remains
    /// valid until the IO completes.
    pub unsafe fn push(&self, sqe: squeue::Entry, submit: bool) {
        let entry = QueueEntry {
            sqe,
            iocb_idx: None,
        };
        self.flush_queue(&mut self.inner.state.lock(), Some(entry));
        if submit {
            self.try_submit();
        }
    }

    /// Returns true if the kernel supports `opcode`.
    pub fn probe(&self, opcode: u8) -> bool {
        let mut probe = io_uring::Probe::new();
        self.inner
            .ring
            .submitter()
            .register_probe(&mut probe)
            .unwrap();
        probe.is_supported(opcode)
    }

    /// Polls the IO at `idx`, returning its result and memory once it has
    /// completed.
    pub fn poll_io(&self, cx: &mut Context<'_>, idx: usize) -> Poll<(i32, IoMemory)> {
        let mut state = self.inner.state.lock();
        let iocb = &mut state.iocbs[idx];
        match &mut iocb.state {
            IoState::Waiting(old_waker) => {
                old_waker.clone_from(cx.waker());
            }
            IoState::Completed(status) => {
                let status = *status;
                let iocb = state.iocbs.remove(idx);
                self.inner.pending_io_count.fetch_sub(1, Ordering::Relaxed);
                return Poll::Ready((status, iocb.io_mem));
            }
            IoState::Dropped => {
                panic!("polling dropped io");
            }
        }
        Poll::Pending
    }

    /// Drops the IO at `idx`. If it has not completed yet, ownership of its
    /// memory passes to the ring until the completion arrives.
    pub fn drop_io(&self, idx: usize) {
        let mut state = self.inner.state.lock();
        let iocb = &mut state.iocbs[idx];
        match &iocb.state {
            IoState::Waiting(_) => {
                iocb.state = IoState::Dropped;
            }
            IoState::Completed(_) => {
                let iocb = state.iocbs.remove(idx);
                self.inner.pending_io_count.fetch_sub(1, Ordering::Relaxed);
                // Drop the IO memory outside the lock.
                drop(state);
                drop(iocb);
            }
            IoState::Dropped => {
                panic!("double dropped an io");
            }
        }
    }
}
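
// A hedged sketch (not part of this module's API) of how a caller might wrap
// `new_io`/`poll_io`/`drop_io` in a future. `OpFuture` is hypothetical; it
// assumes the sqe passed to `new_io` only referenced memory owned by the
// registered `IoMemory`, as `new_io`'s safety contract requires.
#[cfg(test)]
#[allow(dead_code)]
struct OpFuture<'a> {
    ring: &'a IoRing,
    /// The index returned by `new_io`; `None` once the IO has completed.
    idx: Option<usize>,
}

#[cfg(test)]
impl std::future::Future for OpFuture<'_> {
    type Output = (i32, IoMemory);

    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let idx = self.idx.expect("polled after completion");
        match self.ring.poll_io(cx, idx) {
            Poll::Ready(r) => {
                // The iocb is gone; ensure Drop doesn't call `drop_io` again.
                self.idx = None;
                Poll::Ready(r)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

#[cfg(test)]
impl Drop for OpFuture<'_> {
    fn drop(&mut self) {
        // On cancellation, hand the IO memory back to the ring; it is freed
        // when the kernel's completion eventually arrives.
        if let Some(idx) = self.idx {
            self.ring.drop_io(idx);
        }
    }
}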

impl inspect::Inspect for IoRing {
    fn inspect(&self, req: inspect::Request<'_>) {
        let state = self.inner.state.lock();
        let mut completed = 0;
        let mut waiting = 0;
        let mut dropped = 0;
        state.iocbs.iter().for_each(|i| match i.1.state {
            IoState::Waiting(_) => waiting += 1,
            IoState::Completed(_) => completed += 1,
            IoState::Dropped => dropped += 1,
        });
        req.respond()
            .field("iocbs_allocated", state.iocbs.len())
            .field("iocbs_queued", state.queue.len())
            .field("iocbs_waiting", waiting)
            .field("iocbs_completed", completed)
            .field("iocbs_dropped", dropped);
    }
}

/// An in-flight IO control block.
#[derive(Debug)]
struct Iocb {
    state: IoState,
    io_mem: IoMemory,
}

#[derive(Debug)]
enum IoState {
    /// The IO is in flight; wake this waker on completion.
    Waiting(Waker),
    /// The IO completed with this result.
    Completed(i32),
    /// The issuing future was dropped before the IO completed.
    Dropped,
}
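
// A minimal end-to-end sketch, assuming a Linux kernel with io_uring support:
// push a no-op sqe (which references no external memory, so `push`'s safety
// contract is trivially satisfied), wait for its completion, and drain the
// completion queue.
#[cfg(test)]
#[test]
fn nop_roundtrip() {
    let (ring, mut cring) = IoRing::new(4).unwrap();
    // SAFETY: a Nop entry references no external memory.
    unsafe { ring.push(io_uring::opcode::Nop::new().build(), false) };
    ring.submit_and_wait();
    cring.process();
    assert!(cring.is_empty());
}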