1#![cfg_attr(not(target_os = "linux"), expect(missing_docs))]
5#![cfg(target_os = "linux")]
6
7#![forbid(unsafe_code)]
12
13use cvm_tracing::CVM_ALLOWED;
14use inspect::Inspect;
15use loan_cell::LoanCell;
16use pal::unix::affinity::CpuSet;
17use pal_async::fd::FdReadyDriver;
18use pal_async::task::Runnable;
19use pal_async::task::Schedule;
20use pal_async::task::Spawn;
21use pal_async::task::SpawnLocal;
22use pal_async::task::TaskMetadata;
23use pal_async::timer::TimerDriver;
24use pal_async::wait::WaitDriver;
25use pal_uring::FdReady;
26use pal_uring::FdWait;
27use pal_uring::IdleControl;
28use pal_uring::Initiate;
29use pal_uring::IoInitiator;
30use pal_uring::IoUringPool;
31use pal_uring::PoolClient;
32use pal_uring::Timer;
33use parking_lot::Mutex;
34use std::future::poll_fn;
35use std::io;
36use std::marker::PhantomData;
37use std::os::fd::RawFd;
38use std::sync::Arc;
39use std::sync::OnceLock;
40use std::sync::atomic::AtomicBool;
41use std::sync::atomic::AtomicU32;
42use std::sync::atomic::Ordering::Relaxed;
43use std::task::Poll;
44use std::task::Waker;
45use thiserror::Error;
46
/// Shared state behind an [`AffinitizedThreadpool`].
#[derive(Debug, Inspect)]
struct AffinitizedThreadpoolState {
    /// The per-CPU drivers, indexed by CPU number. `ThreadpoolBuilder::build`
    /// creates one entry for every index up to the maximum present CPU.
    #[inspect(iter_by_index)]
    drivers: Vec<ThreadpoolDriver>,
}
53
/// A handle to a pool of per-CPU [`ThreadpoolDriver`]s.
///
/// Cloning the handle only clones the inner `Arc`, so every clone refers to
/// the same set of drivers.
#[derive(Clone, Debug, Inspect)]
#[inspect(transparent)]
pub struct AffinitizedThreadpool {
    /// The driver list shared by all clones of this handle.
    state: Arc<AffinitizedThreadpoolState>,
}
60
/// A builder for [`AffinitizedThreadpool`].
#[derive(Debug, Clone)]
pub struct ThreadpoolBuilder {
    /// First argument passed to `set_iowq_max_workers` when a ring is created.
    max_bounded_workers: Option<u32>,
    /// Second argument passed to `set_iowq_max_workers` when a ring is created.
    max_unbounded_workers: Option<u32>,
    /// Size passed to `IoUringPool::new` for each ring. Never zero when set
    /// through `ring_size()`, which asserts on zero.
    ring_size: u32,
}
68
69impl ThreadpoolBuilder {
70 pub fn new() -> Self {
72 Self {
73 max_bounded_workers: None,
74 max_unbounded_workers: None,
75 ring_size: 256,
76 }
77 }
78
79 pub fn max_bounded_workers(&mut self, n: u32) -> &mut Self {
84 self.max_bounded_workers = Some(n);
85 self
86 }
87
88 pub fn max_unbounded_workers(&mut self, n: u32) -> &mut Self {
94 self.max_unbounded_workers = Some(n);
95 self
96 }
97
98 pub fn ring_size(&mut self, ring_size: u32) -> &mut Self {
100 assert_ne!(ring_size, 0);
101 self.ring_size = ring_size;
102 self
103 }
104
105 pub fn build(&self) -> io::Result<AffinitizedThreadpool> {
107 let proc_count = pal::unix::affinity::max_present_cpu()? + 1;
108
109 let builder = Arc::new(self.clone());
110 let mut drivers = Vec::with_capacity(proc_count as usize);
111 drivers.extend((0..proc_count).map(|processor| ThreadpoolDriver {
112 inner: Arc::new(ThreadpoolDriverInner {
113 once: OnceLock::new(),
114 cpu: processor,
115 builder: builder.clone(),
116 name: format!("threadpool-{}", processor).into(),
117 affinity_set: false.into(),
118 state: Mutex::new(ThreadpoolDriverState {
119 notifier: None,
120 affinity: AffinityState::Waiting(Vec::new()),
121 spawned: false,
122 }),
123 }),
124 }));
125
126 let state = Arc::new(AffinitizedThreadpoolState { drivers });
127
128 Ok(AffinitizedThreadpool { state })
129 }
130
    /// Spawns the pool thread for `driver` and returns a client for its ring.
    ///
    /// If `cpu` is online, the affinity mask is just that CPU. Otherwise the
    /// mask is built from the CPUs listed in `/sys/devices/system/cpu/online`.
    ///
    /// Note that this changes the affinity of the *calling* thread before the
    /// pool thread is spawned (presumably so the new thread starts with that
    /// affinity; confirm). The visible caller, `ThreadpoolDriver::once`, runs
    /// this on a temporary thread.
    ///
    /// # Errors
    ///
    /// Returns an error if sysfs cannot be read, if the affinity cannot be
    /// set (other than the tolerated case below), if the thread cannot be
    /// spawned, or if creating the ring on the new thread fails.
    ///
    /// # Panics
    ///
    /// Panics if the pool thread exits without reporting a result.
    fn spawn_pool(&self, cpu: u32, driver: ThreadpoolDriver) -> io::Result<PoolClient> {
        tracing::debug!(cpu, "starting threadpool thread");

        let online = is_cpu_online(cpu)?;
        let mut affinity = CpuSet::new();
        if online {
            affinity.set(cpu);
        } else {
            // The target CPU is offline: fall back to the set of CPUs that
            // sysfs currently reports as online.
            let online_cpus = fs_err::read_to_string("/sys/devices/system/cpu/online")?;
            affinity
                .set_mask_list(&online_cpus)
                .map_err(io::Error::other)?;
        }

        // Apply the mask to the current thread. `affinity_ok` records whether
        // the mask was accepted; it decides below whether the same mask is
        // also applied to the ring.
        let affinity_ok = match pal::unix::affinity::set_current_thread_affinity(&affinity) {
            Ok(()) => true,
            // `InvalidInput` is tolerated only for the fallback mask: log and
            // carry on without any affinity.
            Err(err) if err.kind() == io::ErrorKind::InvalidInput && !online => {
                tracing::warn!(
                    CVM_ALLOWED,
                    cpu,
                    error = &err as &dyn std::error::Error,
                    "could not set package affinity for thread pool thread"
                );
                false
            }
            Err(err) => return Err(err),
        };

        let this = self.clone();
        // Channel used by the pool thread to report the outcome of ring
        // creation back to this function.
        let (send, recv) = std::sync::mpsc::channel();
        let thread = std::thread::Builder::new()
            .name("tp".to_owned())
            .spawn(move || {
                // The ring is created on the pool thread itself. On failure,
                // report the error and let the thread exit.
                let pool = match this
                    .make_ring(driver.inner.name.clone(), affinity_ok.then_some(&affinity))
                {
                    Ok(pool) => pool,
                    Err(err) => {
                        send.send(Err(err)).ok();
                        return;
                    }
                };

                let driver = driver;
                let notifier = {
                    let mut state = driver.inner.state.lock();
                    // From here on `set_spawn_notifier` rejects new notifiers.
                    state.spawned = true;
                    if online {
                        // The thread was started with the target CPU's mask,
                        // so the affinity is already in its final state.
                        driver.inner.affinity_set.store(true, Relaxed);
                        state.affinity = AffinityState::Set;
                    }
                    // Take the notifier while holding the lock; it is invoked
                    // below, after the lock has been released.
                    state.notifier.take()
                };

                // Unblock the spawner. A send failure is ignored.
                send.send(Ok(pool.client().clone())).ok();

                // Lend the driver to the thread-local for as long as the pool
                // runs, so `Thread::current()` and `Thread::with_driver()`
                // work on this thread. The notifier, if any, runs first.
                CURRENT_THREAD_DRIVER.with(|current| {
                    current.lend(&driver, || {
                        if let Some(notifier) = notifier {
                            (notifier.0)();
                        }
                        pool.run()
                    });
                });
            })?;

        // Wait for the pool thread to report. If it reported an error, it is
        // about to exit, so join it before returning the error.
        recv.recv().unwrap().inspect_err(|_| {
            thread.join().unwrap();
        })
    }
236
237 fn make_ring(&self, name: Arc<str>, affinity: Option<&CpuSet>) -> io::Result<IoUringPool> {
238 let pool = IoUringPool::new(name, self.ring_size)?;
239 let client = pool.client();
240 client.set_iowq_max_workers(self.max_bounded_workers, self.max_unbounded_workers)?;
241 if let Some(affinity) = affinity {
242 client.set_iowq_affinity(affinity)?
243 }
244 Ok(pool)
245 }
246}
247
248pub fn is_cpu_online(cpu: u32) -> io::Result<bool> {
250 let cpu_sysfs_home = format!("/sys/devices/system/cpu/cpu{cpu}");
269 let cpu_sysfs_home = std::path::Path::new(cpu_sysfs_home.as_str());
270 let online = cpu_sysfs_home.join("online");
271 match fs_err::read_to_string(online) {
272 Ok(s) => Ok(s.trim() == "1"),
273 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(cpu_sysfs_home.exists()),
274 Err(err) => Err(err),
275 }
276}
277
278pub fn set_cpu_online(cpu: u32) -> io::Result<()> {
280 let online = format!("/sys/devices/system/cpu/cpu{cpu}/online");
281 match fs_err::read_to_string(&online) {
282 Ok(s) if s.trim() == "0" => {
283 fs_err::write(&online, "1")?;
284 }
285 Ok(_) => {
286 }
288 Err(err) if err.kind() == io::ErrorKind::NotFound => {
289 }
291 Err(err) => return Err(err),
292 }
293 Ok(())
294}
295
296impl AffinitizedThreadpool {
297 pub fn new(io_ring_size: u32) -> io::Result<Self> {
299 ThreadpoolBuilder::new().ring_size(io_ring_size).build()
300 }
301
302 pub fn current_driver(&self) -> &ThreadpoolDriver {
309 self.driver(pal::unix::affinity::get_cpu_number())
310 }
311
312 pub fn driver(&self, ring_id: u32) -> &ThreadpoolDriver {
319 &self.state.drivers[ring_id as usize]
320 }
321
322 pub fn active_drivers(&self) -> impl Iterator<Item = &ThreadpoolDriver> + Clone {
328 self.state
329 .drivers
330 .iter()
331 .filter(|driver| driver.is_affinity_set())
332 }
333}
334
335impl Schedule for AffinitizedThreadpoolState {
336 fn schedule(&self, runnable: Runnable) {
337 self.drivers[pal::unix::affinity::get_cpu_number() as usize]
338 .client(Some(runnable.metadata()))
339 .schedule(runnable);
340 }
341
342 fn name(&self) -> Arc<str> {
343 static NAME: OnceLock<Arc<str>> = OnceLock::new();
344 NAME.get_or_init(|| "tp".into()).clone()
345 }
346}
347
348impl Spawn for AffinitizedThreadpool {
349 fn scheduler(&self, _metadata: &TaskMetadata) -> Arc<dyn Schedule> {
350 self.state.clone()
351 }
352}
353
354impl Initiate for AffinitizedThreadpool {
356 fn initiator(&self) -> &IoInitiator {
357 self.current_driver().initiator()
358 }
359}
360
/// A handle to the current threadpool thread.
///
/// It can only be obtained through [`Thread::current`], which returns `None`
/// unless a driver is currently lent to this thread's thread-local.
#[derive(Debug, Copy, Clone)]
pub struct Thread {
    /// The raw-pointer marker makes this type neither `Send` nor `Sync`, so a
    /// handle cannot be moved to or shared with another thread.
    _not_send_sync: PhantomData<*const ()>,
}
366
367impl Thread {
368 pub fn current() -> Option<Self> {
370 if !CURRENT_THREAD_DRIVER.with(|current| current.is_lent()) {
371 return None;
372 }
373 Some(Self {
374 _not_send_sync: PhantomData,
375 })
376 }
377
378 pub fn with_driver<R>(&self, f: impl FnOnce(&ThreadpoolDriver) -> R) -> R {
380 CURRENT_THREAD_DRIVER.with(|current| current.borrow(|driver| f(driver.unwrap())))
381 }
382
383 fn with_once<R>(&self, f: impl FnOnce(&ThreadpoolDriver, &ThreadpoolDriverOnce) -> R) -> R {
384 self.with_driver(|driver| f(driver, driver.inner.once.get().unwrap()))
385 }
386
387 pub fn set_idle_task<F>(&self, f: F)
394 where
395 F: 'static + Send + AsyncFnOnce(IdleControl),
396 {
397 self.with_once(|_, once| once.client.set_idle_task(f))
398 }
399
    /// Tries to pin the current thread and its ring to the driver's target
    /// CPU.
    ///
    /// Returns `Ok(true)` if the affinity is set (either already, or by this
    /// call) and `Ok(false)` if the target CPU is not online. On success, all
    /// wakers registered by `wait_for_affinity` are woken.
    ///
    /// # Errors
    ///
    /// Returns a [`SetAffinityError`] naming the step that failed: the online
    /// check, setting the thread affinity, or setting the ring affinity.
    pub fn try_set_affinity(&self) -> Result<bool, SetAffinityError> {
        self.with_once(|driver, once| {
            // The lock is held across the checks and the affinity calls and
            // is released explicitly before the wakers run.
            let mut state = driver.inner.state.lock();
            if matches!(state.affinity, AffinityState::Set) {
                return Ok(true);
            }
            if !is_cpu_online(driver.inner.cpu).map_err(SetAffinityError::Online)? {
                return Ok(false);
            }

            let mut affinity = CpuSet::new();
            affinity.set(driver.inner.cpu);

            // Pin the thread first, then the ring. If either step fails the
            // state stays `Waiting`, so a later call goes through both steps
            // again.
            pal::unix::affinity::set_current_thread_affinity(&affinity)
                .map_err(SetAffinityError::Thread)?;
            once.client
                .set_iowq_affinity(&affinity)
                .map_err(SetAffinityError::Ring)?;

            // Publish the new state and take ownership of the registered
            // wakers in one step.
            let old_affinity_state = std::mem::replace(&mut state.affinity, AffinityState::Set);
            driver.inner.affinity_set.store(true, Relaxed);
            // Release the lock before waking anything.
            drop(state);

            match old_affinity_state {
                AffinityState::Waiting(wakers) => {
                    for waker in wakers {
                        waker.wake();
                    }
                }
                // The `Set` case returned early above while the lock was held.
                AffinityState::Set => unreachable!(),
            }
            Ok(true)
        })
    }
437
438 pub fn first_task(&self) -> Option<TaskInfo> {
442 self.with_once(|_, once| once.first_task.clone())
443 }
444}
445
/// An error returned by [`Thread::try_set_affinity`], identifying the step
/// that failed.
#[derive(Debug, Error)]
pub enum SetAffinityError {
    /// Checking whether the target CPU is online failed.
    #[error("failed to check if CPU is online")]
    Online(#[source] io::Error),
    /// Setting the affinity of the current thread failed.
    #[error("failed to set thread affinity")]
    Thread(#[source] io::Error),
    /// Setting the affinity on the io-uring pool client failed.
    #[error("failed to set io-uring affinity")]
    Ring(#[source] io::Error),
}
459
thread_local! {
    // The driver lent to the current thread. `spawn_pool` lends the driver
    // here for the duration of `pool.run()`; `Thread::current()` checks
    // whether a loan is active, and `Thread::with_driver()` borrows it.
    static CURRENT_THREAD_DRIVER: LoanCell<ThreadpoolDriver> = const { LoanCell::new() };
}
463
464impl SpawnLocal for Thread {
465 fn scheduler_local(&self, metadata: &TaskMetadata) -> Arc<dyn Schedule> {
466 self.with_driver(|driver| driver.scheduler(metadata).clone())
467 }
468}
469
/// The driver for a single CPU's threadpool thread.
///
/// Cloning is cheap: all clones share the same inner state through an `Arc`.
/// The backing thread is spawned the first time the driver is used.
#[derive(Debug, Clone, Inspect)]
#[inspect(transparent)]
pub struct ThreadpoolDriver {
    /// State shared by all clones of this driver.
    inner: Arc<ThreadpoolDriverInner>,
}
477
/// The shared state behind a [`ThreadpoolDriver`].
#[derive(Debug, Inspect)]
struct ThreadpoolDriverInner {
    /// Data that is initialized once, when the pool thread is spawned.
    #[inspect(flatten)]
    once: OnceLock<ThreadpoolDriverOnce>,
    /// The builder configuration used to spawn this driver's pool.
    #[inspect(skip)]
    builder: Arc<ThreadpoolBuilder>,
    /// The CPU this driver targets.
    cpu: u32,
    /// The driver's name (`threadpool-<cpu>`), used as the ring's name.
    name: Arc<str>,
    /// Set to `true` once the affinity has been set; this is the lock-free
    /// value read by `is_affinity_set()`.
    affinity_set: AtomicBool,
    /// Mutable state protected by a lock.
    #[inspect(flatten)]
    state: Mutex<ThreadpoolDriverState>,
}
490
/// Driver data that is created exactly once, when the pool thread is spawned.
#[derive(Debug, Inspect)]
struct ThreadpoolDriverOnce {
    /// The client for the pool running on this driver's thread.
    #[inspect(skip)]
    client: PoolClient,
    /// Information about the task whose metadata was available when the
    /// thread was spawned, if any.
    first_task: Option<TaskInfo>,
}
497
/// The name and source location of a task, copied from its task metadata.
#[derive(Debug, Clone, Inspect)]
pub struct TaskInfo {
    /// The task's name, as returned by the metadata's `name()`.
    pub name: Arc<str>,
    /// The task's source location, as returned by the metadata's `location()`.
    #[inspect(display)]
    pub location: &'static std::panic::Location<'static>,
}
507
/// The lock-protected, mutable part of a driver's state.
#[derive(Debug, Inspect)]
struct ThreadpoolDriverState {
    /// Whether the affinity has been set, plus the wakers waiting for it.
    affinity: AffinityState,
    /// A callback taken and invoked on the pool thread just before the pool
    /// starts running. Inspected only as "present or not".
    #[inspect(with = "|x| x.is_some()")]
    notifier: Option<AffinityNotifier>,
    /// Set by the pool thread once its ring has been created. After that,
    /// `set_spawn_notifier` refuses new notifiers.
    spawned: bool,
}
515
/// Whether a driver's thread has had its affinity set to the target CPU.
#[derive(Debug, Inspect)]
#[inspect(external_tag)]
enum AffinityState {
    /// The affinity is not set yet. Holds the wakers registered by
    /// `wait_for_affinity`; inspected as the number of wakers.
    #[inspect(transparent)]
    Waiting(#[inspect(with = "|x| x.len()")] Vec<Waker>),
    /// The affinity has been set.
    Set,
}
523
/// A boxed one-shot callback stored in the driver state and invoked on the
/// pool thread.
struct AffinityNotifier(Box<dyn FnOnce() + Send>);

impl std::fmt::Debug for AffinityNotifier {
    /// Writes the fixed type name, since the boxed closure has no useful
    /// debug representation. Width, fill and precision flags are honored.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `Display` for `str` pads the string according to the formatter's
        // flags, exactly like calling `pad` directly.
        <str as std::fmt::Display>::fmt("AffinityNotifier", formatter)
    }
}
531
532impl ThreadpoolDriver {
    /// Returns the driver's one-time data, spawning the pool thread on the
    /// first call.
    ///
    /// `metadata`, or the current task's metadata when `metadata` is `None`,
    /// is recorded as the driver's `first_task` by the call that performs the
    /// initialization. Later calls ignore it.
    ///
    /// # Panics
    ///
    /// Panics if the helper thread panics or if spawning the pool fails.
    fn once(&self, metadata: Option<&TaskMetadata>) -> &ThreadpoolDriverOnce {
        self.inner.once.get_or_init(|| {
            let this = self.clone();
            // `spawn_pool` changes the affinity of the thread it runs on, so
            // it is run on a short-lived helper thread that is joined right
            // away (presumably to leave the caller's affinity untouched;
            // confirm).
            let client = std::thread::spawn(move || {
                let inner = this.inner.clone();
                inner.builder.spawn_pool(inner.cpu, this)
            })
            .join()
            .unwrap()
            .expect("failed to spawn thread pool thread");

            pal_async::task::with_current_task_metadata(|current_metadata| {
                // Prefer the explicitly supplied metadata and fall back to
                // the metadata of the task that is currently running.
                let metadata = metadata.or(current_metadata);
                ThreadpoolDriverOnce {
                    client,
                    first_task: metadata.map(|metadata| TaskInfo {
                        name: metadata.name().clone(),
                        location: metadata.location(),
                    }),
                }
            })
        })
    }
559
560 fn client(&self, metadata: Option<&TaskMetadata>) -> &PoolClient {
561 &self.once(metadata).client
562 }
563
564 pub fn target_cpu(&self) -> u32 {
569 self.inner.cpu
570 }
571
572 pub fn is_affinity_set(&self) -> bool {
575 self.inner.affinity_set.load(Relaxed)
576 }
577
578 pub async fn wait_for_affinity(&self) {
582 pal_async::task::with_current_task_metadata(|metadata| self.once(metadata));
586 poll_fn(|cx| {
587 let mut state = self.inner.state.lock();
588 match &mut state.affinity {
589 AffinityState::Waiting(wakers) => {
590 if !wakers.iter().any(|w| w.will_wake(cx.waker())) {
591 wakers.push(cx.waker().clone());
592 }
593 Poll::Pending
594 }
595 AffinityState::Set => Poll::Ready(()),
596 }
597 })
598 .await
599 }
600
601 pub fn set_spawn_notifier<F: 'static + Send + FnOnce()>(&self, f: F) -> Result<(), F> {
605 let mut state = self.inner.state.lock();
606 if !state.spawned {
607 state.notifier = Some(AffinityNotifier(Box::new(f)));
608 Ok(())
609 } else {
610 Err(f)
611 }
612 }
613}
614
615impl Initiate for ThreadpoolDriver {
616 fn initiator(&self) -> &IoInitiator {
617 self.client(None).initiator()
618 }
619}
620
621impl Spawn for ThreadpoolDriver {
622 fn scheduler(&self, metadata: &TaskMetadata) -> Arc<dyn Schedule> {
623 self.client(Some(metadata)).initiator().scheduler(metadata)
624 }
625}
626
627impl FdReadyDriver for ThreadpoolDriver {
628 type FdReady = FdReady<Self>;
629
630 fn new_fd_ready(&self, fd: RawFd) -> io::Result<Self::FdReady> {
631 Ok(FdReady::new(self.clone(), fd))
632 }
633}
634
635impl WaitDriver for ThreadpoolDriver {
636 type Wait = FdWait<Self>;
637
638 fn new_wait(&self, fd: RawFd, read_size: usize) -> io::Result<Self::Wait> {
639 Ok(FdWait::new(self.clone(), fd, read_size))
640 }
641}
642
643impl TimerDriver for ThreadpoolDriver {
644 type Timer = Timer<Self>;
645
646 fn new_timer(&self) -> Self::Timer {
647 Timer::new(self.clone())
648 }
649}
650
/// A driver that forwards to the threadpool driver of a target CPU, where the
/// target CPU can be changed at runtime with `retarget()`.
///
/// Cloning is cheap: all clones share the same target through an `Arc`, so
/// retargeting one clone retargets them all.
#[derive(Debug, Clone)]
pub struct RetargetableDriver {
    /// State shared by all clones of this driver.
    inner: Arc<RetargetableDriverInner>,
}
657
/// The shared state behind a [`RetargetableDriver`].
#[derive(Debug)]
struct RetargetableDriverInner {
    /// The pool whose drivers are used.
    threadpool: AffinitizedThreadpool,
    /// The index of the driver that is currently targeted. It is read with a
    /// relaxed load every time the current driver is looked up.
    target_cpu: AtomicU32,
}
663
664impl RetargetableDriver {
665 pub fn new(threadpool: AffinitizedThreadpool, target_cpu: u32) -> Self {
667 Self {
668 inner: Arc::new(RetargetableDriverInner {
669 threadpool,
670 target_cpu: target_cpu.into(),
671 }),
672 }
673 }
674
675 pub fn retarget(&self, target_cpu: u32) {
679 self.inner.target_cpu.store(target_cpu, Relaxed);
680 }
681
682 pub fn current_target_cpu(&self) -> u32 {
684 self.inner.target_cpu.load(Relaxed)
685 }
686
687 pub fn current_driver(&self) -> &ThreadpoolDriver {
689 self.inner.current_driver()
690 }
691}
692
693impl Initiate for RetargetableDriver {
694 fn initiator(&self) -> &IoInitiator {
695 self.inner.current_driver().initiator()
696 }
697}
698
699impl Spawn for RetargetableDriver {
700 fn scheduler(&self, _metadata: &TaskMetadata) -> Arc<dyn Schedule> {
701 self.inner.clone()
702 }
703}
704
705impl RetargetableDriverInner {
706 fn current_driver(&self) -> &ThreadpoolDriver {
707 self.threadpool.driver(self.target_cpu.load(Relaxed))
708 }
709}
710
711impl Schedule for RetargetableDriverInner {
712 fn schedule(&self, runnable: Runnable) {
713 self.current_driver()
714 .client(Some(runnable.metadata()))
715 .schedule(runnable)
716 }
717
718 fn name(&self) -> Arc<str> {
719 self.current_driver().inner.name.clone()
720 }
721}
722
723impl FdReadyDriver for RetargetableDriver {
724 type FdReady = FdReady<Self>;
725
726 fn new_fd_ready(&self, fd: RawFd) -> io::Result<Self::FdReady> {
727 Ok(FdReady::new(self.clone(), fd))
728 }
729}
730
731impl WaitDriver for RetargetableDriver {
732 type Wait = FdWait<Self>;
733
734 fn new_wait(&self, fd: RawFd, read_size: usize) -> io::Result<Self::Wait> {
735 Ok(FdWait::new(self.clone(), fd, read_size))
736 }
737}
738
739impl TimerDriver for RetargetableDriver {
740 type Timer = Timer<Self>;
741
742 fn new_timer(&self) -> Self::Timer {
743 Timer::new(self.clone())
744 }
745}