chipset_device_worker/
worker.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! A worker for running ChipsetDevice implementations in a separate process.
5//!
6//! This worker provides process isolation for any device implementing the
7//! ChipsetDevice trait. It handles serialization and deserialization of
8//! device operations across process boundaries.
9
10#![forbid(unsafe_code)]
11
12mod configure;
13
14use crate::RemoteDynamicResolvers;
15use crate::guestmem::GuestMemoryRemoteBuilder;
16use crate::protocol::*;
17use anyhow::Context;
18use chipset_device::ChipsetDevice;
19use chipset_device::io::IoResult;
20use chipset_device::io::deferred::DeferredToken;
21use chipset_device_resources::ErasedChipsetDevice;
22use chipset_device_resources::ResolveChipsetDeviceHandleParams;
23use mesh::MeshPayload;
24use mesh::error::RemoteError;
25use mesh_worker::Worker;
26use mesh_worker::WorkerId;
27use mesh_worker::WorkerRpc;
28use pal_async::DefaultPool;
29use std::task::Poll;
30use vm_resource::Resource;
31use vm_resource::ResourceResolver;
32use vm_resource::kind::ChipsetDeviceHandleKind;
33use vmcore::device_state::ChangeDeviceState;
34use vmcore::save_restore::ProtobufSaveRestore;
35
36/// Worker ID for ChipsetDevice workers.
37pub(crate) const fn remote_chipset_device_worker_id<T: RemoteDynamicResolvers>()
38-> WorkerId<RemoteChipsetDeviceWorkerParameters<T>> {
39    WorkerId::new(T::WORKER_ID_STR)
40}
41
42/// Parameters for launching a remote chipset device worker.
43#[derive(MeshPayload)]
44pub struct RemoteChipsetDeviceWorkerParameters<T> {
45    pub(crate) device: Resource<ChipsetDeviceHandleKind>,
46    pub(crate) dyn_resolvers: T,
47    pub(crate) inputs: RemoteChipsetDeviceHandleParams,
48
49    pub(crate) req_recv: mesh::Receiver<DeviceRequest>,
50    pub(crate) resp_send: mesh::Sender<DeviceResponse>,
51    pub(crate) cap_send: mesh::OneshotSender<DeviceInit>,
52}
53
54#[derive(MeshPayload)]
55pub(crate) struct RemoteChipsetDeviceHandleParams {
56    pub device_name: String,
57    pub is_restoring: bool,
58    pub vmtime: vmcore::vmtime::VmTimeSourceBuilder,
59    pub guest_memory: GuestMemoryRemoteBuilder,
60    pub encrypted_guest_memory: GuestMemoryRemoteBuilder,
61}
62
63/// The chipset device worker.
64///
65/// This worker wraps any device implementing ChipsetDevice and handles
66/// device operations sent via mesh channels.
67pub struct RemoteChipsetDeviceWorker<T> {
68    device: ErasedChipsetDevice,
69    pool: Option<DefaultPool>,
70    req_recv: mesh::Receiver<DeviceRequest>,
71    resp_send: mesh::Sender<DeviceResponse>,
72    deferred_reads: Vec<DeferredRead>,
73    deferred_writes: Vec<DeferredWrite>,
74
75    _phantom_resolvers: std::marker::PhantomData<T>,
76}
77
78struct DeferredRead {
79    id: usize,
80    token: DeferredToken,
81    size: usize,
82}
83
84struct DeferredWrite {
85    id: usize,
86    token: DeferredToken,
87}
88
89impl<T: RemoteDynamicResolvers> Worker for RemoteChipsetDeviceWorker<T> {
90    type Parameters = RemoteChipsetDeviceWorkerParameters<T>;
91    type State = ();
92    const ID: WorkerId<Self::Parameters> = remote_chipset_device_worker_id();
93
94    fn new(params: Self::Parameters) -> anyhow::Result<Self> {
95        let mut pool = DefaultPool::new();
96
97        let RemoteChipsetDeviceWorkerParameters {
98            device,
99            dyn_resolvers,
100            inputs,
101
102            req_recv,
103            resp_send,
104            cap_send,
105        } = params;
106
107        let mut resolver = ResourceResolver::new();
108
109        let driver = pool.driver();
110        let mut device = pool
111            .run_until(async move {
112                dyn_resolvers
113                    .register_remote_dynamic_resolvers(&mut resolver)
114                    .await?;
115                resolver
116                    .resolve(
117                        device,
118                        ResolveChipsetDeviceHandleParams {
119                            device_name: &inputs.device_name,
120                            guest_memory: &inputs.guest_memory.build("remote_gm"),
121                            encrypted_guest_memory: &inputs
122                                .encrypted_guest_memory
123                                .build("remote_enc_gm"),
124                            vmtime: &inputs
125                                .vmtime
126                                .build(&driver)
127                                .await
128                                .context("failed to build vmtime source")?,
129                            is_restoring: inputs.is_restoring,
130                            task_driver_source: &vmcore::vm_task::VmTaskDriverSource::new(
131                                vmcore::vm_task::thread::ThreadDriverBackend::new(driver),
132                            ),
133                            // TODO: Actually wire these up
134                            configure: &mut configure::RemoteConfigureChipsetDevice {},
135                            register_mmio: &mut configure::RemoteRegisterMmio {},
136                            register_pio: &mut configure::RemoteRegisterPio {},
137                        },
138                    )
139                    .await
140                    .context("failed to resolve device")
141            })?
142            .0;
143
144        if device.supports_acknowledge_pic_interrupt().is_some()
145            || device.supports_handle_eoi().is_some()
146            || device.supports_line_interrupt_target().is_some()
147            || device.supports_tdisp().is_some()
148        {
149            anyhow::bail!("remote device requires unimplemented functionality");
150        }
151
152        cap_send.send(DeviceInit {
153            mmio: device.supports_mmio().map(|m| MmioInit {
154                static_regions: m
155                    .get_static_regions()
156                    .iter()
157                    .map(|(name, range)| ((*name).into(), *range.start(), *range.end()))
158                    .collect(),
159            }),
160            pio: device.supports_pio().map(|p| PioInit {
161                static_regions: p
162                    .get_static_regions()
163                    .iter()
164                    .map(|(name, range)| ((*name).into(), *range.start(), *range.end()))
165                    .collect(),
166            }),
167            pci: device.supports_pci().map(|p| PciInit {
168                suggested_bdf: p.suggested_bdf(),
169            }),
170        });
171
172        Ok(Self {
173            device,
174            pool: Some(pool),
175            req_recv,
176            resp_send,
177            deferred_reads: Vec::new(),
178            deferred_writes: Vec::new(),
179            _phantom_resolvers: std::marker::PhantomData,
180        })
181    }
182
183    fn restart(_state: Self::State) -> anyhow::Result<Self> {
184        todo!()
185    }
186
187    fn run(mut self, mut rpc_recv: mesh::Receiver<WorkerRpc<Self::State>>) -> anyhow::Result<()> {
188        self.pool.take().unwrap().run_until(async move {
189            loop {
190                enum WorkerEvent {
191                    Rpc(WorkerRpc<()>),
192                    DeviceRequest(DeviceRequest),
193                }
194
195                let event = std::future::poll_fn(|cx| {
196                    if let Some(poll_device) = self.device.supports_poll_device() {
197                        poll_device.poll_device(cx);
198                    }
199
200                    self.deferred_reads
201                        .extract_if(.., |read| {
202                            let mut data = vec![0; read.size];
203                            match read.token.poll_read(cx, &mut data) {
204                                Poll::Ready(r) => {
205                                    self.resp_send.send(DeviceResponse::Read {
206                                        id: read.id,
207                                        result: r.map(|_| data),
208                                    });
209                                    true
210                                }
211                                Poll::Pending => false,
212                            }
213                        })
214                        .for_each(|_| ());
215
216                    self.deferred_writes
217                        .extract_if(.., |write| match write.token.poll_write(cx) {
218                            Poll::Ready(r) => {
219                                self.resp_send.send(DeviceResponse::Write {
220                                    id: write.id,
221                                    result: r,
222                                });
223                                true
224                            }
225                            Poll::Pending => false,
226                        })
227                        .for_each(|_| ());
228
229                    // If either of these channels fail, we fail the worker too.
230                    if let Poll::Ready(r) = rpc_recv.poll_recv(cx) {
231                        return Poll::Ready(r.map(WorkerEvent::Rpc));
232                    }
233                    if let Poll::Ready(r) = self.req_recv.poll_recv(cx) {
234                        return Poll::Ready(r.map(WorkerEvent::DeviceRequest));
235                    }
236                    Poll::Pending
237                })
238                .await?;
239
240                match event {
241                    WorkerEvent::Rpc(rpc) => match rpc {
242                        WorkerRpc::Inspect(deferred) => {
243                            deferred.inspect(&mut self.device);
244                        }
245                        WorkerRpc::Stop => {
246                            return Ok(());
247                        }
248                        WorkerRpc::Restart(rpc) => {
249                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!("not supported"))));
250                        }
251                    },
252                    WorkerEvent::DeviceRequest(req) => match req {
253                        DeviceRequest::Start => self.device.start(),
254                        DeviceRequest::Stop(rpc) => {
255                            rpc.handle(async |()| self.device.stop().await).await
256                        }
257                        DeviceRequest::Reset(rpc) => {
258                            self.deferred_reads.clear();
259                            self.deferred_writes.clear();
260                            rpc.handle(async |()| self.device.reset().await).await
261                        }
262                        DeviceRequest::MmioRead(ReadRequest { id, address, size }) => {
263                            let mut data = vec![0; size];
264                            let result = self
265                                .device
266                                .supports_mmio()
267                                .unwrap()
268                                .mmio_read(address, &mut data);
269                            self.handle_read_result(id, result, data);
270                        }
271                        DeviceRequest::MmioWrite(WriteRequest { id, address, data }) => {
272                            let result = self
273                                .device
274                                .supports_mmio()
275                                .unwrap()
276                                .mmio_write(address, &data);
277                            self.handle_write_result(id, result);
278                        }
279                        DeviceRequest::PioRead(ReadRequest { id, address, size }) => {
280                            let mut data = vec![0; size];
281                            let result = self
282                                .device
283                                .supports_pio()
284                                .unwrap()
285                                .io_read(address, &mut data);
286                            self.handle_read_result(id, result, data);
287                        }
288                        DeviceRequest::PioWrite(WriteRequest { id, address, data }) => {
289                            let result =
290                                self.device.supports_pio().unwrap().io_write(address, &data);
291                            self.handle_write_result(id, result);
292                        }
293                        DeviceRequest::PciConfigRead(ReadRequest { id, address, size }) => {
294                            assert_eq!(size, 4);
295                            let mut data = 0;
296                            let result = self
297                                .device
298                                .supports_pci()
299                                .unwrap()
300                                .pci_cfg_read(address, &mut data);
301                            self.handle_read_result(id, result, data.to_ne_bytes().to_vec());
302                        }
303                        DeviceRequest::PciConfigWrite(WriteRequest { id, address, data }) => {
304                            let result = self
305                                .device
306                                .supports_pci()
307                                .unwrap()
308                                .pci_cfg_write(address, data);
309                            self.handle_write_result(id, result);
310                        }
311                        DeviceRequest::Save(rpc) => {
312                            rpc.handle_failable_sync(|()| self.device.save())
313                        }
314                        DeviceRequest::Restore(rpc) => {
315                            rpc.handle_failable_sync(|state| self.device.restore(state))
316                        }
317                    },
318                }
319            }
320        })
321    }
322}
323
324impl<T> RemoteChipsetDeviceWorker<T> {
325    fn handle_read_result(&mut self, id: usize, result: IoResult, data: Vec<u8>) {
326        match result {
327            IoResult::Ok => self.resp_send.send(DeviceResponse::Read {
328                id,
329                result: Ok(data),
330            }),
331            IoResult::Err(io_error) => self.resp_send.send(DeviceResponse::Read {
332                id,
333                result: Err(io_error),
334            }),
335            IoResult::Defer(token) => self.deferred_reads.push(DeferredRead {
336                id,
337                token,
338                size: data.len(),
339            }),
340        }
341    }
342
343    fn handle_write_result(&mut self, id: usize, result: IoResult) {
344        match result {
345            IoResult::Ok => self
346                .resp_send
347                .send(DeviceResponse::Write { id, result: Ok(()) }),
348            IoResult::Err(io_error) => self.resp_send.send(DeviceResponse::Write {
349                id,
350                result: Err(io_error),
351            }),
352            IoResult::Defer(token) => self.deferred_writes.push(DeferredWrite { id, token }),
353        }
354    }
355}