use super::ioring::IoCompletionRing;
use super::ioring::IoMemory;
use super::ioring::IoRing;
use futures::FutureExt;
use inspect::Inspect;
use io_uring::opcode;
use io_uring::squeue;
use loan_cell::LoanCell;
use pal_async::task::Runnable;
use pal_async::task::Schedule;
use pal_async::task::Scheduler;
use pal_async::task::Spawn;
use pal_async::task::TaskMetadata;
use pal_async::task::TaskQueue;
use std::borrow::Borrow;
use std::cell::Cell;
use std::cell::RefCell;
use std::fmt::Debug;
use std::future::Future;
use std::future::poll_fn;
use std::io;
use std::os::unix::prelude::*;
use std::pin::Pin;
use std::pin::pin;
use std::process::abort;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Wake;
use std::task::Waker;

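/// A single-threaded task pool backed by an io_uring.
///
/// A minimal usage sketch (mirroring `test_run_until_none` below; exact crate
/// paths assumed):
///
/// ```ignore
/// let pool = IoUringPool::new("pool", 16).unwrap();
/// let client = pool.client().clone();
/// let thread = std::thread::spawn(move || pool.run());
/// // Spawn tasks and issue IOs via `client.initiator()`, then drop the
/// // client so that `run` can return.
/// drop(client);
/// thread.join().unwrap();
/// ```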
pub struct IoUringPool {
    client: PoolClient,
    worker: Arc<Worker>,
    completion_ring: IoCompletionRing,
    queue: TaskQueue,
}

impl IoUringPool {
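    /// Creates a new pool, using `name` for the task queue and `ring_size` as
    /// the io_uring submission queue depth.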
    pub fn new(name: impl Into<Arc<str>>, ring_size: u32) -> io::Result<Self> {
        let (queue, scheduler) = pal_async::task::task_queue(name);

        let (io_ring, completion_ring) = IoRing::new(ring_size)?;
        let worker = Arc::new(Worker::new(ring_size, io_ring));

        Ok(Self {
            client: PoolClient(worker.clone().initiator(scheduler)),
            worker,
            completion_ring,
            queue,
        })
    }

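    /// Returns the pool's client, used to spawn tasks and issue IOs.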
    pub fn client(&self) -> &PoolClient {
        &self.client
    }

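    /// Runs the pool on the current thread until the client and all spawned
    /// tasks have been dropped.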
    pub fn run(mut self) {
        drop(self.client);
        self.worker.run(self.completion_ring, self.queue.run())
    }
}

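/// A cloneable client for spawning tasks on and controlling an
/// [`IoUringPool`].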
#[derive(Debug, Clone, Inspect)]
#[inspect(transparent)]
pub struct PoolClient(#[inspect(with = "|x| &x.client")] IoInitiator);

impl PoolClient {
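    /// Sets the task to run when the worker is idle, i.e. when it is about to
    /// block waiting on the IO ring.
    ///
    /// The task receives an [`IdleControl`], which it can use to coordinate
    /// its own blocking with the ring (see [`IdleControl::pre_block`]).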
    pub fn set_idle_task<F>(&self, f: F)
    where
        F: 'static + Send + AsyncFnOnce(IdleControl),
    {
        // Keep the pool alive while the idle task is in flight.
        let keep_pool_alive = self.clone();
        let f = Box::new(move |fd| {
            Box::pin(async move {
                let _keep_pool_alive = keep_pool_alive;
                f(fd).await
            }) as Pin<Box<dyn Future<Output = _>>>
        })
            as Box<dyn Send + FnOnce(IdleControl) -> Pin<Box<dyn Future<Output = ()>>>>;

        // Install the idle task from a task running on the worker thread,
        // since only that thread can access the worker state.
        let worker_id = Arc::as_ptr(&self.0.client.worker) as usize;
        let task = self.0.spawn("set_idle_task", async move {
            THREADPOOL_WORKER_STATE.with(|state| {
                state.borrow(|state| {
                    let state = state.unwrap();
                    assert_eq!(Arc::as_ptr(&state.worker), worker_id as *const _);
                    state.new_idle_task.set(Some(f));
                })
            })
        });

        task.detach();
    }

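    /// Returns the initiator, used to issue IOs on the ring.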
    pub fn initiator(&self) -> &IoInitiator {
        &self.0
    }

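    /// Sets the CPU affinity of the kernel io-wq threads that service this
    /// ring's blocking operations.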
    pub fn set_iowq_affinity(&self, affinity: &pal::unix::affinity::CpuSet) -> io::Result<()> {
        self.0.client.worker.io_ring.set_iowq_affinity(affinity)
    }

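    /// Sets the maximum number of bounded and unbounded kernel io-wq workers
    /// for this ring. `None` leaves the corresponding limit unchanged.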
    pub fn set_iowq_max_workers(
        &self,
        bounded: Option<u32>,
        unbounded: Option<u32>,
    ) -> io::Result<()> {
        self.0
            .client
            .worker
            .io_ring
            .set_iowq_max_workers(bounded, unbounded)
    }
}

impl Schedule for PoolClient {
    fn schedule(&self, runnable: Runnable) {
        self.0.client.schedule(runnable)
    }

    fn name(&self) -> Arc<str> {
        self.0.client.name()
    }
}

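/// The worker that owns and drives an io_uring on its thread.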
#[derive(Debug, inspect::Inspect)]
pub(crate) struct Worker {
    io_ring_size: u32,
    io_ring: IoRing,
}

type IdleTask = Pin<Box<dyn Future<Output = ()>>>;
type IdleTaskSpawn = Box<dyn Send + FnOnce(IdleControl) -> IdleTask>;

struct AffinitizedWorkerState {
    worker: Arc<Worker>,
    wake: Cell<bool>,
    new_idle_task: Cell<Option<IdleTaskSpawn>>,
    completion_ring: RefCell<IoCompletionRing>,
}

thread_local! {
    static THREADPOOL_WORKER_STATE: LoanCell<AffinitizedWorkerState> = const { LoanCell::new() };
}

impl Wake for Worker {
    fn wake_by_ref(self: &Arc<Self>) {
        THREADPOOL_WORKER_STATE.with(|state| {
            state.borrow(|state| {
                // If this is the current thread's worker, just set the wake
                // flag; the worker checks it before blocking.
                if let Some(state) = state {
                    if Arc::ptr_eq(self, &state.worker) {
                        state.wake.set(true);
                        return;
                    }
                }
                // Wake the remote worker by submitting a no-op to its ring.
                // SAFETY: the nop entry references no external memory.
                unsafe {
                    self.io_ring.push(opcode::Nop::new().build(), true);
                }
            })
        })
    }

    fn wake(self: Arc<Self>) {
        self.wake_by_ref()
    }
}

impl Worker {
    pub fn new(io_ring_size: u32, io_ring: IoRing) -> Self {
        Self {
            io_ring_size,
            io_ring,
        }
    }

    pub fn initiator(self: Arc<Self>, scheduler: Scheduler) -> IoInitiator {
        IoInitiator {
            client: Arc::new(WorkerClient {
                scheduler,
                worker: self,
            }),
        }
    }

    pub fn run<Fut: Future>(
        self: Arc<Self>,
        completion_ring: IoCompletionRing,
        fut: Fut,
    ) -> Fut::Output {
        tracing::debug!(
            io_ring_size = self.io_ring_size,
            "AffinitizedWorker running"
        );

        let waker = self.clone().into();
        let mut cx = Context::from_waker(&waker);
        let mut fut = pin!(fut);
        let mut idle_task = None;
        let state = AffinitizedWorkerState {
            worker: self,
            wake: Cell::new(false),
            new_idle_task: Cell::new(None),
            completion_ring: RefCell::new(completion_ring),
        };

        THREADPOOL_WORKER_STATE.with(|slot| {
            slot.lend(&state, || {
                loop {
                    // Process IO completions, waking any waiting tasks.
                    state.completion_ring.borrow_mut().process();

                    match fut.poll_unpin(&mut cx) {
                        Poll::Ready(r) => {
                            tracing::debug!("AffinitizedWorker exiting");
                            break r;
                        }
                        Poll::Pending => {}
                    }

                    if !state.wake.take() {
                        if let Some(new_idle_task) = state.new_idle_task.take() {
                            idle_task = Some(new_idle_task(IdleControl {
                                inner: state.worker.clone(),
                            }));
                            tracing::debug!("new idle task");
                        }

                        if let Some(task) = &mut idle_task {
                            match task.poll_unpin(&mut cx) {
                                Poll::Ready(()) => {
                                    tracing::debug!("idle task done");
                                    idle_task = None;
                                }
                                Poll::Pending => {}
                            }

                            if state.wake.take() {
                                continue;
                            }
                        }

                        state.worker.io_ring.submit_and_wait();
                    }
                }
            })
        })
    }
}

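/// Control interface for an idle task, used to coordinate the task's blocking
/// with the IO ring. See [`PoolClient::set_idle_task`].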
#[derive(Debug)]
pub struct IdleControl {
    inner: Arc<Worker>,
}

impl IdleControl {
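    /// Call this immediately before blocking in the idle task.
    ///
    /// Returns true if it is OK to block, or false if there is already
    /// pending work and the idle task should yield back to the worker
    /// instead.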
    pub fn pre_block(&mut self) -> bool {
        THREADPOOL_WORKER_STATE.with(|state| {
            state.borrow(|state| {
                let state = state.unwrap();
                assert!(Arc::ptr_eq(&state.worker, &self.inner));

                // Flush any pending submissions so that IOs make progress
                // while the idle task blocks.
                self.inner.io_ring.submit();
                // Only block if there are no pending wakeups and no
                // unprocessed completions.
                !state.wake.get() && state.completion_ring.borrow().is_empty()
            })
        })
    }

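    /// Returns the IO ring's file descriptor, which the idle task can poll
    /// for completions while blocked.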
    pub fn ring_fd(&self) -> BorrowedFd<'_> {
        self.inner.io_ring.as_fd()
    }
}

impl Spawn for IoInitiator {
    fn scheduler(&self, _metadata: &TaskMetadata) -> Arc<dyn Schedule> {
        self.client.clone()
    }
}

#[derive(Debug, inspect::Inspect)]
struct WorkerClient {
    #[inspect(skip)]
    scheduler: Scheduler,
    #[inspect(flatten)]
    worker: Arc<Worker>,
}

impl Schedule for WorkerClient {
    fn schedule(&self, runnable: Runnable) {
        self.scheduler.schedule(runnable)
    }

    fn name(&self) -> Arc<str> {
        self.scheduler.name()
    }
}

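/// A client for issuing IOs and spawning tasks on a pool's worker.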
#[derive(Debug, Clone)]
pub struct IoInitiator {
    client: Arc<WorkerClient>,
}

impl IoInitiator {
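    /// Returns true if the ring supports the given io_uring opcode.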
    pub fn probe(&self, opcode: u8) -> bool {
        self.client.worker.io_ring.probe(opcode)
    }

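    /// Issues an IO described by the submission queue entry that `f` returns,
    /// and returns the result along with `io_mem`.
    ///
    /// If the returned future is dropped before the IO completes, the process
    /// aborts, since the in-flight IO may reference stack memory.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the submission queue entry returned by `f`
    /// references only memory that remains valid for the lifetime of the IO
    /// (such as `io_mem` or memory borrowed by this future).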
    pub async unsafe fn issue_io<T, F>(&self, mut io_mem: T, f: F) -> (io::Result<i32>, T)
    where
        T: 'static + Unpin,
        F: FnOnce(&mut T) -> squeue::Entry,
    {
        // Abort the process if this future is dropped while the IO is in
        // flight, since the IO may reference stack memory.
        struct AbortOnDrop;

        impl Drop for AbortOnDrop {
            fn drop(&mut self) {
                eprintln!("io dropped in flight, may reference stack memory, aborting process");
                abort();
            }
        }

        let abort_on_drop = AbortOnDrop;

        let result = poll_fn({
            enum State<F> {
                NotIssued(F),
                Issued(usize),
                Invalid,
            }

            let mut state = State::NotIssued(f);
            let io_mem = &mut io_mem;
            move |cx: &mut Context<'_>| {
                match std::mem::replace(&mut state, State::Invalid) {
                    State::NotIssued(f) => {
                        // SAFETY: the caller guarantees that the entry's
                        // memory remains valid until the IO completes.
                        state = State::Issued(unsafe {
                            self.submit_io((f)(io_mem), IoMemory::new(()), cx.waker().clone())
                        });

                        Poll::Pending
                    }
                    State::Issued(idx) => match self.poll_io(cx, idx) {
                        Poll::Ready((result, _)) => Poll::Ready(result),
                        Poll::Pending => {
                            state = State::Issued(idx);
                            Poll::Pending
                        }
                    },
                    State::Invalid => unreachable!(),
                }
            }
        })
        .await;

        // The IO is complete, so it no longer references the stack.
        std::mem::forget(abort_on_drop);

        let result = if result >= 0 {
            Ok(result)
        } else {
            Err(io::Error::from_raw_os_error(-result))
        };

        (result, io_mem)
    }

    unsafe fn submit_io(&self, sqe: squeue::Entry, io_mem: IoMemory, waker: Waker) -> usize {
        // An explicit submit is only needed when this is not the worker
        // thread, since the worker submits before it waits.
        let needs_submit = THREADPOOL_WORKER_STATE.with(|state| {
            state.borrow(|state| {
                state.is_none_or(|state| !Arc::ptr_eq(&state.worker, &self.client.worker))
            })
        });

        // SAFETY: the caller guarantees the entry's memory lives until the IO
        // completes.
        unsafe {
            self.client
                .worker
                .io_ring
                .new_io(sqe, io_mem, waker, needs_submit)
        }
    }

    fn poll_io(&self, cx: &mut Context<'_>, idx: usize) -> Poll<(i32, IoMemory)> {
        self.client.worker.io_ring.poll_io(cx, idx)
    }

    fn drop_io(&self, idx: usize) {
        self.client.worker.io_ring.drop_io(idx);
    }
}

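/// An asynchronous IO, described by a submission queue entry plus the memory
/// (`T`) that it references.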
pub struct Io<T, Init: Borrow<IoInitiator> = IoInitiator> {
    initiator: Init,
    state: IoState<T>,
}

enum IoState<T> {
    NotStarted(squeue::Entry, T),
    Started(usize),
    Completed(i32, T),
    Invalid,
}

impl<T, Init: Borrow<IoInitiator>> Debug for Io<T, Init> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad("Io")
    }
}

impl<T: 'static + Send + Sync + Unpin, Init: Borrow<IoInitiator> + Unpin> Io<T, Init> {
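    /// Creates a new IO from a submission queue entry and the memory it
    /// references. The IO is not issued until the returned object is first
    /// polled.
    ///
    /// # Safety
    ///
    /// The caller must ensure that `submission_queue_entry` references only
    /// memory owned by `io_mem`, which is kept alive until the IO completes.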
    pub unsafe fn new(initiator: Init, submission_queue_entry: squeue::Entry, io_mem: T) -> Self {
        Self {
            initiator,
            state: IoState::NotStarted(submission_queue_entry, io_mem),
        }
    }

    pub fn initiator(&self) -> &Init {
        &self.initiator
    }

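    /// Requests cancellation of this IO via an `AsyncCancel` operation.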
    pub fn cancel(&self) {
        // SAFETY: the cancel entry references no external memory.
        unsafe {
            self.cancel_inner(|user_data| opcode::AsyncCancel::new(user_data).build());
        }
    }

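    /// Cancels this IO via `TimeoutRemove`; use this when the IO is a timeout
    /// operation.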
    pub fn cancel_timeout(&self) {
        // SAFETY: the cancel entry references no external memory.
        unsafe {
            self.cancel_inner(|user_data| opcode::TimeoutRemove::new(user_data).build());
        }
    }

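    /// Cancels this IO via `PollRemove`; use this when the IO is a poll
    /// operation.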
    pub fn cancel_poll(&self) {
        // SAFETY: the cancel entry references no external memory.
        unsafe {
            self.cancel_inner(|user_data| opcode::PollRemove::new(user_data).build());
        }
    }

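    /// Submits the cancel entry produced by `f`, then immediately drops the
    /// resulting IO, since its completion is not interesting.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the entry returned by `f` references no
    /// external memory.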
    unsafe fn cancel_inner(&self, f: impl FnOnce(u64) -> squeue::Entry) {
        let sqe = f(self.user_data().unwrap());
        // SAFETY: the caller guarantees the entry references no memory.
        let idx = unsafe {
            self.initiator
                .borrow()
                .submit_io(sqe, IoMemory::new(()), Waker::noop().clone())
        };
        self.initiator.borrow().drop_io(idx);
    }

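    /// Returns the IO's memory.
    ///
    /// Panics if the IO has been started but has not yet completed.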
    pub fn into_mem(mut self) -> T {
        match std::mem::replace(&mut self.state, IoState::Invalid) {
            IoState::Started(_) => {
                panic!("io is not complete");
            }
            IoState::NotStarted(_, io_mem) | IoState::Completed(_, io_mem) => io_mem,
            IoState::Invalid => unreachable!(),
        }
    }

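    /// Returns the user data value identifying the submitted IO, as used by
    /// the cancel operations above. Returns `None` unless the IO has been
    /// started and has not yet completed.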
    pub fn user_data(&self) -> Option<u64> {
        match self.state {
            IoState::Started(idx) => Some(idx as u64),
            IoState::NotStarted(_, _) | IoState::Completed(_, _) | IoState::Invalid => None,
        }
    }
}

impl<T: 'static + Sync + Send + Unpin, Init: Borrow<IoInitiator> + Unpin> Future for Io<T, Init> {
    type Output = io::Result<i32>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = Pin::get_mut(self);
        let result = match std::mem::replace(&mut this.state, IoState::Invalid) {
            IoState::NotStarted(entry, io_mem) => {
                // SAFETY: `new`'s caller guaranteed that the entry references
                // only memory owned by `io_mem`, which is kept alive until
                // the IO completes.
                let idx = unsafe {
                    this.initiator.borrow().submit_io(
                        entry,
                        IoMemory::new(io_mem),
                        cx.waker().clone(),
                    )
                };
                this.state = IoState::Started(idx);
                return Poll::Pending;
            }
            IoState::Started(idx) => {
                this.state = IoState::Started(idx);
                let (result, io_mem) = std::task::ready!(this.initiator.borrow().poll_io(cx, idx));
                this.state = IoState::Completed(result, io_mem.downcast());
                result
            }
            IoState::Completed(result, io_mem) => {
                this.state = IoState::Completed(result, io_mem);
                result
            }
            IoState::Invalid => unreachable!(),
        };
        let result = if result >= 0 {
            Ok(result)
        } else {
            Err(io::Error::from_raw_os_error(-result))
        };
        Poll::Ready(result)
    }
}

impl<T, Init: Borrow<IoInitiator>> Drop for Io<T, Init> {
    fn drop(&mut self) {
        if let IoState::Started(idx) = self.state {
            self.initiator.borrow().drop_io(idx);
        }
    }
}

#[cfg(test)]
mod tests {
    #![expect(
        clippy::disallowed_methods,
        reason = "test code using futures channels"
    )]

    use super::Io;
    use super::IoRing;
    use crate::IoUringPool;
    use crate::uring::tests::SingleThreadPool;
    use futures::executor::block_on;
    use io_uring::opcode;
    use io_uring::types;
    use pal_async::task::Spawn;
    use parking_lot::Mutex;
    use std::future::Future;
    use std::os::unix::prelude::*;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering;
    use std::task::Context;
    use std::task::Poll;
    use std::task::Waker;
    use std::thread;
    use std::time::Duration;
    use std::time::Instant;
    use tempfile::NamedTempFile;
    use test_with_tracing::test;

    const PAGE_SIZE: usize = 4096;

    fn new_test_file() -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.as_file_mut().set_len(1024 * 64).unwrap();
        file
    }

    struct Env {
        rx: std::sync::mpsc::Receiver<()>,
    }

    struct TestCase {
        tp: SingleThreadPool,
        file: NamedTempFile,
        _tx: std::sync::mpsc::Sender<()>,
    }

    impl Drop for Env {
        fn drop(&mut self) {
            while self.rx.recv().is_ok() {}
        }
    }

    fn new_test() -> (Env, TestCase) {
        let file = new_test_file();
        let tp = SingleThreadPool::new().unwrap();
        let (tx, rx) = std::sync::mpsc::channel();
        (Env { rx }, TestCase { tp, file, _tx: tx })
    }

    struct Timeout {
        shared_state: Arc<Mutex<TimeoutState>>,
    }

    struct TimeoutState {
        completed: bool,
        waker: Option<Waker>,
    }

    impl Future for Timeout {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let mut shared_state = self.shared_state.lock();
            if shared_state.completed {
                Poll::Ready(())
            } else {
                shared_state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    impl Timeout {
        fn new(duration: Duration) -> Self {
            let shared_state = Arc::new(Mutex::new(TimeoutState {
                completed: false,
                waker: None,
            }));

            // Complete the timeout on a background thread.
            let thread_shared_state = shared_state.clone();
            thread::spawn(move || {
                thread::sleep(duration);
                let mut shared_state = thread_shared_state.lock();
                shared_state.completed = true;
                if let Some(waker) = shared_state.waker.take() {
                    waker.wake()
                }
            });

            Timeout { shared_state }
        }
    }

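    // Skips the current test if the kernel does not support io_uring.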
    macro_rules! skip_if_no_io_uring_support {
        () => {
            if IoRing::new(1).is_err() {
                println!("Test case skipped (no IO-Uring support)");
                return;
            }
        };
    }

    #[test]
    fn test_task_executor() {
        skip_if_no_io_uring_support!();
        let tp = SingleThreadPool::new().unwrap();

        let (tx, rx) = std::sync::mpsc::channel();

        tp.initiator()
            .spawn("test", async move {
                let now = Instant::now();
                Timeout::new(Duration::from_secs(2)).await;
                assert!(now.elapsed().as_secs() >= 2);
                tx.send(()).unwrap();
            })
            .detach();

        rx.recv().unwrap();
    }

    #[test]
    fn test_local_task_executor() {
        skip_if_no_io_uring_support!();
        let tp = SingleThreadPool::new().unwrap();

        let (tx, rx) = std::sync::mpsc::channel();

        tp.initiator()
            .spawn("test", async move {
                let now = Instant::now();
                Timeout::new(Duration::from_secs(2)).await;
                assert!(now.elapsed().as_secs() >= 2);
                tx.send(()).unwrap();
            })
            .detach();

        rx.recv().unwrap();
    }

    #[test]
    fn test_serial_io() {
        skip_if_no_io_uring_support!();
        let (_env, test) = new_test();

        block_on(async move {
            let _ = &test;
            let mut write_buf = vec![0u8; PAGE_SIZE];
            for (i, b) in write_buf.iter_mut().enumerate() {
                *b = i as u8;
            }

            let sqe = opcode::Write::new(
                types::Fd(test.file.as_fd().as_raw_fd()),
                write_buf.as_ptr(),
                write_buf.len() as _,
            )
            .offset(0)
            .build();

            // SAFETY: the entry references only `write_buf`, which is owned
            // by the IO.
            let mut write_io = unsafe { Io::new(test.tp.initiator(), sqe, write_buf) };
            (&mut write_io).await.unwrap();
            let write_buf = write_io.into_mem();

            let sqe = opcode::Fsync::new(types::Fd(test.file.as_fd().as_raw_fd())).build();
            // SAFETY: the fsync entry references no external memory.
            unsafe {
                Io::new(test.tp.initiator(), sqe, ()).await.unwrap();
            }

            let mut read_buf = vec![0u8; PAGE_SIZE];
            let sqe = opcode::Read::new(
                types::Fd(test.file.as_fd().as_raw_fd()),
                read_buf.as_mut_ptr(),
                read_buf.len() as _,
            )
            .offset(0)
            .build();

            // SAFETY: the entry references only `read_buf`, which is owned
            // by the IO.
            let mut read_io = unsafe { Io::new(test.tp.initiator(), sqe, read_buf) };
            (&mut read_io).await.unwrap();
            let read_buf = read_io.into_mem();

            assert_eq!(&write_buf[..], &read_buf[..]);
        });
    }

    #[test]
    fn test_stack_io() {
        skip_if_no_io_uring_support!();
        let (_env, test) = new_test();

        block_on(async move {
            let _ = &test;
            let mut write_buf = [0; 100];
            for (i, b) in write_buf.iter_mut().enumerate() {
                *b = i as u8;
            }

            // SAFETY: the entry references only `write_buf`, which stays
            // alive until the IO completes.
            let (r, write_buf) = unsafe {
                test.tp
                    .initiator()
                    .issue_io(write_buf, |write_buf| {
                        opcode::Write::new(
                            types::Fd(test.file.as_fd().as_raw_fd()),
                            write_buf.as_ptr(),
                            write_buf.len() as _,
                        )
                        .offset(0)
                        .build()
                    })
                    .await
            };
            r.unwrap();

            // SAFETY: the fsync entry references no external memory.
            unsafe {
                test.tp
                    .initiator()
                    .issue_io((), |_| {
                        opcode::Fsync::new(types::Fd(test.file.as_fd().as_raw_fd())).build()
                    })
                    .await
                    .0
                    .unwrap();
            }

            let read_buf = [0u8; 100];
            // SAFETY: the entry references only `read_buf`.
            let (r, read_buf) = unsafe {
                test.tp
                    .initiator()
                    .issue_io(read_buf, |read_buf| {
                        opcode::Read::new(
                            types::Fd(test.file.as_fd().as_raw_fd()),
                            read_buf.as_mut_ptr(),
                            read_buf.len() as _,
                        )
                        .offset(0)
                        .build()
                    })
                    .await
            };
            r.unwrap();

            assert_eq!(&write_buf[..], &read_buf[..]);
        });
    }

    #[test]
    #[cfg(not(feature = "ci"))]
    fn test_split_io() {
        skip_if_no_io_uring_support!();
        let (_env, test) = new_test();

        block_on(async move {
            let _ = &test;
            let mut write_buf1 = vec![0u8; PAGE_SIZE];
            for (i, b) in write_buf1.iter_mut().enumerate() {
                *b = i as u8;
            }
            let sqe1 = opcode::Write::new(
                types::Fd(test.file.as_fd().as_raw_fd()),
                write_buf1.as_mut_ptr(),
                write_buf1.len() as _,
            )
            .offset(0)
            .build();
            let write1 = unsafe { Io::new(test.tp.initiator(), sqe1, write_buf1) };

            let mut write_buf2 = vec![0u8; PAGE_SIZE];
            for (i, b) in write_buf2.iter_mut().enumerate() {
                *b = i as u8;
            }
            let sqe2 = opcode::Write::new(
                types::Fd(test.file.as_fd().as_raw_fd()),
                write_buf2.as_mut_ptr(),
                write_buf2.len() as _,
            )
            .offset(4096)
            .build();
            let write2 = unsafe { Io::new(test.tp.initiator(), sqe2, write_buf2) };

            let (r1, r2) = futures::join!(write1, write2);
            r1.unwrap();
            r2.unwrap();
        });
    }

    #[test]
    #[cfg(not(feature = "ci"))]
    fn test_tp_io() {
        skip_if_no_io_uring_support!();
        let (_env, test) = new_test();

        test.tp
            .initiator()
            .clone()
            .spawn("test", async move {
                let _ = &test;
                let mut write_buf = vec![0u8; PAGE_SIZE];
                for (i, b) in write_buf.iter_mut().enumerate() {
                    *b = i as u8;
                }

                let sqe = opcode::Write::new(
                    types::Fd(test.file.as_fd().as_raw_fd()),
                    write_buf.as_mut_ptr(),
                    write_buf.len() as _,
                )
                .offset(0)
                .build();

                let mut write_io = unsafe { Io::new(test.tp.initiator(), sqe, write_buf) };
                (&mut write_io).await.unwrap();
                let write_buf = write_io.into_mem();

                let mut read_buf = vec![0u8; PAGE_SIZE];

                let sqe = opcode::Read::new(
                    types::Fd(test.file.as_fd().as_raw_fd()),
                    read_buf.as_mut_ptr(),
                    read_buf.len() as _,
                )
                .offset(0)
                .build();

                let mut read_io = unsafe { Io::new(test.tp.initiator(), sqe, read_buf) };
                (&mut read_io).await.unwrap();
                let read_buf = read_io.into_mem();

                assert_eq!(&write_buf[..], &read_buf[..]);
            })
            .detach();
    }

    #[test]
    fn test_run_until_none() {
        skip_if_no_io_uring_support!();
        let (send, recv) = futures::channel::oneshot::channel();
        let (send2, recv2) = futures::channel::oneshot::channel();
        let pool = IoUringPool::new("test", 16).unwrap();
        let done = Arc::new(AtomicBool::new(false));
        pool.client()
            .initiator()
            .spawn("hmm", {
                async move {
                    recv.await.unwrap();
                    send2.send(()).unwrap();
                }
            })
            .detach();
        pool.client().set_idle_task({
            let done = done.clone();
            |_ctl| async move {
                send.send(()).unwrap();
                recv2.await.unwrap();
                done.store(true, Ordering::SeqCst);
            }
        });
        pool.run();
        assert!(done.load(Ordering::SeqCst));
    }
}