underhill_threadpool/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4#![cfg_attr(not(target_os = "linux"), expect(missing_docs))]
5#![cfg(target_os = "linux")]
6
7//! The Underhill per-CPU thread pool used to run async tasks and IO.
8//!
9//! This is built on top of [`pal_uring`] and [`pal_async`].
10
11#![forbid(unsafe_code)]
12
13use cvm_tracing::CVM_ALLOWED;
14use inspect::Inspect;
15use loan_cell::LoanCell;
16use pal::unix::affinity::CpuSet;
17use pal_async::fd::FdReadyDriver;
18use pal_async::task::Runnable;
19use pal_async::task::Schedule;
20use pal_async::task::Spawn;
21use pal_async::task::SpawnLocal;
22use pal_async::task::TaskMetadata;
23use pal_async::timer::TimerDriver;
24use pal_async::wait::WaitDriver;
25use pal_uring::FdReady;
26use pal_uring::FdWait;
27use pal_uring::IdleControl;
28use pal_uring::Initiate;
29use pal_uring::IoInitiator;
30use pal_uring::IoUringPool;
31use pal_uring::PoolClient;
32use pal_uring::Timer;
33use parking_lot::Mutex;
34use std::future::poll_fn;
35use std::io;
36use std::marker::PhantomData;
37use std::os::fd::RawFd;
38use std::sync::Arc;
39use std::sync::OnceLock;
40use std::sync::atomic::AtomicBool;
41use std::sync::atomic::AtomicU32;
42use std::sync::atomic::Ordering::Relaxed;
43use std::task::Poll;
44use std::task::Waker;
45use thiserror::Error;
46
47/// Represents the internal state of an `AffinitizedThreadpool`.
48#[derive(Debug, Inspect)]
49struct AffinitizedThreadpoolState {
50    #[inspect(iter_by_index)]
51    drivers: Vec<ThreadpoolDriver>,
52}
53
54/// A pool of affinitized worker threads.
55#[derive(Clone, Debug, Inspect)]
56#[inspect(transparent)]
57pub struct AffinitizedThreadpool {
58    state: Arc<AffinitizedThreadpoolState>,
59}
60
61/// A builder for [`AffinitizedThreadpool`].
62#[derive(Debug, Clone)]
63pub struct ThreadpoolBuilder {
64    max_bounded_workers: Option<u32>,
65    max_unbounded_workers: Option<u32>,
66    ring_size: u32,
67}
68
69impl ThreadpoolBuilder {
70    /// Returns a new builder.
71    pub fn new() -> Self {
72        Self {
73            max_bounded_workers: None,
74            max_unbounded_workers: None,
75            ring_size: 256,
76        }
77    }
78
79    /// Sets the maximum number of bounded kernel workers for each worker ring,
80    /// per NUMA node.
81    ///
82    /// This defaults in the kernel to `min(io_ring_size, cpu_count * 4)`.
83    pub fn max_bounded_workers(&mut self, n: u32) -> &mut Self {
84        self.max_bounded_workers = Some(n);
85        self
86    }
87
88    /// Sets the maximum number of unbounded kernel workers for each worker
89    /// ring, per NUMA node.
90    ///
91    /// This defaults to the process's `RLIMIT_NPROC` limit at time of
92    /// threadpool creation.
93    pub fn max_unbounded_workers(&mut self, n: u32) -> &mut Self {
94        self.max_unbounded_workers = Some(n);
95        self
96    }
97
98    /// Sets the IO ring size. Defaults to 256.
99    pub fn ring_size(&mut self, ring_size: u32) -> &mut Self {
100        assert_ne!(ring_size, 0);
101        self.ring_size = ring_size;
102        self
103    }
104
105    /// Builds the thread pool.
106    pub fn build(&self) -> io::Result<AffinitizedThreadpool> {
107        let proc_count = pal::unix::affinity::max_present_cpu()? + 1;
108
109        let builder = Arc::new(self.clone());
110        let mut drivers = Vec::with_capacity(proc_count as usize);
111        drivers.extend((0..proc_count).map(|processor| ThreadpoolDriver {
112            inner: Arc::new(ThreadpoolDriverInner {
113                once: OnceLock::new(),
114                cpu: processor,
115                builder: builder.clone(),
116                name: format!("threadpool-{}", processor).into(),
117                affinity_set: false.into(),
118                state: Mutex::new(ThreadpoolDriverState {
119                    notifier: None,
120                    affinity: AffinityState::Waiting(Vec::new()),
121                    spawned: false,
122                }),
123            }),
124        }));
125
126        let state = Arc::new(AffinitizedThreadpoolState { drivers });
127
128        Ok(AffinitizedThreadpool { state })
129    }
130
131    // Spawn a pool on the specified CPU.
132    //
133    // If the specified CPU is present but not online, spawns a thread with
134    // affinity set to all processors that are in the same package, if possible.
135    //
136    // Note that this sets affinity of the current thread and does not revert
137    // it. Call this from a temporary thread to avoid permanently changing the
138    // affinity of the current thread.
139    fn spawn_pool(&self, cpu: u32, driver: ThreadpoolDriver) -> io::Result<PoolClient> {
140        tracing::debug!(cpu, "starting threadpool thread");
141
142        let online = is_cpu_online(cpu)?;
143        let mut affinity = CpuSet::new();
144        if online {
145            affinity.set(cpu);
146        } else {
147            // The CPU is not online. Set the affinity to match the package.
148            //
149            // TODO: figure out how to do this (maybe pass in
150            // ProcessorTopology)--the sysfs topology directory does not exist
151            // for offline CPUs. For now, just allow all CPUs.
152            let online_cpus = fs_err::read_to_string("/sys/devices/system/cpu/online")?;
153            affinity
154                .set_mask_list(&online_cpus)
155                .map_err(io::Error::other)?;
156        }
157
158        // Set the current thread's affinity so that allocations for the worker
159        // thread are performed in the correct node.
160        let affinity_ok = match pal::unix::affinity::set_current_thread_affinity(&affinity) {
161            Ok(()) => true,
162            Err(err) if err.kind() == io::ErrorKind::InvalidInput && !online => {
163                // None of the CPUs in the package are online. That's not ideal,
164                // because the thread will probably get allocated with the wrong node,
165                // but it's recoverable.
166                tracing::warn!(
167                    CVM_ALLOWED,
168                    cpu,
169                    error = &err as &dyn std::error::Error,
170                    "could not set package affinity for thread pool thread"
171                );
172                false
173            }
174            Err(err) => return Err(err),
175        };
176
177        let this = self.clone();
178        let (send, recv) = std::sync::mpsc::channel();
179        let thread = std::thread::Builder::new()
180            .name("tp".to_owned())
181            .spawn(move || {
182                // Create the pool and report back the result. This must be done
183                // on the thread so that the io-uring task context gets created.
184                // If we create this back on the initiating thread, then the
185                // task context gets created and then destroyed, and subsequent
186                // calls to update the affinity fail until the task context gets
187                // recreated (next time an IO is issued).
188                //
189                // FUTURE: take advantage of the per-thread task context and
190                // pre-register the ring via IORING_REGISTER_RING_FDS.
191                let pool = match this
192                    .make_ring(driver.inner.name.clone(), affinity_ok.then_some(&affinity))
193                {
194                    Ok(pool) => pool,
195                    Err(err) => {
196                        send.send(Err(err)).ok();
197                        return;
198                    }
199                };
200
201                let driver = driver;
202                let notifier = {
203                    let mut state = driver.inner.state.lock();
204                    state.spawned = true;
205                    if online {
206                        // There cannot be any waiters yet since they can only
207                        // be registered from the current thread.
208                        driver.inner.affinity_set.store(true, Relaxed);
209                        state.affinity = AffinityState::Set;
210                    }
211                    state.notifier.take()
212                };
213
214                send.send(Ok(pool.client().clone())).ok();
215
216                // Store the current thread's driver so that spawned tasks can
217                // find it via `Thread::current()`. Do this via a loan instead
218                // of storing it directly in TLS to avoid the overhead of
219                // registering a destructor.
220                CURRENT_THREAD_DRIVER.with(|current| {
221                    current.lend(&driver, || {
222                        if let Some(notifier) = notifier {
223                            (notifier.0)();
224                        }
225                        pool.run()
226                    });
227                });
228            })?;
229
230        // Wait for the pool to be initialized.
231        recv.recv().unwrap().inspect_err(|_| {
232            // Wait for the child thread to exit to bound resource use.
233            thread.join().unwrap();
234        })
235    }
236
237    fn make_ring(&self, name: Arc<str>, affinity: Option<&CpuSet>) -> io::Result<IoUringPool> {
238        let pool = IoUringPool::new(name, self.ring_size)?;
239        let client = pool.client();
240        client.set_iowq_max_workers(self.max_bounded_workers, self.max_unbounded_workers)?;
241        if let Some(affinity) = affinity {
242            client.set_iowq_affinity(affinity)?
243        }
244        Ok(pool)
245    }
246}
247
248/// Returns whether the specified CPU is online.
249pub fn is_cpu_online(cpu: u32) -> io::Result<bool> {
250    // Depending at the very minimum on whether the kernel has been built with
251    // `CONFIG_HOTPLUG_CPU` or not, the individual `online` pseudo-files will be
252    // present or absent.
253    //
254    // The other factors at play are the firmware-reported system properties and
255    // the `cpu_ops` structures defined for the platform. All these lead ultimately
256    // to setting the `hotpluggable` property on the cpu device in the kernel.
257    // If that property is set, the `online` file will be present for the given CPU.
258    //
259    // If that file is absent for the CPU in question, that means it is online, and
260    // due to various factors (e.g. BSP on x86_64, missing `cpu_die` handler, etc)
261    // the CPU is not allowed to be offlined.
262    //
263    // The well-established cross-platform tools (e.g. `perf`) in the kernel repo
264    // rely on the same: if the `online` file is missing, assume the CPU is online
265    // provided the CPU "home" directory is present (although they don't have
266    // comments like this one :)).
267
268    let cpu_sysfs_home = format!("/sys/devices/system/cpu/cpu{cpu}");
269    let cpu_sysfs_home = std::path::Path::new(cpu_sysfs_home.as_str());
270    let online = cpu_sysfs_home.join("online");
271    match fs_err::read_to_string(online) {
272        Ok(s) => Ok(s.trim() == "1"),
273        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(cpu_sysfs_home.exists()),
274        Err(err) => Err(err),
275    }
276}
277
278/// Sets the specified CPU online, if it is not already online.
279pub fn set_cpu_online(cpu: u32) -> io::Result<()> {
280    let online = format!("/sys/devices/system/cpu/cpu{cpu}/online");
281    match fs_err::read_to_string(&online) {
282        Ok(s) if s.trim() == "0" => {
283            fs_err::write(&online, "1")?;
284        }
285        Ok(_) => {
286            // Already online.
287        }
288        Err(err) if err.kind() == io::ErrorKind::NotFound => {
289            // The file doesn't exist, so the processor is always online.
290        }
291        Err(err) => return Err(err),
292    }
293    Ok(())
294}
295
296impl AffinitizedThreadpool {
297    /// Creates a new threadpool with the specified ring size.
298    pub fn new(io_ring_size: u32) -> io::Result<Self> {
299        ThreadpoolBuilder::new().ring_size(io_ring_size).build()
300    }
301
302    /// Returns an object that can be used to submit IOs or spawn tasks to the
303    /// current processor's ring.
304    ///
305    /// Spawned tasks will remain affinitized to the current thread. Spawn
306    /// directly on the threadpool object to get a task that will run on any
307    /// thread.
308    pub fn current_driver(&self) -> &ThreadpoolDriver {
309        self.driver(pal::unix::affinity::get_cpu_number())
310    }
311
312    /// Returns an object that can be used to submit IOs to the specified ring
313    /// in the pool, or to spawn tasks on the specified thread.
314    ///
315    /// Spawned tasks will remain affinitized to the specified thread. Spawn
316    /// directly on the threadpool object to get a task that will run on any
317    /// thread.
318    pub fn driver(&self, ring_id: u32) -> &ThreadpoolDriver {
319        &self.state.drivers[ring_id as usize]
320    }
321
322    /// Returns an iterator of drivers for threads that are running and have
323    /// their affinity set.
324    ///
325    /// This is useful for getting a set of drivers that can be used to
326    /// parallelize work.
327    pub fn active_drivers(&self) -> impl Iterator<Item = &ThreadpoolDriver> + Clone {
328        self.state
329            .drivers
330            .iter()
331            .filter(|driver| driver.is_affinity_set())
332    }
333}
334
335impl Schedule for AffinitizedThreadpoolState {
336    fn schedule(&self, runnable: Runnable) {
337        self.drivers[pal::unix::affinity::get_cpu_number() as usize]
338            .client(Some(runnable.metadata()))
339            .schedule(runnable);
340    }
341
342    fn name(&self) -> Arc<str> {
343        static NAME: OnceLock<Arc<str>> = OnceLock::new();
344        NAME.get_or_init(|| "tp".into()).clone()
345    }
346}
347
348impl Spawn for AffinitizedThreadpool {
349    fn scheduler(&self, _metadata: &TaskMetadata) -> Arc<dyn Schedule> {
350        self.state.clone()
351    }
352}
353
354/// Initiate IOs to the current CPU's thread.
355impl Initiate for AffinitizedThreadpool {
356    fn initiator(&self) -> &IoInitiator {
357        self.current_driver().initiator()
358    }
359}
360
361/// The state for the thread pool thread for the currently running CPU.
362#[derive(Debug, Copy, Clone)]
363pub struct Thread {
364    _not_send_sync: PhantomData<*const ()>,
365}
366
367impl Thread {
368    /// Returns an instance for the current CPU.
369    pub fn current() -> Option<Self> {
370        if !CURRENT_THREAD_DRIVER.with(|current| current.is_lent()) {
371            return None;
372        }
373        Some(Self {
374            _not_send_sync: PhantomData,
375        })
376    }
377
378    /// Calls `f` with the driver for the current thread.
379    pub fn with_driver<R>(&self, f: impl FnOnce(&ThreadpoolDriver) -> R) -> R {
380        CURRENT_THREAD_DRIVER.with(|current| current.borrow(|driver| f(driver.unwrap())))
381    }
382
383    fn with_once<R>(&self, f: impl FnOnce(&ThreadpoolDriver, &ThreadpoolDriverOnce) -> R) -> R {
384        self.with_driver(|driver| f(driver, driver.inner.once.get().unwrap()))
385    }
386
387    /// Sets the idle task to run. The task is returned by `f`, which receives
388    /// the file descriptor of the IO ring.
389    ///
390    /// The idle task is run before waiting on the IO ring. The idle task can
391    /// block synchronously by first calling [`IdleControl::pre_block`], and
392    /// then by polling on the IO ring while the task blocks.
393    pub fn set_idle_task<F>(&self, f: F)
394    where
395        F: 'static + Send + AsyncFnOnce(IdleControl),
396    {
397        self.with_once(|_, once| once.client.set_idle_task(f))
398    }
399
400    /// Tries to set the affinity to this thread's intended CPU, if it has not
401    /// already been set. Returns `Ok(false)` if the intended CPU is still
402    /// offline.
403    pub fn try_set_affinity(&self) -> Result<bool, SetAffinityError> {
404        self.with_once(|driver, once| {
405            let mut state = driver.inner.state.lock();
406            if matches!(state.affinity, AffinityState::Set) {
407                return Ok(true);
408            }
409            if !is_cpu_online(driver.inner.cpu).map_err(SetAffinityError::Online)? {
410                return Ok(false);
411            }
412
413            let mut affinity = CpuSet::new();
414            affinity.set(driver.inner.cpu);
415
416            pal::unix::affinity::set_current_thread_affinity(&affinity)
417                .map_err(SetAffinityError::Thread)?;
418            once.client
419                .set_iowq_affinity(&affinity)
420                .map_err(SetAffinityError::Ring)?;
421
422            let old_affinity_state = std::mem::replace(&mut state.affinity, AffinityState::Set);
423            driver.inner.affinity_set.store(true, Relaxed);
424            drop(state);
425
426            match old_affinity_state {
427                AffinityState::Waiting(wakers) => {
428                    for waker in wakers {
429                        waker.wake();
430                    }
431                }
432                AffinityState::Set => unreachable!(),
433            }
434            Ok(true)
435        })
436    }
437
438    /// Returns the that caused this thread to spawn.
439    ///
440    /// Returns `None` if the thread was spawned to issue IO.
441    pub fn first_task(&self) -> Option<TaskInfo> {
442        self.with_once(|_, once| once.first_task.clone())
443    }
444}
445
446/// An error that can occur when setting the affinity of a thread.
447#[derive(Debug, Error)]
448pub enum SetAffinityError {
449    /// An error occurred while checking if the CPU is online.
450    #[error("failed to check if CPU is online")]
451    Online(#[source] io::Error),
452    /// An error occurred while setting the thread affinity.
453    #[error("failed to set thread affinity")]
454    Thread(#[source] io::Error),
455    /// An error occurred while setting the IO ring affinity.
456    #[error("failed to set io-uring affinity")]
457    Ring(#[source] io::Error),
458}
459
460thread_local! {
461    static CURRENT_THREAD_DRIVER: LoanCell<ThreadpoolDriver> = const { LoanCell::new() };
462}
463
464impl SpawnLocal for Thread {
465    fn scheduler_local(&self, metadata: &TaskMetadata) -> Arc<dyn Schedule> {
466        self.with_driver(|driver| driver.scheduler(metadata).clone())
467    }
468}
469
470/// A driver for [`AffinitizedThreadpool`] that is targeted at a specific
471/// CPU.
472#[derive(Debug, Clone, Inspect)]
473#[inspect(transparent)]
474pub struct ThreadpoolDriver {
475    inner: Arc<ThreadpoolDriverInner>,
476}
477
478#[derive(Debug, Inspect)]
479struct ThreadpoolDriverInner {
480    #[inspect(flatten)]
481    once: OnceLock<ThreadpoolDriverOnce>,
482    #[inspect(skip)]
483    builder: Arc<ThreadpoolBuilder>,
484    cpu: u32,
485    name: Arc<str>,
486    affinity_set: AtomicBool,
487    #[inspect(flatten)]
488    state: Mutex<ThreadpoolDriverState>,
489}
490
491#[derive(Debug, Inspect)]
492struct ThreadpoolDriverOnce {
493    #[inspect(skip)]
494    client: PoolClient,
495    first_task: Option<TaskInfo>,
496}
497
498/// Information about a task that caused a thread to spawn.
499#[derive(Debug, Clone, Inspect)]
500pub struct TaskInfo {
501    /// The name of the task.
502    pub name: Arc<str>,
503    /// The location of the task.
504    #[inspect(display)]
505    pub location: &'static std::panic::Location<'static>,
506}
507
508#[derive(Debug, Inspect)]
509struct ThreadpoolDriverState {
510    affinity: AffinityState,
511    #[inspect(with = "|x| x.is_some()")]
512    notifier: Option<AffinityNotifier>,
513    spawned: bool,
514}
515
516#[derive(Debug, Inspect)]
517#[inspect(external_tag)]
518enum AffinityState {
519    #[inspect(transparent)]
520    Waiting(#[inspect(with = "|x| x.len()")] Vec<Waker>),
521    Set,
522}
523
524struct AffinityNotifier(Box<dyn FnOnce() + Send>);
525
526impl std::fmt::Debug for AffinityNotifier {
527    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
528        f.pad("AffinityNotifier")
529    }
530}
531
532impl ThreadpoolDriver {
533    fn once(&self, metadata: Option<&TaskMetadata>) -> &ThreadpoolDriverOnce {
534        self.inner.once.get_or_init(|| {
535            let this = self.clone();
536            let client = std::thread::spawn(move || {
537                let inner = this.inner.clone();
538                inner.builder.spawn_pool(inner.cpu, this)
539            })
540            .join()
541            .unwrap()
542            .expect("failed to spawn thread pool thread");
543
544            // If no task metadata was provided (because the thread is being
545            // spawned to issue IO) use the current task's metadata as the
546            // initiating task.
547            pal_async::task::with_current_task_metadata(|current_metadata| {
548                let metadata = metadata.or(current_metadata);
549                ThreadpoolDriverOnce {
550                    client,
551                    first_task: metadata.map(|metadata| TaskInfo {
552                        name: metadata.name().clone(),
553                        location: metadata.location(),
554                    }),
555                }
556            })
557        })
558    }
559
560    fn client(&self, metadata: Option<&TaskMetadata>) -> &PoolClient {
561        &self.once(metadata).client
562    }
563
564    /// Returns the target CPU number for this thread.
565    ///
566    /// This may be different from the CPU tasks actually run on if the affinity
567    /// has not yet been set for the thread.
568    pub fn target_cpu(&self) -> u32 {
569        self.inner.cpu
570    }
571
572    /// Returns whether this thread's CPU affinity has been set to the intended
573    /// CPU.
574    pub fn is_affinity_set(&self) -> bool {
575        self.inner.affinity_set.load(Relaxed)
576    }
577
578    /// Waits for the affinity to be set to this thread's intended CPU. If the
579    /// CPU was not online when the thread was created, then this will block
580    /// until the CPU is online and someone calls `try_set_affinity`.
581    pub async fn wait_for_affinity(&self) {
582        // Ensure the thread has been spawned and that the notifier has been
583        // called. Use the calling task as the initiating task for diagnostics
584        // purposes.
585        pal_async::task::with_current_task_metadata(|metadata| self.once(metadata));
586        poll_fn(|cx| {
587            let mut state = self.inner.state.lock();
588            match &mut state.affinity {
589                AffinityState::Waiting(wakers) => {
590                    if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
591                        wakers.push(cx.waker().clone());
592                    }
593                    Poll::Pending
594                }
595                AffinityState::Set => Poll::Ready(()),
596            }
597        })
598        .await
599    }
600
601    /// Sets a function to be called when the thread gets spawned.
602    ///
603    /// Return `Err(f)` if the thread is already spawned.
604    pub fn set_spawn_notifier<F: 'static + Send + FnOnce()>(&self, f: F) -> Result<(), F> {
605        let mut state = self.inner.state.lock();
606        if !state.spawned {
607            state.notifier = Some(AffinityNotifier(Box::new(f)));
608            Ok(())
609        } else {
610            Err(f)
611        }
612    }
613}
614
615impl Initiate for ThreadpoolDriver {
616    fn initiator(&self) -> &IoInitiator {
617        self.client(None).initiator()
618    }
619}
620
621impl Spawn for ThreadpoolDriver {
622    fn scheduler(&self, metadata: &TaskMetadata) -> Arc<dyn Schedule> {
623        self.client(Some(metadata)).initiator().scheduler(metadata)
624    }
625}
626
627impl FdReadyDriver for ThreadpoolDriver {
628    type FdReady = FdReady<Self>;
629
630    fn new_fd_ready(&self, fd: RawFd) -> io::Result<Self::FdReady> {
631        Ok(FdReady::new(self.clone(), fd))
632    }
633}
634
635impl WaitDriver for ThreadpoolDriver {
636    type Wait = FdWait<Self>;
637
638    fn new_wait(&self, fd: RawFd, read_size: usize) -> io::Result<Self::Wait> {
639        Ok(FdWait::new(self.clone(), fd, read_size))
640    }
641}
642
643impl TimerDriver for ThreadpoolDriver {
644    type Timer = Timer<Self>;
645
646    fn new_timer(&self) -> Self::Timer {
647        Timer::new(self.clone())
648    }
649}
650
651/// A driver for [`AffinitizedThreadpool`] that can be retargeted to different
652/// CPUs.
653#[derive(Debug, Clone)]
654pub struct RetargetableDriver {
655    inner: Arc<RetargetableDriverInner>,
656}
657
658#[derive(Debug)]
659struct RetargetableDriverInner {
660    threadpool: AffinitizedThreadpool,
661    target_cpu: AtomicU32,
662}
663
664impl RetargetableDriver {
665    /// Returns a new driver, initially targeted to `target_cpu`.
666    pub fn new(threadpool: AffinitizedThreadpool, target_cpu: u32) -> Self {
667        Self {
668            inner: Arc::new(RetargetableDriverInner {
669                threadpool,
670                target_cpu: target_cpu.into(),
671            }),
672        }
673    }
674
675    /// Retargets the driver to `target_cpu`.
676    ///
677    /// In-flight IOs will not be retargeted.
678    pub fn retarget(&self, target_cpu: u32) {
679        self.inner.target_cpu.store(target_cpu, Relaxed);
680    }
681
682    /// Returns the current target CPU.
683    pub fn current_target_cpu(&self) -> u32 {
684        self.inner.target_cpu.load(Relaxed)
685    }
686
687    /// Returns the current driver.
688    pub fn current_driver(&self) -> &ThreadpoolDriver {
689        self.inner.current_driver()
690    }
691}
692
693impl Initiate for RetargetableDriver {
694    fn initiator(&self) -> &IoInitiator {
695        self.inner.current_driver().initiator()
696    }
697}
698
699impl Spawn for RetargetableDriver {
700    fn scheduler(&self, _metadata: &TaskMetadata) -> Arc<dyn Schedule> {
701        self.inner.clone()
702    }
703}
704
705impl RetargetableDriverInner {
706    fn current_driver(&self) -> &ThreadpoolDriver {
707        self.threadpool.driver(self.target_cpu.load(Relaxed))
708    }
709}
710
711impl Schedule for RetargetableDriverInner {
712    fn schedule(&self, runnable: Runnable) {
713        self.current_driver()
714            .client(Some(runnable.metadata()))
715            .schedule(runnable)
716    }
717
718    fn name(&self) -> Arc<str> {
719        self.current_driver().inner.name.clone()
720    }
721}
722
723impl FdReadyDriver for RetargetableDriver {
724    type FdReady = FdReady<Self>;
725
726    fn new_fd_ready(&self, fd: RawFd) -> io::Result<Self::FdReady> {
727        Ok(FdReady::new(self.clone(), fd))
728    }
729}
730
731impl WaitDriver for RetargetableDriver {
732    type Wait = FdWait<Self>;
733
734    fn new_wait(&self, fd: RawFd, read_size: usize) -> io::Result<Self::Wait> {
735        Ok(FdWait::new(self.clone(), fd, read_size))
736    }
737}
738
739impl TimerDriver for RetargetableDriver {
740    type Timer = Timer<Self>;
741
742    fn new_timer(&self) -> Self::Timer {
743        Timer::new(self.clone())
744    }
745}