pal_uring/
threadpool.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! This module implements the following functionality:
//! - A threadpool that executes tasks on a pool of affinitized worker threads, and manages a pool of
//!   `IO-Uring`s used to execute asynchronous I/Os. Clients of the threadpool can initiate I/Os on any
//!   of the rings, but each worker thread owns one `IO-Uring` instance and processes all completions
//!   for that instance.
//! - An async task executor that starts a task on the current thread, and then polls it to completion
//!   on a worker affinitized to the same processor where the task started. The use case for this
//!   executor is the VSCL, where we intend to start an I/O processing task on the VP run thread that
//!   handles a VMBUS interrupt from VTL0, and then later process the I/O completion on the same processor
//!   (either on an affinitized worker thread, or possibly on the VP run thread itself).
//! - A future that represents an async I/O request issued via the IO-Uring mechanism.

use super::ioring::IoCompletionRing;
use super::ioring::IoMemory;
use super::ioring::IoRing;
use futures::FutureExt;
use inspect::Inspect;
use io_uring::opcode;
use io_uring::squeue;
use loan_cell::LoanCell;
use pal_async::task::Runnable;
use pal_async::task::Schedule;
use pal_async::task::Scheduler;
use pal_async::task::Spawn;
use pal_async::task::TaskMetadata;
use pal_async::task::TaskQueue;
use std::borrow::Borrow;
use std::cell::Cell;
use std::cell::RefCell;
use std::fmt::Debug;
use std::future::Future;
use std::future::poll_fn;
use std::io;
use std::os::unix::prelude::*;
use std::pin::Pin;
use std::pin::pin;
use std::process::abort;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Wake;
use std::task::Waker;

/// An io-uring backed pool of tasks and IO.
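///
/// # Example
///
/// A minimal sketch of the intended lifecycle: build the pool, spawn work
/// through the initiator, then drive everything with [`IoUringPool::run`].
///
/// ```ignore
/// use pal_async::task::Spawn;
///
/// let pool = IoUringPool::new("example", 64).unwrap();
/// let initiator = pool.client().initiator().clone();
/// initiator
///     .spawn("hello", async {
///         // Initiate IOs or await other futures here.
///     })
///     .detach();
/// // Runs until all clients (here, `initiator`) are dropped and all spawned
/// // tasks have completed.
/// drop(initiator);
/// pool.run();
/// ```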
pub struct IoUringPool {
    client: PoolClient,
    worker: Arc<Worker>,
    completion_ring: IoCompletionRing,
    queue: TaskQueue,
}

impl IoUringPool {
    /// Builds a new pool with the given ring size. `name` is used as the name of the executor.
    pub fn new(name: impl Into<Arc<str>>, ring_size: u32) -> io::Result<Self> {
        let (queue, scheduler) = pal_async::task::task_queue(name);

        let (io_ring, completion_ring) = IoRing::new(ring_size)?;
        let worker = Arc::new(Worker::new(ring_size, io_ring));

        Ok(Self {
            client: PoolClient(worker.clone().initiator(scheduler)),
            worker,
            completion_ring,
            queue,
        })
    }

    /// Returns the client used to configure the pool and get the initiator.
    pub fn client(&self) -> &PoolClient {
        &self.client
    }

    /// Runs the pool until all clients have been dropped and any registered idle
    /// tasks have completed.
    pub fn run(mut self) {
        drop(self.client);
        self.worker.run(self.completion_ring, self.queue.run())
    }
}

/// A client for manipulating a running [`IoUringPool`].
#[derive(Debug, Clone, Inspect)]
#[inspect(transparent)]
pub struct PoolClient(#[inspect(with = "|x| &x.client")] IoInitiator);

impl PoolClient {
    /// Sets the idle task to run. The task is returned by `f`, which receives
    /// an [`IdleControl`] for interacting with the IO ring.
    ///
    /// The idle task is run before waiting on the IO ring. The idle task can
    /// block synchronously by first calling [`IdleControl::pre_block`], and
    /// then by polling on the IO ring while the task blocks.
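    ///
    /// # Example
    ///
    /// A minimal sketch (`wait_readable` is a hypothetical helper that blocks
    /// until the ring fd is readable; see [`IdleControl::pre_block`] for one
    /// possible implementation using `libc::poll`):
    ///
    /// ```ignore
    /// pool.client().set_idle_task(|mut ctl: IdleControl| async move {
    ///     if ctl.pre_block() {
    ///         // No wakeups or completions are pending; safe to block.
    ///         wait_readable(ctl.ring_fd());
    ///     }
    ///     // On return, the worker processes completions. Once this future
    ///     // completes, the pool may shut down (the idle task holds a
    ///     // PoolClient clone that keeps the pool alive).
    /// });
    /// ```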
    //
    // TODO: move this functionality into underhill_threadpool.
    pub fn set_idle_task<F>(&self, f: F)
    where
        F: 'static + Send + AsyncFnOnce(IdleControl),
    {
        // Keep the pool alive as long as the idle task is running by keeping a
        // clone of this client.
        let keep_pool_alive = self.clone();
        let f = Box::new(|fd| {
            Box::pin(async move {
                let _keep_pool_alive = keep_pool_alive;
                f(fd).await
            }) as Pin<Box<dyn Future<Output = _>>>
        })
            as Box<dyn Send + FnOnce(IdleControl) -> Pin<Box<dyn Future<Output = ()>>>>;

        // Spawn a short-lived task to update the idle task.
        let worker_id = Arc::as_ptr(&self.0.client.worker) as usize; // cast because pointers are not Send
        let task = self.0.spawn("set_idle_task", async move {
            THREADPOOL_WORKER_STATE.with(|state| {
                state.borrow(|state| {
                    let state = state.unwrap();
                    assert_eq!(Arc::as_ptr(&state.worker), worker_id as *const _);
                    state.new_idle_task.set(Some(f));
                })
            })
        });

        task.detach();
    }

    /// Returns the IO initiator.
    pub fn initiator(&self) -> &IoInitiator {
        &self.0
    }

    /// Sets the CPU affinity for the kernel io-uring worker threads.
    pub fn set_iowq_affinity(&self, affinity: &pal::unix::affinity::CpuSet) -> io::Result<()> {
        self.0.client.worker.io_ring.set_iowq_affinity(affinity)
    }

    /// Sets the maximum bounded and unbounded workers (per NUMA node) for the
    /// ring.
    pub fn set_iowq_max_workers(
        &self,
        bounded: Option<u32>,
        unbounded: Option<u32>,
    ) -> io::Result<()> {
        self.0
            .client
            .worker
            .io_ring
            .set_iowq_max_workers(bounded, unbounded)
    }
}

impl Schedule for PoolClient {
    fn schedule(&self, runnable: Runnable) {
        self.0.client.schedule(runnable)
    }

    fn name(&self) -> Arc<str> {
        self.0.client.name()
    }
}

#[derive(Debug, inspect::Inspect)]
pub(crate) struct Worker {
    io_ring_size: u32,
    io_ring: IoRing,
}

type IdleTask = Pin<Box<dyn Future<Output = ()>>>;
type IdleTaskSpawn = Box<dyn Send + FnOnce(IdleControl) -> IdleTask>;

struct AffinitizedWorkerState {
    worker: Arc<Worker>,
    wake: Cell<bool>,
    new_idle_task: Cell<Option<IdleTaskSpawn>>,
    completion_ring: RefCell<IoCompletionRing>,
}

thread_local! {
    static THREADPOOL_WORKER_STATE: LoanCell<AffinitizedWorkerState> = const { LoanCell::new() };
}

impl Wake for Worker {
    fn wake_by_ref(self: &Arc<Self>) {
        THREADPOOL_WORKER_STATE.with(|state| {
            state.borrow(|state| {
                if let Some(state) = state {
                    if Arc::ptr_eq(self, &state.worker) {
                        state.wake.set(true);
                        return;
                    }
                }
                // Submit a nop request to wake up the worker.
                //
                // SAFETY: nop opcode does not reference any data.
                unsafe {
                    self.io_ring.push(opcode::Nop::new().build(), true);
                }
            })
        })
    }

    fn wake(self: Arc<Self>) {
        self.wake_by_ref()
    }
}

impl Worker {
    pub fn new(io_ring_size: u32, io_ring: IoRing) -> Self {
        Self {
            io_ring_size,
            io_ring,
        }
    }

    pub fn initiator(self: Arc<Self>, scheduler: Scheduler) -> IoInitiator {
        IoInitiator {
            client: Arc::new(WorkerClient {
                scheduler,
                worker: self,
            }),
        }
    }

    pub fn run<Fut: Future>(
        self: Arc<Self>,
        completion_ring: IoCompletionRing,
        fut: Fut,
    ) -> Fut::Output {
        tracing::debug!(
            io_ring_size = self.io_ring_size,
            "AffinitizedWorker running"
        );

        let waker = self.clone().into();
        let mut cx = Context::from_waker(&waker);
        let mut fut = pin!(fut);
        let mut idle_task = None;
        let state = AffinitizedWorkerState {
            worker: self,
            wake: Cell::new(false),
            new_idle_task: Cell::new(None),
            completion_ring: RefCell::new(completion_ring),
        };

        THREADPOOL_WORKER_STATE.with(|slot| {
            slot.lend(&state, || {
                loop {
                    // Wake tasks due to IO completion.
                    state.completion_ring.borrow_mut().process();

                    match fut.poll_unpin(&mut cx) {
                        Poll::Ready(r) => {
                            tracing::debug!("AffinitizedWorker exiting");
                            break r;
                        }
                        Poll::Pending => {}
                    }

                    if !state.wake.take() {
                        if let Some(new_idle_task) = state.new_idle_task.take() {
                            idle_task = Some(new_idle_task(IdleControl {
                                inner: state.worker.clone(),
                            }));
                            tracing::debug!("new idle task");
                        }

                        if let Some(task) = &mut idle_task {
                            match task.poll_unpin(&mut cx) {
                                Poll::Ready(()) => {
                                    tracing::debug!("idle task done");
                                    idle_task = None;
                                }
                                Poll::Pending => {}
                            }

                            if state.wake.take() {
                                continue;
                            }
                        }

                        state.worker.io_ring.submit_and_wait();
                    }
                }
            })
        })
    }
}

/// Control interface used by the idle task.
#[derive(Debug)]
pub struct IdleControl {
    inner: Arc<Worker>,
}

impl IdleControl {
    /// Call before blocking in the idle task.
    ///
    /// Returns true if it is OK to block. Returns false if the idle task should
    /// immediately yield instead of blocking.
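    ///
    /// For example, an idle task might block roughly like this (a sketch;
    /// `ctl` is this [`IdleControl`], and `libc::poll` is one way to wait on
    /// the ring fd):
    ///
    /// ```ignore
    /// use std::os::fd::AsRawFd;
    ///
    /// if ctl.pre_block() {
    ///     let mut pfd = libc::pollfd {
    ///         fd: ctl.ring_fd().as_raw_fd(),
    ///         events: libc::POLLIN,
    ///         revents: 0,
    ///     };
    ///     // SAFETY: `pfd` is a valid, exclusively borrowed pollfd for the
    ///     // duration of the call.
    ///     unsafe { libc::poll(&mut pfd, 1, -1) };
    /// }
    /// ```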
    pub fn pre_block(&mut self) -> bool {
        THREADPOOL_WORKER_STATE.with(|state| {
            state.borrow(|state| {
                let state = state.unwrap();
                assert!(Arc::ptr_eq(&state.worker, &self.inner));

                // Issue IOs.
                //
                // FUTURE: get the idle task to do this. This will require an
                // io-uring change to allow other drivers to call submit.
                self.inner.io_ring.submit();
                // If the thread was woken or there are completed IOs, ask the idle
                // task to yield.
                !state.wake.get() && state.completion_ring.borrow().is_empty()
            })
        })
    }

    /// The file descriptor of the IO ring.
    ///
    /// The idle task should poll on this fd while blocking.
    pub fn ring_fd(&self) -> BorrowedFd<'_> {
        self.inner.io_ring.as_fd()
    }
}

impl Spawn for IoInitiator {
    fn scheduler(&self, _metadata: &TaskMetadata) -> Arc<dyn Schedule> {
        self.client.clone()
    }
}

#[derive(Debug, inspect::Inspect)]
struct WorkerClient {
    #[inspect(skip)]
    scheduler: Scheduler,
    #[inspect(flatten)]
    worker: Arc<Worker>,
}

impl Schedule for WorkerClient {
    fn schedule(&self, runnable: Runnable) {
        self.scheduler.schedule(runnable)
    }

    fn name(&self) -> Arc<str> {
        self.scheduler.name()
    }
}

/// Client handle for initiating IOs or spawning tasks on a specific threadpool
/// thread.
#[derive(Debug, Clone)]
pub struct IoInitiator {
    client: Arc<WorkerClient>,
}

impl IoInitiator {
    /// Probes whether the ring supports the given opcode.
    pub fn probe(&self, opcode: u8) -> bool {
        self.client.worker.io_ring.probe(opcode)
    }

    /// Issues an IO described by `f`, referencing IO memory in `io_mem`.
    ///
    /// The submission queue entry for the IO is provided by `f` so that the IO
    /// can reference memory in the `io_mem` object. A reference to `io_mem` is
    /// passed to `f` after it has been pinned in memory so that it will not
    /// move for the lifetime of the IO.
    ///
    /// Once the IO has completed, both the result and the IO memory are
    /// returned.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `f` returns a submission queue entry that
    /// only references memory of static lifetime or that is part of the
    /// `io_mem` object passed to `f`.
    ///
    /// # Aborts
    ///
    /// The process will abort if the async function is dropped before it
    /// completes. This is because the IO memory is not moved into the heap, and
    /// `drop` cannot synchronously wait for the IO to complete.
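    ///
    /// # Example
    ///
    /// A minimal sketch (assuming `initiator: &IoInitiator`, a raw file
    /// descriptor `fd`, and `io_uring` crate imports) that writes from a
    /// buffer owned by the IO for its duration:
    ///
    /// ```ignore
    /// use io_uring::{opcode, types};
    ///
    /// let buf = [0u8; 512];
    /// // SAFETY: the entry references only memory in `buf`, which is owned
    /// // by the IO until it completes.
    /// let (result, buf) = unsafe {
    ///     initiator
    ///         .issue_io(buf, |buf| {
    ///             opcode::Write::new(types::Fd(fd), buf.as_ptr(), buf.len() as _).build()
    ///         })
    ///         .await
    /// };
    /// let bytes_written = result.unwrap();
    /// ```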
    pub async unsafe fn issue_io<T, F>(&self, mut io_mem: T, f: F) -> (io::Result<i32>, T)
    where
        T: 'static + Unpin,
        F: FnOnce(&mut T) -> squeue::Entry,
    {
        // Note that this function is written carefully to minimize the
        // generated future size.

        struct AbortOnDrop;

        impl Drop for AbortOnDrop {
            fn drop(&mut self) {
                eprintln!("io dropped in flight, may reference stack memory, aborting process");
                abort();
            }
        }

        // Abort if this future is dropped while the IO is in flight.
        let abort_on_drop = AbortOnDrop;

        // Initiate and wait for the IO.
        let result = poll_fn({
            enum State<F> {
                NotIssued(F),
                Issued(usize),
                Invalid,
            }

            let mut state = State::NotIssued(f);
            let io_mem = &mut io_mem;
            move |cx: &mut Context<'_>| {
                match std::mem::replace(&mut state, State::Invalid) {
                    State::NotIssued(f) => {
                        // SAFETY: validity of the entry is guaranteed by the caller.
                        state = State::Issued(unsafe {
                            self.submit_io((f)(io_mem), IoMemory::new(()), cx.waker().clone())
                        });

                        // Wait once until the waker is woken.
                        Poll::Pending
                    }
                    State::Issued(idx) => match self.poll_io(cx, idx) {
                        Poll::Ready((result, _)) => Poll::Ready(result),
                        Poll::Pending => {
                            state = State::Issued(idx);
                            Poll::Pending
                        }
                    },
                    State::Invalid => unreachable!(),
                }
            }
        })
        .await;

        // The IO is complete, so io_mem is no longer aliased.
        std::mem::forget(abort_on_drop);

        let result = if result >= 0 {
            Ok(result)
        } else {
            Err(io::Error::from_raw_os_error(-result))
        };

        (result, io_mem)
    }

    /// # Safety
    ///
    /// The caller must guarantee that the given io_mem is compatible with the given sqe.
    unsafe fn submit_io(&self, sqe: squeue::Entry, io_mem: IoMemory, waker: Waker) -> usize {
        // Only submit if the worker is not currently running on this thread--if it is, the
        // IO will be submitted soon.
        let needs_submit = THREADPOOL_WORKER_STATE.with(|state| {
            state.borrow(|state| {
                state.is_none_or(|state| !Arc::ptr_eq(&state.worker, &self.client.worker))
            })
        });

        // SAFETY: caller guarantees sqe and io_mem are compatible.
        unsafe {
            self.client
                .worker
                .io_ring
                .new_io(sqe, io_mem, waker, needs_submit)
        }
    }

    fn poll_io(&self, cx: &mut Context<'_>, idx: usize) -> Poll<(i32, IoMemory)> {
        self.client.worker.io_ring.poll_io(cx, idx)
    }

    fn drop_io(&self, idx: usize) {
        self.client.worker.io_ring.drop_io(idx);
    }
}

/// A future representing an IO request submitted to an [`IoUringPool`].
pub struct Io<T, Init: Borrow<IoInitiator> = IoInitiator> {
    initiator: Init,
    state: IoState<T>,
}

enum IoState<T> {
    NotStarted(squeue::Entry, T),
    Started(usize),
    Completed(i32, T),
    Invalid,
}

impl<T, Init: Borrow<IoInitiator>> Debug for Io<T, Init> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad("Io")
    }
}

impl<T: 'static + Send + Sync + Unpin, Init: Borrow<IoInitiator> + Unpin> Io<T, Init> {
    /// Creates a new request that will submit the IO described by the submission queue entry
    /// to the specified initiator.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the `submission_queue_entry` only references memory
    /// owned by the supplied `io_mem` object.
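    ///
    /// # Example
    ///
    /// A minimal sketch (assuming `initiator: IoInitiator`) arming a
    /// one-second io_uring timeout. The boxed `Timespec` is the IO memory, so
    /// the pointer in the entry stays valid until the IO completes:
    ///
    /// ```ignore
    /// use io_uring::{opcode, types};
    ///
    /// let ts = Box::new(types::Timespec::new().sec(1));
    /// let sqe = opcode::Timeout::new(&*ts).build();
    /// // SAFETY: the entry references only the boxed timespec, which is
    /// // owned by the `Io` object until completion.
    /// let io = unsafe { Io::new(initiator, sqe, ts) };
    /// // A pure timeout completes with `-ETIME` once the timer fires.
    /// let err = io.await.unwrap_err();
    /// assert_eq!(err.raw_os_error(), Some(libc::ETIME));
    /// ```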
    pub unsafe fn new(initiator: Init, submission_queue_entry: squeue::Entry, io_mem: T) -> Self {
        Self {
            initiator,
            state: IoState::NotStarted(submission_queue_entry, io_mem),
        }
    }

    /// Returns the initiator used to issue the IO.
    pub fn initiator(&self) -> &Init {
        &self.initiator
    }

    /// Issues an async cancel operation for this IO.
    pub fn cancel(&self) {
        // SAFETY: the AsyncCancel entry does not reference any external memory.
        unsafe {
            self.cancel_inner(|user_data| opcode::AsyncCancel::new(user_data).build());
        }
    }

    /// Issues a timeout remove operation for this IO.
    pub fn cancel_timeout(&self) {
        // SAFETY: the TimeoutRemove entry does not reference any external memory.
        unsafe {
            self.cancel_inner(|user_data| opcode::TimeoutRemove::new(user_data).build());
        }
    }

    /// Issues a poll remove operation for this IO.
    pub fn cancel_poll(&self) {
        // SAFETY: the PollRemove entry does not reference any external memory.
        unsafe {
            self.cancel_inner(|user_data| opcode::PollRemove::new(user_data).build());
        }
    }

    /// # Safety
    ///
    /// Caller must ensure that `f` produces a safe sqe entry.
    unsafe fn cancel_inner(&self, f: impl FnOnce(u64) -> squeue::Entry) {
        let sqe = f(self.user_data().unwrap());
        // SAFETY: guaranteed by caller
        let idx = unsafe {
            self.initiator
                .borrow()
                .submit_io(sqe, IoMemory::new(()), Waker::noop().clone())
        };
        self.initiator.borrow().drop_io(idx);
    }

    /// Retrieves the IO memory.
    ///
    /// Panics if the IO has started and has not yet completed.
    pub fn into_mem(mut self) -> T {
        match std::mem::replace(&mut self.state, IoState::Invalid) {
            IoState::Started(_) => {
                panic!("io is not complete");
            }
            IoState::NotStarted(_, io_mem) | IoState::Completed(_, io_mem) => io_mem,
            IoState::Invalid => unreachable!(),
        }
    }

    /// Returns the `user_data` field used when initiating the IO, or `None` if
    /// the IO has not yet been initiated. This is necessary to support
    /// cancelling IOs.
    pub fn user_data(&self) -> Option<u64> {
        match self.state {
            IoState::Started(idx) => Some(idx as u64),
            IoState::NotStarted(_, _) | IoState::Completed(_, _) | IoState::Invalid => None,
        }
    }
}

impl<T: 'static + Sync + Send + Unpin, Init: Borrow<IoInitiator> + Unpin> Future for Io<T, Init> {
    type Output = io::Result<i32>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = Pin::get_mut(self);
        let result = match std::mem::replace(&mut this.state, IoState::Invalid) {
            IoState::NotStarted(entry, io_mem) => {
                // SAFETY: guaranteed by unsafe Self::new.
                let idx = unsafe {
                    this.initiator.borrow().submit_io(
                        entry,
                        IoMemory::new(io_mem),
                        cx.waker().clone(),
                    )
                };
                this.state = IoState::Started(idx);
                return Poll::Pending;
            }
            IoState::Started(idx) => {
                this.state = IoState::Started(idx);
                let (result, io_mem) = std::task::ready!(this.initiator.borrow().poll_io(cx, idx));
                this.state = IoState::Completed(result, io_mem.downcast());
                result
            }
            IoState::Completed(result, io_mem) => {
                this.state = IoState::Completed(result, io_mem);
                result
            }
            IoState::Invalid => unreachable!(),
        };
        let result = if result >= 0 {
            Ok(result)
        } else {
            Err(io::Error::from_raw_os_error(-result))
        };
        Poll::Ready(result)
    }
}

impl<T, Init: Borrow<IoInitiator>> Drop for Io<T, Init> {
    fn drop(&mut self) {
        if let IoState::Started(idx) = self.state {
            self.initiator.borrow().drop_io(idx);
        }
    }
}

#[cfg(test)]
mod tests {
    #![expect(
        clippy::disallowed_methods,
        reason = "test code using futures channels"
    )]

    use super::Io;
    use super::IoRing;
    use crate::IoUringPool;
    use crate::uring::tests::SingleThreadPool;
    use futures::executor::block_on;
    use io_uring::opcode;
    use io_uring::types;
    use pal_async::task::Spawn;
    use parking_lot::Mutex;
    use std::future::Future;
    use std::os::unix::prelude::*;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering;
    use std::task::Context;
    use std::task::Poll;
    use std::task::Waker;
    use std::thread;
    use std::time::Duration;
    use std::time::Instant;
    use tempfile::NamedTempFile;
    use test_with_tracing::test;

    const PAGE_SIZE: usize = 4096;

    fn new_test_file() -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.as_file_mut().set_len(1024 * 64).unwrap();
        file
    }

    struct Env {
        rx: std::sync::mpsc::Receiver<()>,
    }

    struct TestCase {
        tp: SingleThreadPool,
        file: NamedTempFile,
        _tx: std::sync::mpsc::Sender<()>,
    }

    impl Drop for Env {
        fn drop(&mut self) {
            while self.rx.recv().is_ok() {}
        }
    }

    fn new_test() -> (Env, TestCase) {
        let file = new_test_file();
        let tp = SingleThreadPool::new().unwrap();
        let (tx, rx) = std::sync::mpsc::channel();
        (Env { rx }, TestCase { tp, file, _tx: tx })
    }

    struct Timeout {
        shared_state: Arc<Mutex<TimeoutState>>,
    }

    struct TimeoutState {
        completed: bool,
        waker: Option<Waker>,
    }

    impl Future for Timeout {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let mut shared_state = self.shared_state.lock();
            if shared_state.completed {
                Poll::Ready(())
            } else {
                shared_state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    impl Timeout {
        fn new(duration: Duration) -> Self {
            let shared_state = Arc::new(Mutex::new(TimeoutState {
                completed: false,
                waker: None,
            }));

            // Spawn the new thread
            let thread_shared_state = shared_state.clone();
            thread::spawn(move || {
                thread::sleep(duration);
                let mut shared_state = thread_shared_state.lock();
                shared_state.completed = true;
                if let Some(waker) = shared_state.waker.take() {
                    waker.wake()
                }
            });

            Timeout { shared_state }
        }
    }

    /// Skips a test case if IO-Uring is not supported (this is necessary because IO-Uring is not yet supported
    /// by the official WSL2 kernel).
    macro_rules! skip_if_no_io_uring_support {
        () => {
            if IoRing::new(1).is_err() {
                println!("Test case skipped (no IO-Uring support)");
                return;
            }
        };
    }

    #[test]
    fn test_task_executor() {
        skip_if_no_io_uring_support!();
        let tp = SingleThreadPool::new().unwrap();

        let (tx, rx) = std::sync::mpsc::channel();

        tp.initiator()
            .spawn("test", async move {
                let now = Instant::now();
                Timeout::new(Duration::from_secs(2)).await;
                assert!(now.elapsed().as_secs() >= 2);
                tx.send(()).unwrap();
            })
            .detach();

        rx.recv().unwrap();
    }

    #[test]
    fn test_local_task_executor() {
        skip_if_no_io_uring_support!();
        let tp = SingleThreadPool::new().unwrap();

        let (tx, rx) = std::sync::mpsc::channel();

        tp.initiator()
            .spawn("test", async move {
                let now = Instant::now();
                Timeout::new(Duration::from_secs(2)).await;
                assert!(now.elapsed().as_secs() >= 2);
                tx.send(()).unwrap();
            })
            .detach();

        rx.recv().unwrap();
    }

    #[test]
    fn test_serial_io() {
        skip_if_no_io_uring_support!();
        let (_env, test) = new_test();

        block_on(async move {
            let _ = &test;
            let mut write_buf = vec![0u8; PAGE_SIZE];
            for (i, b) in write_buf.iter_mut().enumerate() {
                *b = i as u8;
            }

            let sqe = opcode::Write::new(
                types::Fd(test.file.as_fd().as_raw_fd()),
                write_buf.as_ptr(),
                write_buf.len() as _,
            )
            .offset(0)
            .build();

            // SAFETY: The only memory being referenced in the submission is write_buf.
            let mut write_io = unsafe { Io::new(test.tp.initiator(), sqe, write_buf) };
            (&mut write_io).await.unwrap();
            let write_buf = write_io.into_mem();

            let sqe = opcode::Fsync::new(types::Fd(test.file.as_fd().as_raw_fd())).build();
            // SAFETY: the Fsync entry does not reference any external memory.
            unsafe {
                Io::new(test.tp.initiator(), sqe, ()).await.unwrap();
            }

            let mut read_buf = vec![0u8; PAGE_SIZE];
            let sqe = opcode::Read::new(
                types::Fd(test.file.as_fd().as_raw_fd()),
                read_buf.as_mut_ptr(),
                read_buf.len() as _,
            )
            .offset(0)
            .build();

            // SAFETY: The only memory being referenced in the submission is read_buf.
            let mut read_io = unsafe { Io::new(test.tp.initiator(), sqe, read_buf) };
            (&mut read_io).await.unwrap();
            let read_buf = read_io.into_mem();

            assert_eq!(&write_buf[..], &read_buf[..]);
        });
    }

    #[test]
    fn test_stack_io() {
        skip_if_no_io_uring_support!();
        let (_env, test) = new_test();

        block_on(async move {
            let _ = &test;
            let mut write_buf = [0; 100];
            for (i, b) in write_buf.iter_mut().enumerate() {
                *b = i as u8;
            }

            // SAFETY: The only memory being referenced in the submission is write_buf.
            let (r, write_buf) = unsafe {
                test.tp
                    .initiator()
                    .issue_io(write_buf, |write_buf| {
                        opcode::Write::new(
                            types::Fd(test.file.as_fd().as_raw_fd()),
                            write_buf.as_ptr(),
                            write_buf.len() as _,
                        )
                        .offset(0)
                        .build()
                    })
                    .await
            };
            r.unwrap();

            // SAFETY: the Fsync entry does not reference any external memory.
            unsafe {
                test.tp
                    .initiator()
                    .issue_io((), |_| {
                        opcode::Fsync::new(types::Fd(test.file.as_fd().as_raw_fd())).build()
                    })
                    .await
                    .0
                    .unwrap();
            }

            let read_buf = [0u8; 100];
            // SAFETY: the buffer is owned by the IO for its lifetime.
            let (r, read_buf) = unsafe {
                test.tp
                    .initiator()
                    .issue_io(read_buf, |read_buf| {
                        opcode::Read::new(
                            types::Fd(test.file.as_fd().as_raw_fd()),
                            read_buf.as_mut_ptr(),
                            read_buf.len() as _,
                        )
                        .offset(0)
                        .build()
                    })
                    .await
            };
            r.unwrap();

            assert_eq!(&write_buf[..], &read_buf[..]);
        });
    }

    // TODO: This test requires higher memlock limits that scale with processor count, as set in
    //       /etc/security/limits.conf and with ulimit -l.
    //
    //       Disable these in CI for now until the code is more aware of limits and can handle them and/or io-uring no
    //       longer requires locked pages. A 16 core build agent requires more than the default set.
    #[test]
    #[cfg(not(feature = "ci"))]
    fn test_split_io() {
        skip_if_no_io_uring_support!();
        let (_env, test) = new_test();

        block_on(async move {
            let _ = &test;
            let mut write_buf1 = vec![0u8; PAGE_SIZE];
            for (i, b) in write_buf1.iter_mut().enumerate() {
                *b = i as u8;
            }
            let sqe1 = opcode::Write::new(
                types::Fd(test.file.as_fd().as_raw_fd()),
                write_buf1.as_mut_ptr(),
                write_buf1.len() as _,
            )
            .offset(0)
            .build();
            // SAFETY: The only memory being referenced in the submission is write_buf1.
            let write1 = unsafe { Io::new(test.tp.initiator(), sqe1, write_buf1) };

            let mut write_buf2 = vec![0u8; PAGE_SIZE];
            for (i, b) in write_buf2.iter_mut().enumerate() {
                *b = i as u8;
            }
            let sqe2 = opcode::Write::new(
                types::Fd(test.file.as_fd().as_raw_fd()),
                write_buf2.as_mut_ptr(),
                write_buf2.len() as _,
            )
            .offset(4096)
            .build();
            // SAFETY: The only memory being referenced in the submission is write_buf2.
            let write2 = unsafe { Io::new(test.tp.initiator(), sqe2, write_buf2) };

            let (r1, r2) = futures::join!(write1, write2);
            r1.unwrap();
            r2.unwrap();
        });
    }

    // TODO: This test requires higher memlock limits that scale with processor count, as set in
    //       /etc/security/limits.conf and with ulimit -l.
    //
    //       Disable these in CI for now until the code is more aware of limits and can handle them and/or io-uring no
    //       longer requires locked pages. A 16 core build agent requires more than the default set.
    #[test]
    #[cfg(not(feature = "ci"))]
    fn test_tp_io() {
        skip_if_no_io_uring_support!();
        let (_env, test) = new_test();

        test.tp
            .initiator()
            .clone()
            .spawn("test", async move {
                let _ = &test;
                let mut write_buf = vec![0u8; PAGE_SIZE];
                for (i, b) in write_buf.iter_mut().enumerate() {
                    *b = i as u8;
                }

                let sqe = opcode::Write::new(
                    types::Fd(test.file.as_fd().as_raw_fd()),
                    write_buf.as_mut_ptr(),
                    write_buf.len() as _,
                )
                .offset(0)
                .build();

                // SAFETY: The only memory being referenced in the submission is write_buf.
                let mut write_io = unsafe { Io::new(test.tp.initiator(), sqe, write_buf) };
                (&mut write_io).await.unwrap();
                let write_buf = write_io.into_mem();

                let mut read_buf = vec![0u8; PAGE_SIZE];

                let sqe = opcode::Read::new(
                    types::Fd(test.file.as_fd().as_raw_fd()),
                    read_buf.as_mut_ptr(),
                    read_buf.len() as _,
                )
                .offset(0)
                .build();

                // SAFETY: The only memory being referenced in the submission is read_buf.
                let mut read_io = unsafe { Io::new(test.tp.initiator(), sqe, read_buf) };
                (&mut read_io).await.unwrap();
                let read_buf = read_io.into_mem();

                assert_eq!(&write_buf[..], &read_buf[..]);
            })
            .detach();
    }

    #[test]
    fn test_run_until_none() {
        skip_if_no_io_uring_support!();
        let (send, recv) = futures::channel::oneshot::channel();
        let (send2, recv2) = futures::channel::oneshot::channel();
        let pool = IoUringPool::new("test", 16).unwrap();
        let done = Arc::new(AtomicBool::new(false));
        pool.client()
            .initiator()
            .spawn("hmm", {
                async move {
                    recv.await.unwrap();
                    send2.send(()).unwrap();
                }
            })
            .detach();
        pool.client().set_idle_task({
            let done = done.clone();
            |_ctl| async move {
                send.send(()).unwrap();
                recv2.await.unwrap();
                done.store(true, Ordering::SeqCst);
            }
        });
        pool.run();
        assert!(done.load(Ordering::SeqCst));
    }
}