pal_uring/ioring.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Lower-level async io-uring support, not tied to an executor model.

use ::smallbox::SmallBox;
use ::smallbox::space::S4;
use io_uring::IoUring;
use io_uring::squeue;
use pal::unix::SyscallResult;
use pal::unix::affinity::CpuSet;
use pal::unix::while_eintr;
use parking_lot::Mutex;
use slab::Slab;
use smallbox::smallbox;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::io;
use std::os::unix::prelude::*;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;

/// An object that owns memory needed for an asynchronous IO.
///
/// This object is passed to the ring to ensure that any pointers referenced by
/// the IO are kept alive for the duration of the IO, even if the issuing task
/// is dropped or forgotten.
///
/// The internals use a `SmallBox<_, S4>` to avoid allocations for common sizes.
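///
/// # Example
///
/// A minimal round trip (illustrative sketch; the buffer type here is an
/// arbitrary choice, not part of any API contract):
///
/// ```ignore
/// let mem = IoMemory::new(vec![0u8; 4096]); // erase the type when issuing the IO
/// let buf: Vec<u8> = mem.downcast();        // recover it once the IO has completed
/// assert_eq!(buf.len(), 4096);
/// ```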
pub struct IoMemory(SmallBox<dyn Any + Sync + Send + Unpin, S4>);

impl IoMemory {
    /// Creates a new `IoMemory`, erasing the type of `v`.
    pub fn new(v: impl 'static + Sync + Send + Unpin) -> Self {
        Self(smallbox!(v))
    }

    /// Converts the memory back to an unerased type.
    ///
    /// Panics if the type is not the same as that of the object passed to
    /// `new`.
    pub fn downcast<T: Any>(self) -> T {
        // Remove the Unpin bound so that `downcast` is available.
        //
        // SAFETY: The vtable for `dyn Any` is the same as for
        // `dyn Any + Sync + Send + Unpin`.
        let inner: SmallBox<dyn Any, S4> = unsafe { std::mem::transmute(self.0) };
        let inner: SmallBox<T, _> = inner.downcast().unwrap();
        inner.into_inner()
    }
}

impl Debug for IoMemory {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad("IoMemory")
    }
}

/// An IO submission ring.
pub struct IoRing {
    inner: Arc<RingInner>,
}

struct RingInner {
    ring: IoUring,
    state: Mutex<RingState>,
    pending_io_count: AtomicUsize,
}

struct RingState {
    // The list of outstanding IOs. This is locked with a single Mutex because
    // IOs are generally expected to be issued and completed on a single thread,
    // so contention should be rare.
    iocbs: Slab<Iocb>,
    // The submission IO overflow queue.
    //
    // FUTURE: instead of maintaining a queue, consider providing backpressure
    // to initiators.
    queue: VecDeque<QueueEntry>,
}

impl Debug for IoRing {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IoRing").finish()
    }
}

impl AsFd for IoRing {
    fn as_fd(&self) -> BorrowedFd<'_> {
        // SAFETY: the fd remains valid as long as `ring` is alive.
        unsafe { BorrowedFd::borrow_raw(self.inner.ring.as_raw_fd()) }
    }
}

struct QueueEntry {
    sqe: squeue::Entry,
    iocb_idx: Option<usize>,
}

/// An IO completion ring.
pub struct IoCompletionRing {
    inner: Arc<RingInner>,
    // Keep this Vec around to avoid repeated allocations.
    results: Vec<Result<Waker, Iocb>>,
}

impl IoCompletionRing {
    /// Processes the completion ring, waking any tasks waiting on IO.
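    ///
    /// A typical driver loop (illustrative sketch; `io_ring` is the paired
    /// [`IoRing`] returned alongside this object by [`IoRing::new`]):
    ///
    /// ```ignore
    /// loop {
    ///     // Flush queued submissions and block until at least one completion arrives.
    ///     io_ring.submit_and_wait();
    ///     // Drain the completion queue and wake the tasks that own the finished IOs.
    ///     completion_ring.process();
    /// }
    /// ```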
    pub fn process(&mut self) {
        // Collect the wakers and dropped IOs with the IOCB lock held.
        {
            let mut state = self.inner.state.lock();
            // SAFETY: Callers of `completion_shared` must ensure that no other
            // `CompletionQueue` may exist at the time of the call. This is
            // guaranteed because there is only one instance of
            // `IoCompletionRing` per io-uring.
            while let Some(cqe) = unsafe { self.inner.ring.completion_shared().next() } {
                let result = cqe.result();

                if cqe.user_data() != !0 {
                    let idx = cqe.user_data() as usize;
                    let iocb = &mut state.iocbs[idx];
                    match std::mem::replace(&mut iocb.state, IoState::Completed(result)) {
                        IoState::Waiting(waker) => {
                            self.results.push(Ok(waker));
                        }
                        IoState::Completed(_) => panic!("io double completed"),
                        IoState::Dropped => {
                            self.results.push(Err(state.iocbs.remove(idx)));
                            self.inner.pending_io_count.fetch_sub(1, Ordering::Relaxed);
                        }
                    }
                }
            }
        }

        // Wake the tasks and drop the IOCBs outside the lock.
        for result in self.results.drain(..) {
            if let Ok(waker) = result {
                waker.wake();
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        // SAFETY: there is only one instance of this type per io-uring, so
        // there cannot be other concurrent users of the completion ring.
        unsafe { self.inner.ring.completion_shared().is_empty() }
    }
}

impl IoRing {
    /// Creates a new `IoRing` wrapper and the underlying kernel io-uring.
    ///
    /// # Arguments
    ///
    /// * `size` - The maximum number of entries in the submission queue. The completion queue is
    ///   twice the size of the submission queue. Note that this is not strictly a limit on the
    ///   maximum number of outstanding I/Os; rather, it is the maximum number of I/Os that the
    ///   `IoRing` client can batch (in either the submission or completion path).
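    ///
    /// # Example
    ///
    /// Illustrative sketch (the ring size of 64 is an arbitrary choice):
    ///
    /// ```ignore
    /// let (ring, mut completion_ring) = IoRing::new(64)?;
    /// // `ring` issues IOs; `completion_ring` is processed to complete them.
    /// ```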
    pub fn new(size: u32) -> Result<(IoRing, IoCompletionRing), io::Error> {
        let inner = Arc::new(RingInner {
            ring: IoUring::builder().build(size)?,
            state: Mutex::new(RingState {
                iocbs: Slab::new(),
                queue: VecDeque::with_capacity(size as usize),
            }),
            pending_io_count: AtomicUsize::new(0),
        });

        let this = IoRing {
            inner: inner.clone(),
        };
        let cring = IoCompletionRing {
            inner,
            results: Vec::new(),
        };
        Ok((this, cring))
    }

    /// Sets the maximum bounded and unbounded workers (per NUMA node) for the
    /// ring.
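    ///
    /// Illustrative sketch (per the kernel's IORING_REGISTER_IOWQ_MAX_WORKERS
    /// semantics, a value of 0 leaves a limit unchanged, so `None` here means
    /// "don't change"):
    ///
    /// ```ignore
    /// // Cap io-wq at 2 bounded and 4 unbounded workers per NUMA node.
    /// ring.set_iowq_max_workers(Some(2), Some(4))?;
    /// ```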
    pub fn set_iowq_max_workers(
        &self,
        bounded: Option<u32>,
        unbounded: Option<u32>,
    ) -> io::Result<()> {
        self.inner
            .ring
            .submitter()
            .register_iowq_max_workers(&mut [bounded.unwrap_or(0), unbounded.unwrap_or(0)])
    }

    /// The `io_uring` crate does not support IORING_REGISTER_IOWQ_AFF yet, so
    /// issue the registration syscall directly here. We should switch to the
    /// crate's API in the future.
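    ///
    /// Illustrative sketch (assumes `cpu_set` is a [`CpuSet`] already
    /// configured with the desired CPUs):
    ///
    /// ```ignore
    /// ring.set_iowq_affinity(&cpu_set)?;
    /// ```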
    pub fn set_iowq_affinity(&self, cpu_set: &CpuSet) -> io::Result<()> {
        // SAFETY: calling as documented, with an appropriately-sized buffer.
        // Per the documentation, IORING_REGISTER_IOWQ_AFF expects nr_args to be
        // the byte size of the cpu mask.
        unsafe {
            let ret = libc::syscall(
                libc::SYS_io_uring_register,
                self.inner.ring.as_raw_fd(),
                17, // IORING_REGISTER_IOWQ_AFF
                cpu_set.as_ptr(),
                cpu_set.buffer_len(),
            );

            if ret < 0 {
                return Err(io::Error::last_os_error());
            }
        }
        Ok(())
    }

    fn try_submit(&self) {
        if let Err(e) = self.inner.ring.submit() {
            if e.raw_os_error() == Some(libc::EBUSY) {
                tracing::trace!("completion queue is full");
            } else {
                panic!("iouring submit failed: {}", e);
            }
        }
    }

    fn sqe(entry: &QueueEntry) -> squeue::Entry {
        entry
            .sqe
            .clone()
            .user_data(entry.iocb_idx.unwrap_or(!0) as u64)
    }

    // Flushes as many entries as possible from the overflow queue to the submission queue and
    // optionally queues a new entry.
    fn flush_queue(&self, state: &mut RingState, new_entry: Option<QueueEntry>) -> bool {
        assert_eq!(std::ptr::from_mut(state), self.inner.state.data_ptr());
        // SAFETY: Callers of submission_shared must ensure that no other
        // SubmissionQueue may exist at the time of the call. This is guaranteed
        // by holding the lock associated with `state`.
        unsafe {
            let mut ring = self.inner.ring.submission_shared();
            if ring.is_full() {
                tracing::trace!("submission queue is full");
                drop(ring);
                self.try_submit();
                ring = self.inner.ring.submission_shared();
            }

            while let Some(entry) = state.queue.front() {
                if ring.push(&Self::sqe(entry)).is_err() {
                    break;
                }
                state.queue.pop_front();
            }

            if let Some(entry) = new_entry {
                if ring.push(&Self::sqe(&entry)).is_err() {
                    state.queue.push_back(entry);
                }
            }

            !ring.is_empty()
        }
    }

    /// Submits as many entries as possible.
    pub fn submit(&self) {
        // Push entries from the overflow queue.
        if self.flush_queue(&mut self.inner.state.lock(), None) {
            self.try_submit();
        }
    }

    /// Submits as many entries as possible and waits for the next completion.
    pub fn submit_and_wait(&self) {
        // Push entries from the overflow queue.
        self.flush_queue(&mut self.inner.state.lock(), None);

        // Attempt to submit all entries.
        while_eintr(|| self.inner.ring.submit_and_wait(1)).unwrap_or_else(|e| {
            assert_eq!(e.raw_os_error(), Some(libc::EBUSY));
            tracing::trace!("completion queue is full");
            // The completion queue is full. Wait on the ring fd without submitting
            // any entries; the caller will consume some completion entries and try
            // to submit again.
            let mut pollfd = libc::pollfd {
                fd: self.inner.ring.as_raw_fd(),
                events: libc::POLLIN,
                revents: 0,
            };
            // SAFETY: calling poll with a valid pollfd.
            unsafe {
                while_eintr(|| libc::poll(&mut pollfd, 1, -1).syscall_result()).unwrap();
            };
            0
        });
    }

    /// Pushes a new IO into the ring and optionally submits it. Returns an
    /// index to be used for tracking.
    ///
    /// The IO can be polled for completion with [`Self::poll_io`], or its
    /// result discarded with [`Self::drop_io`].
    ///
    /// # Safety
    ///
    /// The caller must ensure that `sqe` is valid and that `io_mem` keeps any
    /// buffers referenced by `sqe` alive for the duration of the IO.
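    ///
    /// # Example
    ///
    /// Illustrative sketch using a no-op submission from within a future's
    /// `poll` implementation (`cx` is the task's [`Context`], `ring` an
    /// [`IoRing`]):
    ///
    /// ```ignore
    /// let sqe = io_uring::opcode::Nop::new().build();
    /// // SAFETY: a no-op references no external buffers, so no memory needs to
    /// // be kept alive beyond the `IoMemory` passed here.
    /// let idx = unsafe { ring.new_io(sqe, IoMemory::new(()), cx.waker().clone(), true) };
    /// match ring.poll_io(cx, idx) {
    ///     Poll::Ready((status, _mem)) => { /* `status` is the CQE result */ }
    ///     Poll::Pending => { /* the waker fires when the IO completes */ }
    /// }
    /// ```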
    pub unsafe fn new_io(
        &self,
        sqe: squeue::Entry,
        io_mem: IoMemory,
        waker: Waker,
        submit: bool,
    ) -> usize {
        let iocb_idx;
        {
            let mut state = self.inner.state.lock();
            iocb_idx = state.iocbs.insert(Iocb {
                state: IoState::Waiting(waker),
                io_mem,
            });
            self.inner.pending_io_count.fetch_add(1, Ordering::Relaxed);
            self.flush_queue(
                &mut state,
                Some(QueueEntry {
                    sqe,
                    iocb_idx: Some(iocb_idx),
                }),
            );
        }

        if submit {
            self.try_submit();
        }

        iocb_idx
    }

    /// Pushes an IO to the ring and optionally submits it.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `sqe` is valid and does not reference any
    /// external buffers.
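    ///
    /// Illustrative sketch:
    ///
    /// ```ignore
    /// let sqe = io_uring::opcode::Nop::new().build();
    /// // SAFETY: a no-op carries no pointers to external buffers.
    /// unsafe { ring.push(sqe, true) };
    /// ```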
    pub unsafe fn push(&self, sqe: squeue::Entry, submit: bool) {
        let entry = QueueEntry {
            sqe,
            iocb_idx: None,
        };
        self.flush_queue(&mut self.inner.state.lock(), Some(entry));
        if submit {
            self.try_submit();
        }
    }

    /// Checks whether the specified opcode is supported by this `IoRing`.
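    ///
    /// Illustrative sketch, checking for `IORING_OP_READ` support via the
    /// `io_uring` crate's opcode constants:
    ///
    /// ```ignore
    /// if ring.probe(io_uring::opcode::Read::CODE) {
    ///     // IORING_OP_READ is available on this kernel.
    /// }
    /// ```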
    pub fn probe(&self, opcode: u8) -> bool {
        let mut probe = io_uring::Probe::new();
        self.inner
            .ring
            .submitter()
            .register_probe(&mut probe)
            .unwrap();
        probe.is_supported(opcode)
    }

    /// Polls an IO for completion.
    ///
    /// If the IO is completed, returns the status and associated memory object.
    /// The IO is no longer tracked after this, so do not call `drop_io`.
    pub fn poll_io(&self, cx: &mut Context<'_>, idx: usize) -> Poll<(i32, IoMemory)> {
        let mut state = self.inner.state.lock();
        let iocb = &mut state.iocbs[idx];
        match &mut iocb.state {
            IoState::Waiting(old_waker) => {
                old_waker.clone_from(cx.waker());
            }
            IoState::Completed(status) => {
                let status = *status;
                let iocb = state.iocbs.remove(idx);
                self.inner.pending_io_count.fetch_sub(1, Ordering::Relaxed);
                return Poll::Ready((status, iocb.io_mem));
            }
            IoState::Dropped => {
                panic!("polling dropped io");
            }
        }
        Poll::Pending
    }

    /// Releases an IO without consuming its result.
    ///
    /// This does not cancel the IO. It just directs the completion ring to
    /// release the associated resources after the IO completes, since no task
    /// plans to poll it.
    pub fn drop_io(&self, idx: usize) {
        let mut state = self.inner.state.lock();
        let iocb = &mut state.iocbs[idx];
        match &iocb.state {
            IoState::Waiting(_) => {
                iocb.state = IoState::Dropped;
            }
            IoState::Completed(_) => {
                let iocb = state.iocbs.remove(idx);
                self.inner.pending_io_count.fetch_sub(1, Ordering::Relaxed);
                // Drop the IOCB (and its IO memory) outside the lock.
                drop(state);
                drop(iocb);
            }
            IoState::Dropped => {
                panic!("double dropped an io");
            }
        }
    }
}

impl inspect::Inspect for IoRing {
    fn inspect(&self, req: inspect::Request<'_>) {
        let state = self.inner.state.lock();
        let mut completed = 0;
        let mut waiting = 0;
        let mut dropped = 0;
        state.iocbs.iter().for_each(|i| match i.1.state {
            IoState::Waiting(_) => waiting += 1,
            IoState::Completed(_) => completed += 1,
            IoState::Dropped => dropped += 1,
        });
        req.respond()
            .field("iocbs_allocated", state.iocbs.len())
            .field("iocbs_queued", state.queue.len())
            .field("iocbs_waiting", waiting)
            .field("iocbs_completed", completed)
            .field("iocbs_dropped", dropped);
    }
}

#[derive(Debug)]
struct Iocb {
    state: IoState,
    io_mem: IoMemory,
}

/// The completion state of an asynchronous IO.
#[derive(Debug)]
enum IoState {
    Waiting(Waker),
    Completed(i32),
    Dropped,
}