1#![cfg_attr(any(windows, target_os = "linux"), expect(unsafe_code))]
10
11use inspect::Inspect;
12use pal_async::driver::Driver;
13use pal_async::task::Spawn;
14use pal_async::task::TaskMetadata;
15use std::fmt::Debug;
16use std::pin::Pin;
17use std::sync::Arc;
18
/// A source of [`VmTaskDriver`]s.
///
/// The source wraps a type-erased backend behind an [`Arc`], so cloning it is
/// cheap and every clone shares the same backend.
#[derive(Clone)]
pub struct VmTaskDriverSource {
    // The backend that actually constructs drivers. It is stored as a trait
    // object so that this type does not need to be generic over the backend.
    backend: Arc<dyn DynVmBackend>,
}
30
31impl VmTaskDriverSource {
32 pub fn new(backend: impl 'static + BuildVmTaskDriver) -> Self {
34 Self {
35 backend: Arc::new(backend),
36 }
37 }
38
39 pub fn simple(&self) -> VmTaskDriver {
43 self.builder().build("")
46 }
47
48 pub fn current(&self) -> VmTaskDriver {
54 VmTaskDriver {
55 inner: self.backend.build_current(),
56 }
57 }
58
59 pub fn builder(&self) -> VmTaskDriverBuilder<'_> {
61 VmTaskDriverBuilder {
62 backend: self.backend.as_ref(),
63 run_on_target: false,
64 target_vp: None,
65 }
66 }
67}
68
/// Trait implemented by backends that construct the drivers handed out by a
/// [`VmTaskDriverSource`].
pub trait BuildVmTaskDriver: Send + Sync {
    /// The driver type returned by [`build`](Self::build).
    type Driver: TargetedDriver;
    /// The driver type returned by [`build_current`](Self::build_current).
    type CurrentDriver: TargetedDriver;

    /// Builds a new driver.
    ///
    /// `name`, `target_vp`, and `run_on_target` are the values collected by
    /// [`VmTaskDriverBuilder`]. How (or whether) each one is honored is up to
    /// the implementation.
    fn build(&self, name: String, target_vp: Option<u32>, run_on_target: bool) -> Self::Driver;

    /// Builds the driver returned by [`VmTaskDriverSource::current`].
    fn build_current(&self) -> Self::CurrentDriver;
}
82
/// A driver produced by a [`BuildVmTaskDriver`] backend.
///
/// It exposes a task spawner and an IO driver, and can be asked to retarget
/// itself to a different VP.
pub trait TargetedDriver: 'static + Send + Sync + Inspect {
    /// Returns the spawner used to schedule tasks on this driver.
    fn spawner(&self) -> &dyn Spawn;
    /// Returns the driver used to create IO objects (timers, waits, etc.).
    fn driver(&self) -> &dyn Driver;
    /// Requests that the driver be retargeted to `target_vp`.
    fn retarget_vp(&self, target_vp: u32);
    /// Returns whether the target VP is ready. What "ready" means is defined
    /// by the implementation.
    ///
    /// The default implementation always returns `true`.
    fn is_target_vp_ready(&self) -> bool {
        true
    }
    /// Returns a future that completes once the target VP is ready.
    ///
    /// The default implementation returns an already-completed future.
    fn wait_target_vp_ready(&self) -> impl Future<Output = ()> + Send {
        std::future::ready(())
    }
}
103
/// Private mirror of [`TargetedDriver`] that is usable as a trait object.
///
/// It is implemented for every [`TargetedDriver`] and is what
/// [`VmTaskDriver`] stores internally.
trait DynTargetedDriver: 'static + Send + Sync + Inspect {
    /// See [`TargetedDriver::spawner`].
    fn spawner(&self) -> &dyn Spawn;
    /// See [`TargetedDriver::driver`].
    fn driver(&self) -> &dyn Driver;
    /// See [`TargetedDriver::retarget_vp`].
    fn retarget_vp(&self, target_vp: u32);
    /// See [`TargetedDriver::is_target_vp_ready`].
    fn is_ready(&self) -> bool;
    /// See [`TargetedDriver::wait_target_vp_ready`]. The future is boxed so
    /// that this method can be called through a trait object.
    fn wait_ready(&self) -> Pin<Box<dyn '_ + Future<Output = ()> + Send>>;
}
111
112impl<T: TargetedDriver> DynTargetedDriver for T {
113 fn spawner(&self) -> &dyn Spawn {
114 self.spawner()
115 }
116
117 fn driver(&self) -> &dyn Driver {
118 self.driver()
119 }
120
121 fn retarget_vp(&self, target_vp: u32) {
122 self.retarget_vp(target_vp)
123 }
124
125 fn is_ready(&self) -> bool {
126 self.is_target_vp_ready()
127 }
128
129 fn wait_ready(&self) -> Pin<Box<dyn '_ + Future<Output = ()> + Send>> {
130 Box::pin(self.wait_target_vp_ready())
131 }
132}
133
/// Private mirror of [`BuildVmTaskDriver`] that is usable as a trait object.
///
/// Instead of associated driver types, its methods return drivers that are
/// already erased to `Arc<dyn DynTargetedDriver>`.
trait DynVmBackend: Send + Sync {
    /// See [`BuildVmTaskDriver::build`].
    fn build(
        &self,
        name: String,
        target_vp: Option<u32>,
        run_on_target: bool,
    ) -> Arc<dyn DynTargetedDriver>;

    /// See [`BuildVmTaskDriver::build_current`].
    fn build_current(&self) -> Arc<dyn DynTargetedDriver>;
}
144
145impl<T: BuildVmTaskDriver> DynVmBackend for T {
146 fn build(
147 &self,
148 name: String,
149 target_vp: Option<u32>,
150 run_on_target: bool,
151 ) -> Arc<dyn DynTargetedDriver> {
152 Arc::new(self.build(name, target_vp, run_on_target))
153 }
154
155 fn build_current(&self) -> Arc<dyn DynTargetedDriver> {
156 Arc::new(BuildVmTaskDriver::build_current(self))
157 }
158}
159
/// A builder for [`VmTaskDriver`]s, obtained from
/// [`VmTaskDriverSource::builder`].
pub struct VmTaskDriverBuilder<'a> {
    // The backend of the source this builder was created from.
    backend: &'a dyn DynVmBackend,
    // Forwarded verbatim to the backend's `build`.
    run_on_target: bool,
    // The VP to target, if any; forwarded verbatim to the backend's `build`.
    target_vp: Option<u32>,
}
166
167impl VmTaskDriverBuilder<'_> {
168 pub fn run_on_target(&mut self, run_on_target: bool) -> &mut Self {
180 self.run_on_target = run_on_target;
181 self
182 }
183
184 pub fn target_vp(&mut self, target_vp: u32) -> &mut Self {
195 self.target_vp = Some(target_vp);
196 self
197 }
198
199 pub fn build(&self, name: impl Into<String>) -> VmTaskDriver {
204 VmTaskDriver {
205 inner: self
206 .backend
207 .build(name.into(), self.target_vp, self.run_on_target),
208 }
209 }
210}
211
/// A driver handle returned by [`VmTaskDriverSource`] and
/// [`VmTaskDriverBuilder`].
///
/// It implements [`Driver`] and [`Spawn`] by forwarding to the backend's
/// driver. Clones share the same underlying driver.
#[derive(Clone, Inspect)]
pub struct VmTaskDriver {
    // The backend-provided driver, type-erased and shared between clones.
    #[inspect(flatten)]
    inner: Arc<dyn DynTargetedDriver>,
}
220
221impl VmTaskDriver {
222 pub fn retarget_vp(&self, target_vp: u32) {
229 self.inner.retarget_vp(target_vp)
230 }
231
232 pub fn is_target_vp_ready(&self) -> bool {
237 self.inner.is_ready()
238 }
239
240 pub async fn wait_target_vp_ready(&self) {
242 self.inner.wait_ready().await
243 }
244}
245
246impl Driver for VmTaskDriver {
247 fn new_dyn_timer(&self) -> pal_async::driver::PollImpl<dyn pal_async::timer::PollTimer> {
248 self.inner.driver().new_dyn_timer()
249 }
250
251 #[cfg(unix)]
252 fn new_dyn_fd_ready(
253 &self,
254 fd: std::os::fd::RawFd,
255 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::fd::PollFdReady>> {
256 self.inner.driver().new_dyn_fd_ready(fd)
257 }
258
259 #[cfg(unix)]
260 fn new_dyn_socket_ready(
261 &self,
262 socket: std::os::fd::RawFd,
263 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::socket::PollSocketReady>> {
264 self.inner.driver().new_dyn_socket_ready(socket)
265 }
266
267 #[cfg(windows)]
268 fn new_dyn_socket_ready(
269 &self,
270 socket: std::os::windows::io::RawSocket,
271 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::socket::PollSocketReady>> {
272 self.inner.driver().new_dyn_socket_ready(socket)
273 }
274
275 #[cfg(unix)]
276 fn new_dyn_wait(
277 &self,
278 fd: std::os::fd::RawFd,
279 read_size: usize,
280 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::wait::PollWait>> {
281 self.inner.driver().new_dyn_wait(fd, read_size)
282 }
283
284 #[cfg(windows)]
285 fn new_dyn_wait(
286 &self,
287 handle: std::os::windows::io::RawHandle,
288 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::wait::PollWait>> {
289 self.inner.driver().new_dyn_wait(handle)
290 }
291
292 #[cfg(windows)]
293 unsafe fn new_dyn_overlapped_file(
294 &self,
295 handle: std::os::windows::io::RawHandle,
296 ) -> std::io::Result<
297 pal_async::driver::PollImpl<dyn pal_async::windows::overlapped::IoOverlapped>,
298 > {
299 unsafe { self.inner.driver().new_dyn_overlapped_file(handle) }
301 }
302
303 #[cfg(target_os = "linux")]
304 fn io_uring_probe(&self, opcode: u8) -> bool {
305 self.inner.driver().io_uring_probe(opcode)
306 }
307
308 #[cfg(target_os = "linux")]
309 unsafe fn io_uring_submit(
310 &self,
311 sqe: pal_async::io_uring::Entry,
312 ) -> Pin<Box<dyn Future<Output = std::io::Result<i32>> + Send + '_>> {
313 unsafe { self.inner.driver().io_uring_submit(sqe) }
315 }
316
317 #[cfg(target_os = "macos")]
318 fn new_dyn_process_wait(
319 &self,
320 pid: i32,
321 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::process::macos::PollProcessWait>>
322 {
323 self.inner.driver().new_dyn_process_wait(pid)
324 }
325}
326
327impl Spawn for VmTaskDriver {
328 fn scheduler(&self, metadata: &TaskMetadata) -> Arc<dyn pal_async::task::Schedule> {
329 self.inner.spawner().scheduler(metadata)
330 }
331}
332
333#[derive(Debug)]
335pub struct SingleDriverBackend<T>(T);
336
337impl<T: Driver + Spawn + Clone> SingleDriverBackend<T> {
338 pub fn new(driver: T) -> Self {
341 Self(driver)
342 }
343}
344
/// The driver type produced by [`SingleDriverBackend`]: a thin wrapper around
/// a clone of the backend's driver.
#[derive(Debug)]
pub struct SingleDriver<T>(T);

// `TargetedDriver` requires `Inspect`, but this wrapper reports nothing, so
// the request is explicitly ignored.
impl<T> Inspect for SingleDriver<T> {
    fn inspect(&self, req: inspect::Request<'_>) {
        req.ignore();
    }
}
354
355impl<T: Driver + Spawn + Clone> BuildVmTaskDriver for SingleDriverBackend<T> {
356 type Driver = SingleDriver<T>;
357 type CurrentDriver = SingleDriver<T>;
358
359 fn build(&self, _name: String, _target_vp: Option<u32>, _run_on_target: bool) -> Self::Driver {
360 SingleDriver(self.0.clone())
361 }
362
363 fn build_current(&self) -> Self::CurrentDriver {
364 SingleDriver(self.0.clone())
365 }
366}
367
368impl<T: Driver + Spawn> TargetedDriver for SingleDriver<T> {
369 fn spawner(&self) -> &dyn Spawn {
370 &self.0
371 }
372
373 fn driver(&self) -> &dyn Driver {
374 &self.0
375 }
376
377 fn retarget_vp(&self, _target_vp: u32) {}
378}
379
380pub mod thread {
381 use super::BuildVmTaskDriver;
385 use super::TargetedDriver;
386 use inspect::Inspect;
387 use loan_cell::LoanCell;
388 use pal_async::DefaultDriver;
389 use pal_async::DefaultPool;
390 use pal_async::driver::Driver;
391 use pal_async::task::Spawn;
392 use pal_async::task::TaskMetadata;
393 use std::sync::Arc;
394
395 thread_local! {
396 static CURRENT_DRIVER: LoanCell<DefaultDriver> = const { LoanCell::new() };
397 }
398
399 #[derive(Debug)]
406 pub struct ThreadDriverBackend {
407 default_driver: DefaultDriver,
408 }
409
410 impl ThreadDriverBackend {
411 pub fn new(default_driver: DefaultDriver) -> Self {
414 Self { default_driver }
415 }
416 }
417
418 impl BuildVmTaskDriver for ThreadDriverBackend {
419 type Driver = ThreadDriver;
420 type CurrentDriver = CurrentThreadDriver;
421
422 fn build(
423 &self,
424 name: String,
425 target_vp: Option<u32>,
426 _run_on_target: bool,
427 ) -> Self::Driver {
428 if target_vp.is_some() {
430 let pool = DefaultPool::new();
431 let driver = pool.driver();
432 let tls_driver = driver.clone();
433 std::thread::Builder::new()
434 .name(name)
435 .spawn(move || {
436 CURRENT_DRIVER.with(|cell| cell.lend(&tls_driver, || pool.run()));
437 })
438 .unwrap();
439 ThreadDriver {
440 inner: driver,
441 has_dedicated_thread: true,
442 }
443 } else {
444 ThreadDriver {
445 inner: self.default_driver.clone(),
446 has_dedicated_thread: false,
447 }
448 }
449 }
450
451 fn build_current(&self) -> Self::CurrentDriver {
452 CurrentThreadDriver {
453 default: self.default_driver.clone(),
454 }
455 }
456 }
457
458 #[derive(Debug, Inspect)]
460 pub struct ThreadDriver {
461 #[inspect(skip)]
462 inner: DefaultDriver,
463 has_dedicated_thread: bool,
464 }
465
466 impl TargetedDriver for ThreadDriver {
467 fn spawner(&self) -> &dyn Spawn {
468 &self.inner
469 }
470
471 fn driver(&self) -> &dyn Driver {
472 &self.inner
473 }
474
475 fn retarget_vp(&self, _target_vp: u32) {}
476 }
477
478 #[derive(Inspect)]
484 pub struct CurrentThreadDriver {
485 #[inspect(skip)]
486 default: DefaultDriver,
487 }
488
489 impl CurrentThreadDriver {
490 fn with_driver<R>(&self, f: impl FnOnce(&DefaultDriver) -> R) -> R {
491 CURRENT_DRIVER.with(|cell| cell.borrow(|driver| f(driver.unwrap_or(&self.default))))
492 }
493 }
494
495 impl Driver for CurrentThreadDriver {
496 fn new_dyn_timer(&self) -> pal_async::driver::PollImpl<dyn pal_async::timer::PollTimer> {
497 self.with_driver(|d| d.new_dyn_timer())
498 }
499
500 #[cfg(unix)]
501 fn new_dyn_fd_ready(
502 &self,
503 fd: std::os::fd::RawFd,
504 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::fd::PollFdReady>> {
505 self.with_driver(|d| d.new_dyn_fd_ready(fd))
506 }
507
508 #[cfg(unix)]
509 fn new_dyn_socket_ready(
510 &self,
511 socket: std::os::fd::RawFd,
512 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::socket::PollSocketReady>>
513 {
514 self.with_driver(|d| d.new_dyn_socket_ready(socket))
515 }
516
517 #[cfg(windows)]
518 fn new_dyn_socket_ready(
519 &self,
520 socket: std::os::windows::io::RawSocket,
521 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::socket::PollSocketReady>>
522 {
523 self.with_driver(|d| d.new_dyn_socket_ready(socket))
524 }
525
526 #[cfg(unix)]
527 fn new_dyn_wait(
528 &self,
529 fd: std::os::fd::RawFd,
530 read_size: usize,
531 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::wait::PollWait>> {
532 self.with_driver(|d| d.new_dyn_wait(fd, read_size))
533 }
534
535 #[cfg(windows)]
536 fn new_dyn_wait(
537 &self,
538 handle: std::os::windows::io::RawHandle,
539 ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::wait::PollWait>> {
540 self.with_driver(|d| d.new_dyn_wait(handle))
541 }
542
543 #[cfg(windows)]
544 unsafe fn new_dyn_overlapped_file(
545 &self,
546 handle: std::os::windows::io::RawHandle,
547 ) -> std::io::Result<
548 pal_async::driver::PollImpl<dyn pal_async::windows::overlapped::IoOverlapped>,
549 > {
550 self.with_driver(|d| {
551 unsafe { d.new_dyn_overlapped_file(handle) }
553 })
554 }
555
556 #[cfg(target_os = "linux")]
557 fn io_uring_probe(&self, opcode: u8) -> bool {
558 self.with_driver(|d| d.io_uring_probe(opcode))
559 }
560
561 #[cfg(target_os = "linux")]
562 unsafe fn io_uring_submit(
563 &self,
564 sqe: pal_async::io_uring::Entry,
565 ) -> std::pin::Pin<Box<dyn Future<Output = std::io::Result<i32>> + Send + '_>> {
566 use pal_async::io_uring::{IoUringDriver, IoUringSubmit};
567 Box::pin(async move {
568 let driver = CURRENT_DRIVER.with(|cell| cell.borrow(|driver| driver.cloned()));
574 let driver = driver.as_ref().unwrap_or(&self.default);
575 unsafe {
577 driver
578 .io_uring_submitter()
579 .ok_or(std::io::ErrorKind::Unsupported)?
580 .submit(sqe)
581 .await
582 }
583 })
584 }
585
586 #[cfg(target_os = "macos")]
587 fn new_dyn_process_wait(
588 &self,
589 pid: i32,
590 ) -> std::io::Result<
591 pal_async::driver::PollImpl<dyn pal_async::process::macos::PollProcessWait>,
592 > {
593 self.with_driver(|d| d.new_dyn_process_wait(pid))
594 }
595 }
596
597 impl Spawn for CurrentThreadDriver {
598 fn scheduler(&self, metadata: &TaskMetadata) -> Arc<dyn pal_async::task::Schedule> {
599 self.with_driver(|d| d.scheduler(metadata))
600 }
601 }
602
603 impl TargetedDriver for CurrentThreadDriver {
604 fn spawner(&self) -> &dyn Spawn {
605 self
606 }
607
608 fn driver(&self) -> &dyn Driver {
609 self
610 }
611
612 fn retarget_vp(&self, _target_vp: u32) {}
613 }
614}