Skip to main content

vmcore/
vm_task.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Infrastructure for spawning tasks and issuing async IO related to VM
//! activity.

// UNSAFETY: Needed to implement the unsafe new_dyn_overlapped_file method on
// Windows and to implement the unsafe io_uring_submit method on Linux.
9#![cfg_attr(any(windows, target_os = "linux"), expect(unsafe_code))]
10
11use inspect::Inspect;
12use pal_async::driver::Driver;
13use pal_async::task::Spawn;
14use pal_async::task::TaskMetadata;
15use std::fmt::Debug;
16use std::pin::Pin;
17use std::sync::Arc;
18
19/// A source for [`VmTaskDriver`]s.
20///
21/// This is used to create device-specific drivers that implement [`Driver`] and
22/// [`Spawn`]. These drivers' behavior can be customized based on the needs of
23/// the device.
24///
25/// The backend for these drivers is customizable for different environments.
26#[derive(Clone)]
27pub struct VmTaskDriverSource {
28    backend: Arc<dyn DynVmBackend>,
29}
30
31impl VmTaskDriverSource {
32    /// Returns a new task driver source backed by `backend`.
33    pub fn new(backend: impl 'static + BuildVmTaskDriver) -> Self {
34        Self {
35            backend: Arc::new(backend),
36        }
37    }
38
39    /// Returns a VM task driver with default parameters.
40    ///
41    /// Use this when you don't care where your task runs.
42    pub fn simple(&self) -> VmTaskDriver {
43        // Don't provide a name, since backends won't do anything with it for
44        // default settings.
45        self.builder().build("")
46    }
47
48    /// Returns a driver that dispatches to the current thread's executor.
49    ///
50    /// Use this when a shared resource (e.g., a disk) should use whatever
51    /// executor its caller is running on, rather than a fixed executor
52    /// captured at construction time.
53    pub fn current(&self) -> VmTaskDriver {
54        VmTaskDriver {
55            inner: self.backend.build_current(),
56        }
57    }
58
59    /// Returns a builder for a custom VM task driver.
60    pub fn builder(&self) -> VmTaskDriverBuilder<'_> {
61        VmTaskDriverBuilder {
62            backend: self.backend.as_ref(),
63            run_on_target: false,
64            target_vp: None,
65        }
66    }
67}
68
69/// Trait implemented by backends for [`VmTaskDriverSource`].
70pub trait BuildVmTaskDriver: Send + Sync {
71    /// The associated driver type.
72    type Driver: TargetedDriver;
73    /// The driver type for `build_current`.
74    type CurrentDriver: TargetedDriver;
75
76    /// Builds a new driver that can drive IO and spawn tasks.
77    fn build(&self, name: String, target_vp: Option<u32>, run_on_target: bool) -> Self::Driver;
78
79    /// Builds a driver that dispatches to the current thread's executor.
80    fn build_current(&self) -> Self::CurrentDriver;
81}
82
83/// Trait implemented by drivers built with [`BuildVmTaskDriver`].
84pub trait TargetedDriver: 'static + Send + Sync + Inspect {
85    /// Returns the implementation to use for spawning tasks.
86    fn spawner(&self) -> &dyn Spawn;
87    /// Returns the implementation to use for driving IO.
88    fn driver(&self) -> &dyn Driver;
89    /// Retargets the driver to the specified virtual processor.
90    fn retarget_vp(&self, target_vp: u32);
91    /// Returns whether a driver's target VP is ready for tasks and IO.
92    ///
93    /// A driver must be operable even if this is false, but the tasks and IO
94    /// may run on a different target VP.
95    fn is_target_vp_ready(&self) -> bool {
96        true
97    }
98    /// Waits for this driver's target VP to be ready for tasks and IO.
99    fn wait_target_vp_ready(&self) -> impl Future<Output = ()> + Send {
100        std::future::ready(())
101    }
102}
103
104trait DynTargetedDriver: 'static + Send + Sync + Inspect {
105    fn spawner(&self) -> &dyn Spawn;
106    fn driver(&self) -> &dyn Driver;
107    fn retarget_vp(&self, target_vp: u32);
108    fn is_ready(&self) -> bool;
109    fn wait_ready(&self) -> Pin<Box<dyn '_ + Future<Output = ()> + Send>>;
110}
111
112impl<T: TargetedDriver> DynTargetedDriver for T {
113    fn spawner(&self) -> &dyn Spawn {
114        self.spawner()
115    }
116
117    fn driver(&self) -> &dyn Driver {
118        self.driver()
119    }
120
121    fn retarget_vp(&self, target_vp: u32) {
122        self.retarget_vp(target_vp)
123    }
124
125    fn is_ready(&self) -> bool {
126        self.is_target_vp_ready()
127    }
128
129    fn wait_ready(&self) -> Pin<Box<dyn '_ + Future<Output = ()> + Send>> {
130        Box::pin(self.wait_target_vp_ready())
131    }
132}
133
134trait DynVmBackend: Send + Sync {
135    fn build(
136        &self,
137        name: String,
138        target_vp: Option<u32>,
139        run_on_target: bool,
140    ) -> Arc<dyn DynTargetedDriver>;
141
142    fn build_current(&self) -> Arc<dyn DynTargetedDriver>;
143}
144
145impl<T: BuildVmTaskDriver> DynVmBackend for T {
146    fn build(
147        &self,
148        name: String,
149        target_vp: Option<u32>,
150        run_on_target: bool,
151    ) -> Arc<dyn DynTargetedDriver> {
152        Arc::new(self.build(name, target_vp, run_on_target))
153    }
154
155    fn build_current(&self) -> Arc<dyn DynTargetedDriver> {
156        Arc::new(BuildVmTaskDriver::build_current(self))
157    }
158}
159
160/// A builder returned by [`VmTaskDriverSource::builder`].
161pub struct VmTaskDriverBuilder<'a> {
162    backend: &'a dyn DynVmBackend,
163    run_on_target: bool,
164    target_vp: Option<u32>,
165}
166
167impl VmTaskDriverBuilder<'_> {
168    /// A hint to the backend specifying whether spawned tasks should always
169    /// run on the thread handling the target VP.
170    ///
171    /// If `false` (the default), then when spawned tasks are awoken, they may
172    /// run on any executor (such as the current one). If `true`, the backend
173    /// will endeavor to run them on the same thread that would drive async IO.
174    ///
175    /// Some devices will want to override the default to reduce jitter or
176    /// ensure that IO is issued from the correct processor. For example,
177    /// StorVSP sets this to true for its channel workers, so that all of a
178    /// channel's IO and tasks ideally run on the same VP's thread.
179    pub fn run_on_target(&mut self, run_on_target: bool) -> &mut Self {
180        self.run_on_target = run_on_target;
181        self
182    }
183
184    /// A hint to the backend specifying the guest VP associated with spawned
185    /// tasks and IO.
186    ///
187    /// Backends can use this to ensure that spawned tasks and async IO will run
188    /// near or on the target VP. For example, StorVSP sets this to the VP
189    /// from the VMBus channel open request's `target_vp` field, so that
190    /// each channel's worker runs on the VP the guest specified.
191    ///
192    /// If not set, the backend uses its default scheduling (typically the
193    /// calling thread or a shared pool).
194    pub fn target_vp(&mut self, target_vp: u32) -> &mut Self {
195        self.target_vp = Some(target_vp);
196        self
197    }
198
199    /// Builds a VM task driver.
200    ///
201    /// `name` is used by some backends to identify a spawned thread. It is
202    /// ignored by other backends.
203    pub fn build(&self, name: impl Into<String>) -> VmTaskDriver {
204        VmTaskDriver {
205            inner: self
206                .backend
207                .build(name.into(), self.target_vp, self.run_on_target),
208        }
209    }
210}
211
212/// A driver returned by [`VmTaskDriverSource`].
213///
214/// This can be used to spawn tasks (via [`Spawn`]) and issue async IO (via [`Driver`]).
215#[derive(Clone, Inspect)]
216pub struct VmTaskDriver {
217    #[inspect(flatten)]
218    inner: Arc<dyn DynTargetedDriver>,
219}
220
221impl VmTaskDriver {
222    /// Updates the target VP for the task.
223    ///
224    /// The effectiveness of this call, and when it would take effect,
225    /// depends on the backend. For example, in the OpenHCL threadpool backend,
226    /// this will cause new IO to target the new VP's thread, but
227    /// existing IO will continue to run on the original VP's thread.
228    pub fn retarget_vp(&self, target_vp: u32) {
229        self.inner.retarget_vp(target_vp)
230    }
231
232    /// Returns whether the target VP is ready for tasks and IO.
233    ///
234    /// A driver must be operable even if this is false, but the tasks and IO
235    /// may run on a different target VP.
236    pub fn is_target_vp_ready(&self) -> bool {
237        self.inner.is_ready()
238    }
239
240    /// Waits for the target VP to be ready for tasks and IO.
241    pub async fn wait_target_vp_ready(&self) {
242        self.inner.wait_ready().await
243    }
244}
245
246impl Driver for VmTaskDriver {
247    fn new_dyn_timer(&self) -> pal_async::driver::PollImpl<dyn pal_async::timer::PollTimer> {
248        self.inner.driver().new_dyn_timer()
249    }
250
251    #[cfg(unix)]
252    fn new_dyn_fd_ready(
253        &self,
254        fd: std::os::fd::RawFd,
255    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::fd::PollFdReady>> {
256        self.inner.driver().new_dyn_fd_ready(fd)
257    }
258
259    #[cfg(unix)]
260    fn new_dyn_socket_ready(
261        &self,
262        socket: std::os::fd::RawFd,
263    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::socket::PollSocketReady>> {
264        self.inner.driver().new_dyn_socket_ready(socket)
265    }
266
267    #[cfg(windows)]
268    fn new_dyn_socket_ready(
269        &self,
270        socket: std::os::windows::io::RawSocket,
271    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::socket::PollSocketReady>> {
272        self.inner.driver().new_dyn_socket_ready(socket)
273    }
274
275    #[cfg(unix)]
276    fn new_dyn_wait(
277        &self,
278        fd: std::os::fd::RawFd,
279        read_size: usize,
280    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::wait::PollWait>> {
281        self.inner.driver().new_dyn_wait(fd, read_size)
282    }
283
284    #[cfg(windows)]
285    fn new_dyn_wait(
286        &self,
287        handle: std::os::windows::io::RawHandle,
288    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::wait::PollWait>> {
289        self.inner.driver().new_dyn_wait(handle)
290    }
291
292    #[cfg(windows)]
293    unsafe fn new_dyn_overlapped_file(
294        &self,
295        handle: std::os::windows::io::RawHandle,
296    ) -> std::io::Result<
297        pal_async::driver::PollImpl<dyn pal_async::windows::overlapped::IoOverlapped>,
298    > {
299        // SAFETY: passthru from caller
300        unsafe { self.inner.driver().new_dyn_overlapped_file(handle) }
301    }
302
303    #[cfg(target_os = "linux")]
304    fn io_uring_probe(&self, opcode: u8) -> bool {
305        self.inner.driver().io_uring_probe(opcode)
306    }
307
308    #[cfg(target_os = "linux")]
309    unsafe fn io_uring_submit(
310        &self,
311        sqe: pal_async::io_uring::Entry,
312    ) -> Pin<Box<dyn Future<Output = std::io::Result<i32>> + Send + '_>> {
313        // SAFETY: passthru from caller
314        unsafe { self.inner.driver().io_uring_submit(sqe) }
315    }
316
317    #[cfg(target_os = "macos")]
318    fn new_dyn_process_wait(
319        &self,
320        pid: i32,
321    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::process::macos::PollProcessWait>>
322    {
323        self.inner.driver().new_dyn_process_wait(pid)
324    }
325}
326
327impl Spawn for VmTaskDriver {
328    fn scheduler(&self, metadata: &TaskMetadata) -> Arc<dyn pal_async::task::Schedule> {
329        self.inner.spawner().scheduler(metadata)
330    }
331}
332
/// A backend that runs every task and all IO on one driver.
///
/// The wrapped value is that driver; each built [`SingleDriver`] receives a
/// clone of it.
#[derive(Debug)]
pub struct SingleDriverBackend<T>(T);
336
337impl<T: Driver + Spawn + Clone> SingleDriverBackend<T> {
338    /// Returns a new driver backend that spawns all tasks and IO on `driver`,
339    /// regardless of policy.
340    pub fn new(driver: T) -> Self {
341        Self(driver)
342    }
343}
344
/// The driver type built by [`SingleDriverBackend`].
///
/// It wraps a clone of the backend's driver and uses it for both spawning
/// tasks and driving IO.
#[derive(Debug)]
pub struct SingleDriver<T>(T);
348
349impl<T> Inspect for SingleDriver<T> {
350    fn inspect(&self, req: inspect::Request<'_>) {
351        req.ignore();
352    }
353}
354
355impl<T: Driver + Spawn + Clone> BuildVmTaskDriver for SingleDriverBackend<T> {
356    type Driver = SingleDriver<T>;
357    type CurrentDriver = SingleDriver<T>;
358
359    fn build(&self, _name: String, _target_vp: Option<u32>, _run_on_target: bool) -> Self::Driver {
360        SingleDriver(self.0.clone())
361    }
362
363    fn build_current(&self) -> Self::CurrentDriver {
364        SingleDriver(self.0.clone())
365    }
366}
367
368impl<T: Driver + Spawn> TargetedDriver for SingleDriver<T> {
369    fn spawner(&self) -> &dyn Spawn {
370        &self.0
371    }
372
373    fn driver(&self) -> &dyn Driver {
374        &self.0
375    }
376
377    fn retarget_vp(&self, _target_vp: u32) {}
378}
379
380pub mod thread {
    //! Provides a thread-based VM task driver backend,
    //! [`ThreadDriverBackend`].
383
384    use super::BuildVmTaskDriver;
385    use super::TargetedDriver;
386    use inspect::Inspect;
387    use loan_cell::LoanCell;
388    use pal_async::DefaultDriver;
389    use pal_async::DefaultPool;
390    use pal_async::driver::Driver;
391    use pal_async::task::Spawn;
392    use pal_async::task::TaskMetadata;
393    use std::sync::Arc;
394
395    thread_local! {
396        static CURRENT_DRIVER: LoanCell<DefaultDriver> = const { LoanCell::new() };
397    }
398
399    /// A backend for [`VmTaskDriverSource`](super::VmTaskDriverSource) based on
400    /// individual threads.
401    ///
402    /// If no target VP is specified, this backend will spawn tasks and IO a
403    /// default single-threaded IO driver. If a target VP is specified, the
404    /// backend will spawn a separate thread and spawn tasks and IOs there.
405    #[derive(Debug)]
406    pub struct ThreadDriverBackend {
407        default_driver: DefaultDriver,
408    }
409
410    impl ThreadDriverBackend {
411        /// Returns a new backend, using `default_driver` to back task drivers
412        /// that did not specify a target VP.
413        pub fn new(default_driver: DefaultDriver) -> Self {
414            Self { default_driver }
415        }
416    }
417
418    impl BuildVmTaskDriver for ThreadDriverBackend {
419        type Driver = ThreadDriver;
420        type CurrentDriver = CurrentThreadDriver;
421
422        fn build(
423            &self,
424            name: String,
425            target_vp: Option<u32>,
426            _run_on_target: bool,
427        ) -> Self::Driver {
428            // Build a standalone thread for this device if a target VP was specified.
429            if target_vp.is_some() {
430                let pool = DefaultPool::new();
431                let driver = pool.driver();
432                let tls_driver = driver.clone();
433                std::thread::Builder::new()
434                    .name(name)
435                    .spawn(move || {
436                        CURRENT_DRIVER.with(|cell| cell.lend(&tls_driver, || pool.run()));
437                    })
438                    .unwrap();
439                ThreadDriver {
440                    inner: driver,
441                    has_dedicated_thread: true,
442                }
443            } else {
444                ThreadDriver {
445                    inner: self.default_driver.clone(),
446                    has_dedicated_thread: false,
447                }
448            }
449        }
450
451        fn build_current(&self) -> Self::CurrentDriver {
452            CurrentThreadDriver {
453                default: self.default_driver.clone(),
454            }
455        }
456    }
457
458    /// A driver targeting a fixed executor thread.
459    #[derive(Debug, Inspect)]
460    pub struct ThreadDriver {
461        #[inspect(skip)]
462        inner: DefaultDriver,
463        has_dedicated_thread: bool,
464    }
465
466    impl TargetedDriver for ThreadDriver {
467        fn spawner(&self) -> &dyn Spawn {
468            &self.inner
469        }
470
471        fn driver(&self) -> &dyn Driver {
472            &self.inner
473        }
474
475        fn retarget_vp(&self, _target_vp: u32) {}
476    }
477
478    /// A driver that dispatches to the current thread's registered executor.
479    ///
480    /// If the current thread has no registered executor (i.e., it is not a
481    /// thread spawned by [`ThreadDriverBackend`]), falls back to a default
482    /// driver.
483    #[derive(Inspect)]
484    pub struct CurrentThreadDriver {
485        #[inspect(skip)]
486        default: DefaultDriver,
487    }
488
489    impl CurrentThreadDriver {
490        fn with_driver<R>(&self, f: impl FnOnce(&DefaultDriver) -> R) -> R {
491            CURRENT_DRIVER.with(|cell| cell.borrow(|driver| f(driver.unwrap_or(&self.default))))
492        }
493    }
494
495    impl Driver for CurrentThreadDriver {
496        fn new_dyn_timer(&self) -> pal_async::driver::PollImpl<dyn pal_async::timer::PollTimer> {
497            self.with_driver(|d| d.new_dyn_timer())
498        }
499
500        #[cfg(unix)]
501        fn new_dyn_fd_ready(
502            &self,
503            fd: std::os::fd::RawFd,
504        ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::fd::PollFdReady>> {
505            self.with_driver(|d| d.new_dyn_fd_ready(fd))
506        }
507
508        #[cfg(unix)]
509        fn new_dyn_socket_ready(
510            &self,
511            socket: std::os::fd::RawFd,
512        ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::socket::PollSocketReady>>
513        {
514            self.with_driver(|d| d.new_dyn_socket_ready(socket))
515        }
516
517        #[cfg(windows)]
518        fn new_dyn_socket_ready(
519            &self,
520            socket: std::os::windows::io::RawSocket,
521        ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::socket::PollSocketReady>>
522        {
523            self.with_driver(|d| d.new_dyn_socket_ready(socket))
524        }
525
526        #[cfg(unix)]
527        fn new_dyn_wait(
528            &self,
529            fd: std::os::fd::RawFd,
530            read_size: usize,
531        ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::wait::PollWait>> {
532            self.with_driver(|d| d.new_dyn_wait(fd, read_size))
533        }
534
535        #[cfg(windows)]
536        fn new_dyn_wait(
537            &self,
538            handle: std::os::windows::io::RawHandle,
539        ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::wait::PollWait>> {
540            self.with_driver(|d| d.new_dyn_wait(handle))
541        }
542
543        #[cfg(windows)]
544        unsafe fn new_dyn_overlapped_file(
545            &self,
546            handle: std::os::windows::io::RawHandle,
547        ) -> std::io::Result<
548            pal_async::driver::PollImpl<dyn pal_async::windows::overlapped::IoOverlapped>,
549        > {
550            self.with_driver(|d| {
551                // SAFETY: passthru from caller
552                unsafe { d.new_dyn_overlapped_file(handle) }
553            })
554        }
555
556        #[cfg(target_os = "linux")]
557        fn io_uring_probe(&self, opcode: u8) -> bool {
558            self.with_driver(|d| d.io_uring_probe(opcode))
559        }
560
561        #[cfg(target_os = "linux")]
562        unsafe fn io_uring_submit(
563            &self,
564            sqe: pal_async::io_uring::Entry,
565        ) -> std::pin::Pin<Box<dyn Future<Output = std::io::Result<i32>> + Send + '_>> {
566            use pal_async::io_uring::{IoUringDriver, IoUringSubmit};
567            Box::pin(async move {
568                // We have to clone the driver, since the current driver may
569                // change across the lifetime of the async block. This is not
570                // very expensive, though--in practice this is an Arc refcount
571                // increment, and the driver is per-thread so there is no cache
572                // contention.
573                let driver = CURRENT_DRIVER.with(|cell| cell.borrow(|driver| driver.cloned()));
574                let driver = driver.as_ref().unwrap_or(&self.default);
575                // SAFETY: passthru from caller
576                unsafe {
577                    driver
578                        .io_uring_submitter()
579                        .ok_or(std::io::ErrorKind::Unsupported)?
580                        .submit(sqe)
581                        .await
582                }
583            })
584        }
585
586        #[cfg(target_os = "macos")]
587        fn new_dyn_process_wait(
588            &self,
589            pid: i32,
590        ) -> std::io::Result<
591            pal_async::driver::PollImpl<dyn pal_async::process::macos::PollProcessWait>,
592        > {
593            self.with_driver(|d| d.new_dyn_process_wait(pid))
594        }
595    }
596
597    impl Spawn for CurrentThreadDriver {
598        fn scheduler(&self, metadata: &TaskMetadata) -> Arc<dyn pal_async::task::Schedule> {
599            self.with_driver(|d| d.scheduler(metadata))
600        }
601    }
602
603    impl TargetedDriver for CurrentThreadDriver {
604        fn spawner(&self) -> &dyn Spawn {
605            self
606        }
607
608        fn driver(&self) -> &dyn Driver {
609            self
610        }
611
612        fn retarget_vp(&self, _target_vp: u32) {}
613    }
614}