petri/vm/openvmm/runtime.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Methods to interact with a running [`PetriVmOpenVmm`].
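//!
//! Illustrative flow (a sketch, not compiled as a doctest; assumes `vm` is a
//! `PetriVmOpenVmm` already constructed by the petri test framework):
//!
//! ```ignore
//! let boot_event = vm.wait_for_boot_event().await?;
//! let _ready = vm.wait_for_enlightened_shutdown_ready().await?;
//! vm.send_enlightened_shutdown(ShutdownKind::Shutdown).await?;
//! let halt_reason = vm.wait_for_halt(false).await?;
//! vm.teardown().await?;
//! ```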

use super::PetriVmResourcesOpenVmm;
use crate::OpenHclServicingFlags;
use crate::PetriHaltReason;
use crate::PetriVmFramebufferAccess;
use crate::PetriVmInspector;
use crate::PetriVmRuntime;
use crate::ShutdownKind;
use crate::VmScreenshotMeta;
use crate::openhcl_diag::OpenHclDiagHandler;
use crate::worker::Worker;
use anyhow::Context;
use async_trait::async_trait;
use framebuffer::View;
use futures::FutureExt;
use futures_concurrency::future::Race;
use get_resources::ged::FirmwareEvent;
use hyperv_ic_resources::shutdown::ShutdownRpc;
use mesh::CancelContext;
use mesh::Receiver;
use mesh::RecvError;
use mesh::rpc::RpcError;
use mesh::rpc::RpcSend;
use mesh_process::Mesh;
use openvmm_defs::rpc::PulseSaveRestoreError;
use pal_async::socket::PolledSocket;
use petri_artifacts_core::ResolvedArtifact;
use pipette_client::PipetteClient;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use vmm_core_defs::HaltReason;
use vtl2_settings_proto::Vtl2Settings;

/// A running VM that tests can interact with.
// DEVNOTE: `PetriVmInner` really holds the actual VM and the channels we interact
// with. This struct exists as a wrapper to provide error handling, such as not
// hanging indefinitely when waiting on certain channels if the VM crashes.
pub struct PetriVmOpenVmm {
    inner: PetriVmInner,
    halt: PetriVmHaltReceiver,
}

#[async_trait]
impl PetriVmRuntime for PetriVmOpenVmm {
    type VmInspector = OpenVmmInspector;
    type VmFramebufferAccess = OpenVmmFramebufferAccess;

    async fn teardown(self) -> anyhow::Result<()> {
        tracing::info!("waiting for worker");
        let worker = Arc::into_inner(self.inner.worker)
            .context("not all references to the OpenVMM worker have been closed")?;
        worker.shutdown().await?;

        tracing::info!("Worker quit, waiting for mesh");
        self.inner.mesh.shutdown().await;

        tracing::info!("Mesh shutdown, waiting for logging tasks");
        for t in self.inner.resources.log_stream_tasks {
            t.await?;
        }

        Ok(())
    }

    async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReason> {
        let halt_reason = if let Some(already) = self.halt.already_received.take() {
            already.map_err(anyhow::Error::from)
        } else {
            self.halt
                .halt_notif
                .recv()
                .await
                .context("Failed to get halt reason")
        }?;

        tracing::info!(?halt_reason, "Got halt reason");

        let halt_reason = match halt_reason {
            HaltReason::PowerOff => PetriHaltReason::PowerOff,
            HaltReason::Reset => PetriHaltReason::Reset,
            HaltReason::Hibernate => PetriHaltReason::Hibernate,
            HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
            _ => PetriHaltReason::Other,
        };

        if allow_reset && halt_reason == PetriHaltReason::Reset {
            self.reset().await?
        }

        Ok(halt_reason)
    }

    async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
        Self::wait_for_agent(self, set_high_vtl).await
    }

    fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
        self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
            OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
                self.inner.resources.driver.clone(),
                path,
            ))
        })
    }

    async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
        Self::wait_for_boot_event(self).await
    }

    async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
        Self::wait_for_enlightened_shutdown_ready(self)
            .await
            .map(|_| ())
    }

    async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
        Self::send_enlightened_shutdown(self, kind).await
    }

    async fn restart_openhcl(
        &mut self,
        new_openhcl: &ResolvedArtifact,
        flags: OpenHclServicingFlags,
    ) -> anyhow::Result<()> {
        Self::save_openhcl(self, new_openhcl, flags).await?;
        Self::restore_openhcl(self).await
    }

    async fn save_openhcl(
        &mut self,
        new_openhcl: &ResolvedArtifact,
        flags: OpenHclServicingFlags,
    ) -> anyhow::Result<()> {
        Self::save_openhcl(self, new_openhcl, flags).await
    }

    async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
        Self::restore_openhcl(self).await
    }

    async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
        Self::update_command_line(self, command_line).await
    }

    fn inspector(&self) -> Option<OpenVmmInspector> {
        Some(OpenVmmInspector {
            worker: self.inner.worker.clone(),
        })
    }

    fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
        self.inner
            .framebuffer_view
            .take()
            .map(|view| OpenVmmFramebufferAccess { view })
    }

    async fn reset(&mut self) -> anyhow::Result<()> {
        Self::reset(self).await
    }

    async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()> {
        Self::set_vtl2_settings(self, settings).await
    }

    async fn set_vmbus_drive(
        &mut self,
        _disk: &crate::Drive,
        _controller_id: &guid::Guid,
        _controller_location: u32,
    ) -> anyhow::Result<()> {
        todo!("openvmm set vmbus drive")
    }
}

pub(super) struct PetriVmInner {
    pub(super) resources: PetriVmResourcesOpenVmm,
    pub(super) mesh: Mesh,
    pub(super) worker: Arc<Worker>,
    pub(super) framebuffer_view: Option<View>,
}

struct PetriVmHaltReceiver {
    halt_notif: Receiver<HaltReason>,
    already_received: Option<Result<HaltReason, RecvError>>,
}

// Wrap a PetriVmInner function in [`PetriVmOpenVmm::wait_for_halt_or_internal`] to
// provide better error handling.
macro_rules! petri_vm_fn {
    ($(#[$($attrss:tt)*])* $vis:vis async fn $fn_name:ident (&mut self $(,$arg:ident: $ty:ty)*) $(-> $ret:ty)?) => {
        $(#[$($attrss)*])*
        $vis async fn $fn_name(&mut self, $($arg:$ty,)*) $(-> $ret)? {
            Self::wait_for_halt_or_internal(&mut self.halt, self.inner.$fn_name($($arg,)*)).await
        }
    };
}
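
// For illustration, an invocation such as
// `petri_vm_fn!(pub async fn reset(&mut self) -> anyhow::Result<()>);`
// expands to roughly the following (a sketch of the generated code, not its
// literal token-for-token expansion):
//
//     pub async fn reset(&mut self) -> anyhow::Result<()> {
//         Self::wait_for_halt_or_internal(&mut self.halt, self.inner.reset()).await
//     }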

// TODO: Add all runtime functions that are not backend specific
// to the `PetriVmRuntime` trait
impl PetriVmOpenVmm {
    pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
        Self {
            inner,
            halt: PetriVmHaltReceiver {
                halt_notif,
                already_received: None,
            },
        }
    }

    /// Get the path to the VTL 2 vsock socket, if the VM is configured with OpenHCL.
    pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
        self.inner
            .resources
            .vtl2_vsock_path
            .as_deref()
            .context("VM is not configured with OpenHCL")
    }

    petri_vm_fn!(
        /// Waits for an event emitted by the firmware about its boot status, and
        /// returns that status.
        pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
    );
    petri_vm_fn!(
        /// Waits for the Hyper-V shutdown IC to be ready, returning a receiver
        /// that will be closed when it is no longer ready.
        pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<mesh::OneshotReceiver<()>>
    );
    petri_vm_fn!(
        /// Instructs the guest to shut down via the Hyper-V shutdown IC.
        pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Waits for the KVP IC to be ready, returning a sender that can be used
        /// to send requests to it.
        pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
    );
    petri_vm_fn!(
        /// Stages the new OpenHCL file and saves the existing state.
        pub async fn save_openhcl(
            &mut self,
            new_openhcl: &ResolvedArtifact,
            flags: OpenHclServicingFlags
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Restores OpenHCL from a previously saved state.
        pub async fn restore_openhcl(
            &mut self
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Updates the command line parameters of the running VM.
        pub async fn update_command_line(
            &mut self,
            command_line: &str
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Resets the hardware state of the VM, simulating a power cycle.
        pub async fn reset(&mut self) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Wait for a connection from a pipette agent
        pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
    );
    petri_vm_fn!(
        /// Set the OpenHCL VTL2 settings.
        pub async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()>
    );

    petri_vm_fn!(pub(crate) async fn resume(&mut self) -> anyhow::Result<()>);
    petri_vm_fn!(pub(crate) async fn verify_save_restore(&mut self) -> anyhow::Result<()>);
    petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);

    /// Wraps the provided future in a race with the worker process's halt
    /// notification channel. This is useful for preventing a future from
    /// waiting indefinitely if the VM dies for any reason. If the worker
    /// process crashes, the halt notification channel returns an error, and
    /// if the VM halts for any other reason, the wait ends with that halt
    /// reason.
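    ///
    /// Illustrative usage (a sketch, not compiled; `read_guest_serial` is a
    /// hypothetical future provided by the test):
    ///
    /// ```ignore
    /// let line = vm.wait_for_halt_or(read_guest_serial()).await?;
    /// ```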
    pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
        &mut self,
        future: F,
    ) -> anyhow::Result<T> {
        Self::wait_for_halt_or_internal(&mut self.halt, future).await
    }

    async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
        halt: &mut PetriVmHaltReceiver,
        future: F,
    ) -> anyhow::Result<T> {
        let future = &mut std::pin::pin!(future);
        enum Either<T> {
            Future(anyhow::Result<T>),
            Halt(Result<HaltReason, RecvError>),
        }
        // Race the caller's future against the halt notification channel so
        // that a dead or halted VM can't leave the caller waiting forever.
        let res = (
            future.map(Either::Future),
            halt.halt_notif.recv().map(Either::Halt),
        )
            .race()
            .await;

        match res {
            Either::Future(Ok(success)) => Ok(success),
            Either::Future(Err(e)) => {
                tracing::warn!(
                    ?e,
                    "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
                );
                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                c.cancelled().await;
                Err(e)
            }
            Either::Halt(halt_result) => {
                tracing::warn!(
                    halt_result = format_args!("{:x?}", halt_result),
                    "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
                );
                // The VM halted (or the worker died) first. Give the caller's
                // future a short grace period to complete anyway.
                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                let try_again = c.until_cancelled(future).await;

                match try_again {
                    Ok(fut_result) => {
                        // The future still completed; stash the halt result so
                        // a later `wait_for_halt` call can observe it.
                        halt.already_received = Some(halt_result);
                        if let Err(e) = &fut_result {
                            tracing::warn!(
                                ?e,
                                "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
                            );
                            let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                            c.cancelled().await;
                        }
                        fut_result
                    }
                    Err(_cancel) => match halt_result {
                        Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
                        Err(e) => Err(e).context("VM disappeared"),
                    },
                }
            }
        }
    }
}

impl PetriVmInner {
    async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
        self.resources
            .firmware_event_recv
            .recv()
            .await
            .context("Failed to get firmware boot event")
    }

    async fn wait_for_enlightened_shutdown_ready(
        &mut self,
    ) -> anyhow::Result<mesh::OneshotReceiver<()>> {
        let recv = self
            .resources
            .shutdown_ic_send
            .call(ShutdownRpc::WaitReady, ())
            .await?;

        Ok(recv)
    }

    async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
        let shutdown_result = self
            .resources
            .shutdown_ic_send
            .call(
                ShutdownRpc::Shutdown,
                hyperv_ic_resources::shutdown::ShutdownParams {
                    shutdown_type: match kind {
                        ShutdownKind::Shutdown => {
                            hyperv_ic_resources::shutdown::ShutdownType::PowerOff
                        }
                        ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
                    },
                    force: false,
                },
            )
            .await?;

        tracing::info!(?shutdown_result, "Shutdown sent");
        anyhow::ensure!(
            shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
            "Got non-Ok shutdown response"
        );

        Ok(())
    }

    async fn wait_for_kvp(
        &mut self,
    ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
        tracing::info!("Waiting for KVP IC");
        let (send, _) = self
            .resources
            .kvp_ic_send
            .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
            .await
            .context("failed to connect to KVP IC")?;

        Ok(send)
    }

    async fn save_openhcl(
        &self,
        new_openhcl: &ResolvedArtifact,
        flags: OpenHclServicingFlags,
    ) -> anyhow::Result<()> {
        let ged_send = self
            .resources
            .ged_send
            .as_ref()
            .context("openhcl not configured")?;

        let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
        self.worker
            .save_openhcl(ged_send, flags, igvm_file.into())
            .await
    }

    async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
        self.worker.update_command_line(command_line).await
    }

    async fn restore_openhcl(&self) -> anyhow::Result<()> {
        let ged_send = self
            .resources
            .ged_send
            .as_ref()
            .context("openhcl not configured")?;

        self.worker.restore_openhcl(ged_send).await
    }

    async fn set_vtl2_settings(&self, settings: &Vtl2Settings) -> anyhow::Result<()> {
        let ged_send = self
            .resources
            .ged_send
            .as_ref()
            .context("openhcl not configured")?;

        ged_send
            .call_failable(
                get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
                prost::Message::encode_to_vec(settings),
            )
            .await?;

        Ok(())
    }

    async fn reset(&mut self) -> anyhow::Result<()> {
        tracing::info!("Resetting VM");
        self.worker.reset().await?;
        // On Linux direct, pipette won't auto-start, so start it over serial.
        if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
            agent.reset();

            if self.resources.properties.using_vtl0_pipette {
                self.launch_linux_direct_pipette().await?;
            }
        }
        Ok(())
    }

    async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
        let listener = if set_high_vtl {
            self.resources
                .vtl2_pipette_listener
                .as_mut()
                .context("VM is not configured with VTL 2")?
        } else {
            &mut self.resources.pipette_listener
        };

        tracing::info!(set_high_vtl, "listening for pipette connection");
        let (conn, _) = listener
            .accept()
            .await
            .context("failed to accept pipette connection")?;
        tracing::info!(set_high_vtl, "handshaking with pipette");
        let client = PipetteClient::new(
            &self.resources.driver,
            PolledSocket::new(&self.resources.driver, conn)?,
            &self.resources.output_dir,
        )
        .await
        .context("failed to connect to pipette");
        tracing::info!(set_high_vtl, "completed pipette handshake");
        client
    }

    async fn resume(&self) -> anyhow::Result<()> {
        self.worker.resume().await?;
        Ok(())
    }

    async fn verify_save_restore(&self) -> anyhow::Result<()> {
        for i in 0..2 {
            let result = self.worker.pulse_save_restore().await;
            match result {
                Ok(()) => {}
                Err(RpcError::Channel(err)) => return Err(err.into()),
                Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
                    tracing::warn!("Reset not supported, could not test save + restore.");
                    break;
                }
                Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
                    return Err(anyhow::Error::from(err))
                        .context(format!("Save + restore {i} failed."));
                }
            }
        }

        Ok(())
    }

    async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
        // Start pipette through serial on linux direct.
        self.resources
            .linux_direct_serial_agent
            .as_mut()
            .unwrap()
            .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
            .await?;
        Ok(())
    }
}

/// Interface for inspecting OpenVMM
pub struct OpenVmmInspector {
    worker: Arc<Worker>,
}

#[async_trait]
impl PetriVmInspector for OpenVmmInspector {
    async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
        Ok(self.worker.inspect_all().await)
    }
}

/// Interface to the OpenVMM framebuffer
pub struct OpenVmmFramebufferAccess {
    view: View,
}

#[async_trait]
impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
    async fn screenshot(
        &mut self,
        image: &mut Vec<u8>,
    ) -> anyhow::Result<Option<VmScreenshotMeta>> {
        // Our framebuffer uses 4 bytes per pixel, approximating a BGRA image,
        // but it only actually contains BGR data; the fourth byte is
        // effectively noise. We set the 'alpha' value to 0xFF to make the
        // image opaque.
        const BYTES_PER_PIXEL: usize = 4;
        let (width, height) = self.view.resolution();
        let (widthsize, heightsize) = (width as usize, height as usize);
        let len = widthsize * heightsize * BYTES_PER_PIXEL;

        image.resize(len, 0);
        for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
            self.view.read_line(i, line);
            for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
                pixel.swap(0, 2);
                pixel[3] = 0xFF;
            }
        }

        Ok(Some(VmScreenshotMeta {
            color: image::ExtendedColorType::Rgba8,
            width,
            height,
        }))
    }
}