vmcore/
vm_task.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Infrastructure for spawning tasks and issuing async IO related to VM
5//! activity.
6
7// UNSAFETY: Needed to implement the unsafe new_dyn_overlapped_file method.
8#![cfg_attr(windows, expect(unsafe_code))]
9
10use inspect::Inspect;
11use pal_async::driver::Driver;
12use pal_async::task::Spawn;
13use pal_async::task::TaskMetadata;
14use std::fmt::Debug;
15use std::pin::Pin;
16use std::sync::Arc;
17
18/// A source for [`VmTaskDriver`]s.
19///
20/// This is used to create device-specific drivers that implement [`Driver`] and
21/// [`Spawn`]. These drivers' behavior can be customized based on the needs of
22/// the device.
23///
24/// The backend for these drivers is customizable for different environments.
25#[derive(Clone)]
26pub struct VmTaskDriverSource {
27    backend: Arc<dyn DynVmBackend>,
28}
29
30impl VmTaskDriverSource {
31    /// Returns a new task driver source backed by `backend`.
32    pub fn new(backend: impl 'static + BuildVmTaskDriver) -> Self {
33        Self {
34            backend: Arc::new(backend),
35        }
36    }
37
38    /// Returns a VM task driver with default parameters.
39    ///
40    /// Use this when you don't care where your task runs.
41    pub fn simple(&self) -> VmTaskDriver {
42        // Don't provide a name, since backends won't do anything with it for
43        // default settings.
44        self.builder().build("")
45    }
46
47    /// Returns a builder for a custom VM task driver.
48    pub fn builder(&self) -> VmTaskDriverBuilder<'_> {
49        VmTaskDriverBuilder {
50            backend: self.backend.as_ref(),
51            run_on_target: false,
52            target_vp: None,
53        }
54    }
55}
56
57/// Trait implemented by backends for [`VmTaskDriverSource`].
58pub trait BuildVmTaskDriver: Send + Sync {
59    /// The associated driver type.
60    type Driver: TargetedDriver;
61
62    /// Builds a new driver that can drive IO and spawn tasks.
63    fn build(&self, name: String, target_vp: Option<u32>, run_on_target: bool) -> Self::Driver;
64}
65
66/// Trait implemented by drivers built with [`BuildVmTaskDriver`].
67pub trait TargetedDriver: 'static + Send + Sync + Inspect {
68    /// Returns the implementation to use for spawning tasks.
69    fn spawner(&self) -> &dyn Spawn;
70    /// Returns the implementation to use for driving IO.
71    fn driver(&self) -> &dyn Driver;
72    /// Retargets the driver to the specified virtual processor.
73    fn retarget_vp(&self, target_vp: u32);
74    /// Returns whether a driver's target VP is ready for tasks and IO.
75    ///
76    /// A driver must be operable even if this is false, but the tasks and IO
77    /// may run on a different target VP.
78    fn is_target_vp_ready(&self) -> bool {
79        true
80    }
81    /// Waits for this driver's target VP to be ready for tasks and IO.
82    fn wait_target_vp_ready(&self) -> impl Future<Output = ()> + Send {
83        std::future::ready(())
84    }
85}
86
87trait DynTargetedDriver: 'static + Send + Sync + Inspect {
88    fn spawner(&self) -> &dyn Spawn;
89    fn driver(&self) -> &dyn Driver;
90    fn retarget_vp(&self, target_vp: u32);
91    fn is_ready(&self) -> bool;
92    fn wait_ready(&self) -> Pin<Box<dyn '_ + Future<Output = ()> + Send>>;
93}
94
95impl<T: TargetedDriver> DynTargetedDriver for T {
96    fn spawner(&self) -> &dyn Spawn {
97        self.spawner()
98    }
99
100    fn driver(&self) -> &dyn Driver {
101        self.driver()
102    }
103
104    fn retarget_vp(&self, target_vp: u32) {
105        self.retarget_vp(target_vp)
106    }
107
108    fn is_ready(&self) -> bool {
109        self.is_target_vp_ready()
110    }
111
112    fn wait_ready(&self) -> Pin<Box<dyn '_ + Future<Output = ()> + Send>> {
113        Box::pin(self.wait_target_vp_ready())
114    }
115}
116
117trait DynVmBackend: Send + Sync {
118    fn build(
119        &self,
120        name: String,
121        target_vp: Option<u32>,
122        run_on_target: bool,
123    ) -> Arc<dyn DynTargetedDriver>;
124}
125
126impl<T: BuildVmTaskDriver> DynVmBackend for T {
127    fn build(
128        &self,
129        name: String,
130        target_vp: Option<u32>,
131        run_on_target: bool,
132    ) -> Arc<dyn DynTargetedDriver> {
133        Arc::new(self.build(name, target_vp, run_on_target))
134    }
135}
136
137/// A builder returned by [`VmTaskDriverSource::builder`].
138pub struct VmTaskDriverBuilder<'a> {
139    backend: &'a dyn DynVmBackend,
140    run_on_target: bool,
141    target_vp: Option<u32>,
142}
143
144impl VmTaskDriverBuilder<'_> {
145    /// A hint to the backend specifying whether spawned tasks should always
146    /// run on the thread handling the target VP.
147    ///
148    /// If `false` (the default), then when spawned tasks are awoken, they may
149    /// run on any executor (such as the current one). If `true`, the backend
150    /// will endeavor to run them on the same thread that would drive async IO.
151    ///
152    /// Some devices will want to override the default to reduce jitter or
153    /// ensure that IO is issued from the correct processor. For example,
154    /// StorVSP sets this to true for its channel workers, so that all of a
155    /// channel's IO and tasks ideally run on the same VP's thread.
156    pub fn run_on_target(&mut self, run_on_target: bool) -> &mut Self {
157        self.run_on_target = run_on_target;
158        self
159    }
160
161    /// A hint to the backend specifying the guest VP associated with spawned
162    /// tasks and IO.
163    ///
164    /// Backends can use this to ensure that spawned tasks and async IO will run
165    /// near or on the target VP. For example, StorVSP sets this to the VP
166    /// from the VMBus channel open request's `target_vp` field, so that
167    /// each channel's worker runs on the VP the guest specified.
168    ///
169    /// If not set, the backend uses its default scheduling (typically the
170    /// calling thread or a shared pool).
171    pub fn target_vp(&mut self, target_vp: u32) -> &mut Self {
172        self.target_vp = Some(target_vp);
173        self
174    }
175
176    /// Builds a VM task driver.
177    ///
178    /// `name` is used by some backends to identify a spawned thread. It is
179    /// ignored by other backends.
180    pub fn build(&self, name: impl Into<String>) -> VmTaskDriver {
181        VmTaskDriver {
182            inner: self
183                .backend
184                .build(name.into(), self.target_vp, self.run_on_target),
185        }
186    }
187}
188
189/// A driver returned by [`VmTaskDriverSource`].
190///
191/// This can be used to spawn tasks (via [`Spawn`]) and issue async IO (via [`Driver`]).
192#[derive(Clone, Inspect)]
193pub struct VmTaskDriver {
194    #[inspect(flatten)]
195    inner: Arc<dyn DynTargetedDriver>,
196}
197
198impl VmTaskDriver {
199    /// Updates the target VP for the task.
200    ///
201    /// The effectiveness of this call, and when it would take effect,
202    /// depends on the backend. For example, in the OpenHCL threadpool backend,
203    /// this will cause new IO to target the new VP's thread, but
204    /// existing IO will continue to run on the original VP's thread.
205    pub fn retarget_vp(&self, target_vp: u32) {
206        self.inner.retarget_vp(target_vp)
207    }
208
209    /// Returns whether the target VP is ready for tasks and IO.
210    ///
211    /// A driver must be operable even if this is false, but the tasks and IO
212    /// may run on a different target VP.
213    pub fn is_target_vp_ready(&self) -> bool {
214        self.inner.is_ready()
215    }
216
217    /// Waits for the target VP to be ready for tasks and IO.
218    pub async fn wait_target_vp_ready(&self) {
219        self.inner.wait_ready().await
220    }
221}
222
223impl Driver for VmTaskDriver {
224    fn new_dyn_timer(&self) -> pal_async::driver::PollImpl<dyn pal_async::timer::PollTimer> {
225        self.inner.driver().new_dyn_timer()
226    }
227
228    #[cfg(unix)]
229    fn new_dyn_fd_ready(
230        &self,
231        fd: std::os::fd::RawFd,
232    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::fd::PollFdReady>> {
233        self.inner.driver().new_dyn_fd_ready(fd)
234    }
235
236    #[cfg(unix)]
237    fn new_dyn_socket_ready(
238        &self,
239        socket: std::os::fd::RawFd,
240    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::socket::PollSocketReady>> {
241        self.inner.driver().new_dyn_socket_ready(socket)
242    }
243
244    #[cfg(windows)]
245    fn new_dyn_socket_ready(
246        &self,
247        socket: std::os::windows::io::RawSocket,
248    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::socket::PollSocketReady>> {
249        self.inner.driver().new_dyn_socket_ready(socket)
250    }
251
252    #[cfg(unix)]
253    fn new_dyn_wait(
254        &self,
255        fd: std::os::fd::RawFd,
256        read_size: usize,
257    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::wait::PollWait>> {
258        self.inner.driver().new_dyn_wait(fd, read_size)
259    }
260
261    #[cfg(windows)]
262    fn new_dyn_wait(
263        &self,
264        handle: std::os::windows::io::RawHandle,
265    ) -> std::io::Result<pal_async::driver::PollImpl<dyn pal_async::wait::PollWait>> {
266        self.inner.driver().new_dyn_wait(handle)
267    }
268
269    #[cfg(windows)]
270    unsafe fn new_dyn_overlapped_file(
271        &self,
272        handle: std::os::windows::io::RawHandle,
273    ) -> std::io::Result<
274        pal_async::driver::PollImpl<dyn pal_async::windows::overlapped::IoOverlapped>,
275    > {
276        // SAFETY: passthru from caller
277        unsafe { self.inner.driver().new_dyn_overlapped_file(handle) }
278    }
279}
280
281impl Spawn for VmTaskDriver {
282    fn scheduler(&self, metadata: &TaskMetadata) -> Arc<dyn pal_async::task::Schedule> {
283        self.inner.spawner().scheduler(metadata)
284    }
285}
286
/// A backend that runs all tasks and IO on one driver.
///
/// The wrapped value is the driver that gets cloned for every driver this
/// backend builds.
#[derive(Debug)]
pub struct SingleDriverBackend<T>(T);
290
291impl<T: Driver + Spawn + Clone> SingleDriverBackend<T> {
292    /// Returns a new driver backend that spawns all tasks and IO on `driver`,
293    /// regardless of policy.
294    pub fn new(driver: T) -> Self {
295        Self(driver)
296    }
297}
298
/// The driver type built by [`SingleDriverBackend`].
///
/// It wraps the driver value that serves as both the spawner and the IO
/// driver.
#[derive(Debug)]
pub struct SingleDriver<T>(T);
302
303impl<T> Inspect for SingleDriver<T> {
304    fn inspect(&self, req: inspect::Request<'_>) {
305        req.ignore();
306    }
307}
308
309impl<T: Driver + Spawn + Clone> BuildVmTaskDriver for SingleDriverBackend<T> {
310    type Driver = SingleDriver<T>;
311
312    fn build(&self, _name: String, _target_vp: Option<u32>, _run_on_target: bool) -> Self::Driver {
313        SingleDriver(self.0.clone())
314    }
315}
316
317impl<T: Driver + Spawn> TargetedDriver for SingleDriver<T> {
318    fn spawner(&self) -> &dyn Spawn {
319        &self.0
320    }
321
322    fn driver(&self) -> &dyn Driver {
323        &self.0
324    }
325
326    fn retarget_vp(&self, _target_vp: u32) {}
327}
328
329pub mod thread {
330    //! Provides a thread-based task VM task driver backend
331    //! [`ThreadDriverBackend`].
332
333    use super::BuildVmTaskDriver;
334    use super::TargetedDriver;
335    use inspect::Inspect;
336    use pal_async::DefaultDriver;
337    use pal_async::DefaultPool;
338    use pal_async::driver::Driver;
339    use pal_async::task::Spawn;
340
341    /// A backend for [`VmTaskDriverSource`](super::VmTaskDriverSource) based on
342    /// individual threads.
343    ///
344    /// If no target VP is specified, this backend will spawn tasks and IO a
345    /// default single-threaded IO driver. If a target VP is specified, the
346    /// backend will spawn a separate thread and spawn tasks and IOs there.
347    #[derive(Debug)]
348    pub struct ThreadDriverBackend {
349        default_driver: DefaultDriver,
350    }
351
352    impl ThreadDriverBackend {
353        /// Returns a new backend, using `default_driver` to back task drivers
354        /// that did not specify a target VP.
355        pub fn new(default_driver: DefaultDriver) -> Self {
356            Self { default_driver }
357        }
358    }
359
360    impl BuildVmTaskDriver for ThreadDriverBackend {
361        type Driver = ThreadDriver;
362
363        fn build(
364            &self,
365            name: String,
366            target_vp: Option<u32>,
367            _run_on_target: bool,
368        ) -> Self::Driver {
369            // Build a standalone thread for this device if a target VP was specified.
370            if target_vp.is_some() {
371                let (_, driver) = DefaultPool::spawn_on_thread(name);
372                ThreadDriver {
373                    inner: driver,
374                    has_dedicated_thread: true,
375                }
376            } else {
377                ThreadDriver {
378                    inner: self.default_driver.clone(),
379                    has_dedicated_thread: false,
380                }
381            }
382        }
383    }
384
385    /// The driver for [`ThreadDriverBackend`].
386    #[derive(Debug, Inspect)]
387    pub struct ThreadDriver {
388        #[inspect(skip)]
389        inner: DefaultDriver,
390        has_dedicated_thread: bool,
391    }
392
393    impl TargetedDriver for ThreadDriver {
394        fn spawner(&self) -> &dyn Spawn {
395            &self.inner
396        }
397
398        fn driver(&self) -> &dyn Driver {
399            &self.inner
400        }
401
402        fn retarget_vp(&self, _target_vp: u32) {}
403    }
404}