petri/vm/openvmm/
runtime.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Methods to interact with a running [`PetriVmOpenVmm`].
5
6use super::PetriVmResourcesOpenVmm;
7use crate::OpenHclServicingFlags;
8use crate::PetriHaltReason;
9use crate::PetriVmFramebufferAccess;
10use crate::PetriVmInspector;
11use crate::PetriVmRuntime;
12use crate::ShutdownKind;
13use crate::VmScreenshotMeta;
14use crate::openhcl_diag::OpenHclDiagHandler;
15use crate::worker::Worker;
16use anyhow::Context;
17use async_trait::async_trait;
18use framebuffer::View;
19use futures::FutureExt;
20use futures_concurrency::future::Race;
21use get_resources::ged::FirmwareEvent;
22use hvlite_defs::rpc::PulseSaveRestoreError;
23use hyperv_ic_resources::shutdown::ShutdownRpc;
24use mesh::CancelContext;
25use mesh::Receiver;
26use mesh::RecvError;
27use mesh::rpc::RpcError;
28use mesh::rpc::RpcSend;
29use mesh_process::Mesh;
30use pal_async::DefaultDriver;
31use pal_async::socket::PolledSocket;
32use petri_artifacts_core::ResolvedArtifact;
33use pipette_client::PipetteClient;
34use std::future::Future;
35use std::path::Path;
36use std::sync::Arc;
37use std::time::Duration;
38use unix_socket::UnixListener;
39use vmm_core_defs::HaltReason;
40use vtl2_settings_proto::Vtl2Settings;
41
/// A running VM that tests can interact with.
// DEVNOTE: Really the PetriVmInner is the actual VM and channels that we interact
// with. This struct exists as a wrapper to provide error handling, such as not
// hanging indefinitely when waiting on certain channels if the VM crashes.
pub struct PetriVmOpenVmm {
    /// The VM's resources, mesh, worker handle, and framebuffer view.
    inner: PetriVmInner,
    /// The worker's halt notification channel, plus any halt result that was
    /// already observed while racing another operation against it.
    halt: PetriVmHaltReceiver,
}
50
51#[async_trait]
52impl PetriVmRuntime for PetriVmOpenVmm {
53    type VmInspector = OpenVmmInspector;
54    type VmFramebufferAccess = OpenVmmFramebufferAccess;
55
56    async fn teardown(self) -> anyhow::Result<()> {
57        tracing::info!("waiting for worker");
58        let worker = Arc::into_inner(self.inner.worker)
59            .context("all references to the OpenVMM worker have not been closed")?;
60        worker.shutdown().await?;
61
62        tracing::info!("Worker quit, waiting for mesh");
63        self.inner.mesh.shutdown().await;
64
65        tracing::info!("Mesh shutdown, waiting for logging tasks");
66        for t in self.inner.resources.log_stream_tasks {
67            t.await?;
68        }
69
70        Ok(())
71    }
72
73    async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReason> {
74        let halt_reason = if let Some(already) = self.halt.already_received.take() {
75            already.map_err(anyhow::Error::from)
76        } else {
77            self.halt
78                .halt_notif
79                .recv()
80                .await
81                .context("Failed to get halt reason")
82        }?;
83
84        tracing::info!(?halt_reason, "Got halt reason");
85
86        let halt_reason = match halt_reason {
87            HaltReason::PowerOff => PetriHaltReason::PowerOff,
88            HaltReason::Reset => PetriHaltReason::Reset,
89            HaltReason::Hibernate => PetriHaltReason::Hibernate,
90            HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
91            _ => PetriHaltReason::Other,
92        };
93
94        if allow_reset && halt_reason == PetriHaltReason::Reset {
95            self.reset().await?
96        }
97
98        Ok(halt_reason)
99    }
100
101    async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
102        Self::wait_for_agent(self, set_high_vtl).await
103    }
104
105    fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
106        self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
107            OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
108                self.inner.resources.driver.clone(),
109                path,
110            ))
111        })
112    }
113
114    async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
115        Self::wait_for_boot_event(self).await
116    }
117
118    async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
119        Self::wait_for_enlightened_shutdown_ready(self)
120            .await
121            .map(|_| ())
122    }
123
124    async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
125        Self::send_enlightened_shutdown(self, kind).await
126    }
127
128    async fn restart_openhcl(
129        &mut self,
130        new_openhcl: &ResolvedArtifact,
131        flags: OpenHclServicingFlags,
132    ) -> anyhow::Result<()> {
133        Self::save_openhcl(self, new_openhcl, flags).await?;
134        Self::restore_openhcl(self).await
135    }
136
137    async fn save_openhcl(
138        &mut self,
139        new_openhcl: &ResolvedArtifact,
140        flags: OpenHclServicingFlags,
141    ) -> anyhow::Result<()> {
142        Self::save_openhcl(self, new_openhcl, flags).await
143    }
144
145    async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
146        Self::restore_openhcl(self).await
147    }
148
149    fn inspector(&self) -> Option<OpenVmmInspector> {
150        Some(OpenVmmInspector {
151            worker: self.inner.worker.clone(),
152        })
153    }
154
155    fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
156        self.inner
157            .framebuffer_view
158            .take()
159            .map(|view| OpenVmmFramebufferAccess { view })
160    }
161
162    async fn reset(&mut self) -> anyhow::Result<()> {
163        Self::reset(self).await
164    }
165}
166
/// The VM handles and channels that [`PetriVmOpenVmm`] wraps.
///
/// NOTE(review): fields drop in declaration order; keep that in mind before
/// reordering them.
pub(super) struct PetriVmInner {
    /// Channels, listeners, and other resources created for this VM.
    pub(super) resources: PetriVmResourcesOpenVmm,
    /// The mesh hosting the worker; shut down during teardown after the worker.
    pub(super) mesh: Mesh,
    /// Handle to the OpenVMM worker. Shared (via `Arc`) with any
    /// `OpenVmmInspector`s, which must be dropped before teardown.
    pub(super) worker: Arc<Worker>,
    /// The framebuffer view, until taken by `take_framebuffer_access`.
    pub(super) framebuffer_view: Option<View>,
}
173
/// Halt notification state for a running VM.
struct PetriVmHaltReceiver {
    /// Receives the worker's halt notifications; errors if the worker goes away.
    halt_notif: Receiver<HaltReason>,
    /// A halt result already pulled off `halt_notif` while racing another
    /// future, saved so that `wait_for_halt` can still return it.
    already_received: Option<Result<HaltReason, RecvError>>,
}
178
// Generates a `PetriVmOpenVmm` method that forwards to the `PetriVmInner`
// method of the same name, racing it against the halt channel via
// [`PetriVmOpenVmm::wait_for_halt_or_internal`] so that the call cannot hang
// forever if the VM stops. Attributes (including doc comments) and visibility
// are copied onto the generated method.
macro_rules! petri_vm_fn {
    (
        $(#[$($meta:tt)*])*
        $visibility:vis async fn $method:ident (&mut self $(, $param:ident : $param_ty:ty)*)
        $(-> $output:ty)?
    ) => {
        $(#[$($meta)*])*
        $visibility async fn $method(&mut self, $($param: $param_ty,)*) $(-> $output)? {
            let forwarded = self.inner.$method($($param,)*);
            Self::wait_for_halt_or_internal(&mut self.halt, forwarded).await
        }
    };
}
189
190// TODO: Add all runtime functions that are not backend specific
191// to the `PetriVmRuntime` trait
192impl PetriVmOpenVmm {
193    pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
194        Self {
195            inner,
196            halt: PetriVmHaltReceiver {
197                halt_notif,
198                already_received: None,
199            },
200        }
201    }
202
203    /// Get the path to the VTL 2 vsock socket, if the VM is configured with OpenHCL.
204    pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
205        self.inner
206            .resources
207            .vtl2_vsock_path
208            .as_deref()
209            .context("VM is not configured with OpenHCL")
210    }
211
212    petri_vm_fn!(
213        /// Waits for an event emitted by the firmware about its boot status, and
214        /// returns that status.
215        pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
216    );
217    petri_vm_fn!(
218        /// Waits for the Hyper-V shutdown IC to be ready, returning a receiver
219        /// that will be closed when it is no longer ready.
220        pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<mesh::OneshotReceiver<()>>
221    );
222    petri_vm_fn!(
223        /// Instruct the guest to shutdown via the Hyper-V shutdown IC.
224        pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
225    );
226    petri_vm_fn!(
227        /// Waits for the KVP IC to be ready, returning a sender that can be used
228        /// to send requests to it.
229        pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
230    );
231    petri_vm_fn!(
232        /// Stages the new OpenHCL file and saves the existing state.
233        pub async fn save_openhcl(
234            &mut self,
235            new_openhcl: &ResolvedArtifact,
236            flags: OpenHclServicingFlags
237        ) -> anyhow::Result<()>
238    );
239    petri_vm_fn!(
240        /// Restores OpenHCL from a previously saved state.
241        pub async fn restore_openhcl(
242            &mut self
243        ) -> anyhow::Result<()>
244    );
245    petri_vm_fn!(
246        /// Resets the hardware state of the VM, simulating a power cycle.
247        pub async fn reset(&mut self) -> anyhow::Result<()>
248    );
249    petri_vm_fn!(
250        /// Wait for a connection from a pipette agent
251        pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
252    );
253    petri_vm_fn!(
254        /// Modifies OpenHCL VTL2 settings.
255        pub async fn modify_vtl2_settings(&mut self, f: impl FnOnce(&mut Vtl2Settings)) -> anyhow::Result<()>
256    );
257
258    petri_vm_fn!(pub(crate) async fn resume(&mut self) -> anyhow::Result<()>);
259    petri_vm_fn!(pub(crate) async fn verify_save_restore(&mut self) -> anyhow::Result<()>);
260    petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);
261
262    /// Wrap the provided future in a race with the worker process's halt
263    /// notification channel. This is useful for preventing a future from
264    /// waiting indefinitely if the VM dies for any reason. If the worker
265    /// process crashes the halt notification channel will return an error, and
266    /// if the VM halts for any other reason the future will complete with that
267    /// reason.
268    pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
269        &mut self,
270        future: F,
271    ) -> anyhow::Result<T> {
272        Self::wait_for_halt_or_internal(&mut self.halt, future).await
273    }
274
275    async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
276        halt: &mut PetriVmHaltReceiver,
277        future: F,
278    ) -> anyhow::Result<T> {
279        let future = &mut std::pin::pin!(future);
280        enum Either<T> {
281            Future(anyhow::Result<T>),
282            Halt(Result<HaltReason, RecvError>),
283        }
284        let res = (
285            future.map(Either::Future),
286            halt.halt_notif.recv().map(Either::Halt),
287        )
288            .race()
289            .await;
290
291        match res {
292            Either::Future(Ok(success)) => Ok(success),
293            Either::Future(Err(e)) => {
294                tracing::warn!(
295                    ?e,
296                    "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
297                );
298                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
299                c.cancelled().await;
300                Err(e)
301            }
302            Either::Halt(halt_result) => {
303                tracing::warn!(
304                    halt_result = format_args!("{:x?}", halt_result),
305                    "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
306                );
307                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
308                let try_again = c.until_cancelled(future).await;
309
310                match try_again {
311                    Ok(fut_result) => {
312                        halt.already_received = Some(halt_result);
313                        if let Err(e) = &fut_result {
314                            tracing::warn!(
315                                ?e,
316                                "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
317                            );
318                            let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
319                            c.cancelled().await;
320                        }
321                        fut_result
322                    }
323                    Err(_cancel) => match halt_result {
324                        Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
325                        Err(e) => Err(e).context("VM disappeared"),
326                    },
327                }
328            }
329        }
330    }
331}
332
333impl PetriVmInner {
334    async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
335        self.resources
336            .firmware_event_recv
337            .recv()
338            .await
339            .context("Failed to get firmware boot event")
340    }
341
342    async fn wait_for_enlightened_shutdown_ready(
343        &mut self,
344    ) -> anyhow::Result<mesh::OneshotReceiver<()>> {
345        let recv = self
346            .resources
347            .shutdown_ic_send
348            .call(ShutdownRpc::WaitReady, ())
349            .await?;
350
351        Ok(recv)
352    }
353
354    async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
355        let shutdown_result = self
356            .resources
357            .shutdown_ic_send
358            .call(
359                ShutdownRpc::Shutdown,
360                hyperv_ic_resources::shutdown::ShutdownParams {
361                    shutdown_type: match kind {
362                        ShutdownKind::Shutdown => {
363                            hyperv_ic_resources::shutdown::ShutdownType::PowerOff
364                        }
365                        ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
366                    },
367                    force: false,
368                },
369            )
370            .await?;
371
372        tracing::info!(?shutdown_result, "Shutdown sent");
373        anyhow::ensure!(
374            shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
375            "Got non-Ok shutdown response"
376        );
377
378        Ok(())
379    }
380
381    async fn wait_for_kvp(
382        &mut self,
383    ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
384        tracing::info!("Waiting for KVP IC");
385        let (send, _) = self
386            .resources
387            .kvp_ic_send
388            .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
389            .await
390            .context("failed to connect to KVP IC")?;
391
392        Ok(send)
393    }
394
395    async fn save_openhcl(
396        &self,
397        new_openhcl: &ResolvedArtifact,
398        flags: OpenHclServicingFlags,
399    ) -> anyhow::Result<()> {
400        let ged_send = self
401            .resources
402            .ged_send
403            .as_ref()
404            .context("openhcl not configured")?;
405
406        let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
407        self.worker
408            .save_openhcl(ged_send, flags, igvm_file.into())
409            .await
410    }
411
412    async fn restore_openhcl(&self) -> anyhow::Result<()> {
413        let ged_send = self
414            .resources
415            .ged_send
416            .as_ref()
417            .context("openhcl not configured")?;
418
419        self.worker.restore_openhcl(ged_send).await
420    }
421
422    async fn modify_vtl2_settings(
423        &mut self,
424        f: impl FnOnce(&mut Vtl2Settings),
425    ) -> anyhow::Result<()> {
426        f(self.resources.vtl2_settings.as_mut().unwrap());
427
428        let ged_send = self
429            .resources
430            .ged_send
431            .as_ref()
432            .context("openhcl not configured")?;
433
434        ged_send
435            .call_failable(
436                get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
437                prost::Message::encode_to_vec(self.resources.vtl2_settings.as_ref().unwrap()),
438            )
439            .await?;
440
441        Ok(())
442    }
443
444    async fn reset(&mut self) -> anyhow::Result<()> {
445        tracing::info!("Resetting VM");
446        self.worker.reset().await?;
447        // On linux direct pipette won't auto start, start it over serial
448        if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
449            agent.reset();
450
451            if self
452                .resources
453                .agent_image
454                .as_ref()
455                .is_some_and(|x| x.contains_pipette())
456            {
457                self.launch_linux_direct_pipette().await?;
458            }
459        }
460        Ok(())
461    }
462
463    async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
464        Self::wait_for_agent_core(
465            &self.resources.driver,
466            if set_high_vtl {
467                self.resources
468                    .vtl2_pipette_listener
469                    .as_mut()
470                    .context("VM is not configured with VTL 2")?
471            } else {
472                &mut self.resources.pipette_listener
473            },
474            &self.resources.output_dir,
475        )
476        .await
477    }
478
479    async fn wait_for_agent_core(
480        driver: &DefaultDriver,
481        listener: &mut PolledSocket<UnixListener>,
482        output_dir: &Path,
483    ) -> anyhow::Result<PipetteClient> {
484        // Wait for the pipette connection.
485        tracing::info!("listening for pipette connection");
486        let (conn, _) = listener
487            .accept()
488            .await
489            .context("failed to accept pipette connection")?;
490
491        tracing::info!("handshaking with pipette");
492        let client = PipetteClient::new(&driver, PolledSocket::new(driver, conn)?, output_dir)
493            .await
494            .context("failed to connect to pipette");
495
496        tracing::info!("completed pipette handshake");
497        client
498    }
499
500    async fn resume(&self) -> anyhow::Result<()> {
501        self.worker.resume().await?;
502        Ok(())
503    }
504
505    async fn verify_save_restore(&self) -> anyhow::Result<()> {
506        for i in 0..2 {
507            let result = self.worker.pulse_save_restore().await;
508            match result {
509                Ok(()) => {}
510                Err(RpcError::Channel(err)) => return Err(err.into()),
511                Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
512                    tracing::warn!("Reset not supported, could not test save + restore.");
513                    break;
514                }
515                Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
516                    return Err(anyhow::Error::from(err))
517                        .context(format!("Save + restore {i} failed."));
518                }
519            }
520        }
521
522        Ok(())
523    }
524
525    async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
526        // Start pipette through serial on linux direct.
527        self.resources
528            .linux_direct_serial_agent
529            .as_mut()
530            .unwrap()
531            .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
532            .await?;
533        Ok(())
534    }
535}
536
/// Interface for inspecting OpenVMM
///
/// Holds a shared reference to the VM's worker. It must be dropped before
/// teardown, which requires sole ownership of the worker.
pub struct OpenVmmInspector {
    worker: Arc<Worker>,
}
541
542#[async_trait]
543impl PetriVmInspector for OpenVmmInspector {
544    async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
545        Ok(self.worker.inspect_all().await)
546    }
547}
548
/// Interface to the OpenVMM framebuffer
///
/// Wraps the framebuffer view taken from the running VM; screenshots read from
/// it line by line.
pub struct OpenVmmFramebufferAccess {
    view: View,
}
553
554#[async_trait]
555impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
556    async fn screenshot(
557        &mut self,
558        image: &mut Vec<u8>,
559    ) -> anyhow::Result<Option<VmScreenshotMeta>> {
560        // Our framebuffer uses 4 bytes per pixel, approximating an
561        // BGRA image, however it only actually contains BGR data.
562        // The fourth byte is effectively noise. We can set the 'alpha'
563        // value to 0xFF to make the image opaque.
564        const BYTES_PER_PIXEL: usize = 4;
565        let (width, height) = self.view.resolution();
566        let (widthsize, heightsize) = (width as usize, height as usize);
567        let len = widthsize * heightsize * BYTES_PER_PIXEL;
568
569        image.resize(len, 0);
570        for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
571            self.view.read_line(i, line);
572            for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
573                pixel.swap(0, 2);
574                pixel[3] = 0xFF;
575            }
576        }
577
578        Ok(Some(VmScreenshotMeta {
579            color: image::ExtendedColorType::Rgba8,
580            width,
581            height,
582        }))
583    }
584}