petri/vm/openvmm/
runtime.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Methods to interact with a running [`PetriVmOpenVmm`].
5
6use super::PetriVmResourcesOpenVmm;
7use crate::OpenHclServicingFlags;
8use crate::PetriHaltReason;
9use crate::PetriVmFramebufferAccess;
10use crate::PetriVmInspector;
11use crate::PetriVmRuntime;
12use crate::ShutdownKind;
13use crate::VmScreenshotMeta;
14use crate::openhcl_diag::OpenHclDiagHandler;
15use crate::worker::Worker;
16use anyhow::Context;
17use async_trait::async_trait;
18use framebuffer::View;
19use futures::FutureExt;
20use futures_concurrency::future::Race;
21use get_resources::ged::FirmwareEvent;
22use hvlite_defs::rpc::PulseSaveRestoreError;
23use hyperv_ic_resources::shutdown::ShutdownRpc;
24use mesh::CancelContext;
25use mesh::Receiver;
26use mesh::RecvError;
27use mesh::rpc::RpcError;
28use mesh::rpc::RpcSend;
29use mesh_process::Mesh;
30use pal_async::socket::PolledSocket;
31use petri_artifacts_core::ResolvedArtifact;
32use pipette_client::PipetteClient;
33use std::future::Future;
34use std::path::Path;
35use std::sync::Arc;
36use std::time::Duration;
37use vmm_core_defs::HaltReason;
38use vtl2_settings_proto::Vtl2Settings;
39
/// A running VM that tests can interact with.
// DEVNOTE: Really the PetriVmInner is the actual VM and channels that we interact
// with. This struct exists as a wrapper to provide error handling, such as not
// hanging indefinitely when waiting on certain channels if the VM crashes.
pub struct PetriVmOpenVmm {
    /// The VM internals (worker, mesh, resources) that operations are
    /// forwarded to.
    inner: PetriVmInner,
    /// The halt notification channel. Forwarded operations are raced against
    /// it so they do not hang indefinitely if the VM halts or disappears.
    halt: PetriVmHaltReceiver,
}
48
49#[async_trait]
50impl PetriVmRuntime for PetriVmOpenVmm {
51    type VmInspector = OpenVmmInspector;
52    type VmFramebufferAccess = OpenVmmFramebufferAccess;
53
54    async fn teardown(self) -> anyhow::Result<()> {
55        tracing::info!("waiting for worker");
56        let worker = Arc::into_inner(self.inner.worker)
57            .context("all references to the OpenVMM worker have not been closed")?;
58        worker.shutdown().await?;
59
60        tracing::info!("Worker quit, waiting for mesh");
61        self.inner.mesh.shutdown().await;
62
63        tracing::info!("Mesh shutdown, waiting for logging tasks");
64        for t in self.inner.resources.log_stream_tasks {
65            t.await?;
66        }
67
68        Ok(())
69    }
70
71    async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReason> {
72        let halt_reason = if let Some(already) = self.halt.already_received.take() {
73            already.map_err(anyhow::Error::from)
74        } else {
75            self.halt
76                .halt_notif
77                .recv()
78                .await
79                .context("Failed to get halt reason")
80        }?;
81
82        tracing::info!(?halt_reason, "Got halt reason");
83
84        let halt_reason = match halt_reason {
85            HaltReason::PowerOff => PetriHaltReason::PowerOff,
86            HaltReason::Reset => PetriHaltReason::Reset,
87            HaltReason::Hibernate => PetriHaltReason::Hibernate,
88            HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
89            _ => PetriHaltReason::Other,
90        };
91
92        if allow_reset && halt_reason == PetriHaltReason::Reset {
93            self.reset().await?
94        }
95
96        Ok(halt_reason)
97    }
98
99    async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
100        Self::wait_for_agent(self, set_high_vtl).await
101    }
102
103    fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
104        self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
105            OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
106                self.inner.resources.driver.clone(),
107                path,
108            ))
109        })
110    }
111
112    async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
113        Self::wait_for_boot_event(self).await
114    }
115
116    async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
117        Self::wait_for_enlightened_shutdown_ready(self)
118            .await
119            .map(|_| ())
120    }
121
122    async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
123        Self::send_enlightened_shutdown(self, kind).await
124    }
125
126    async fn restart_openhcl(
127        &mut self,
128        new_openhcl: &ResolvedArtifact,
129        flags: OpenHclServicingFlags,
130    ) -> anyhow::Result<()> {
131        Self::save_openhcl(self, new_openhcl, flags).await?;
132        Self::restore_openhcl(self).await
133    }
134
135    async fn save_openhcl(
136        &mut self,
137        new_openhcl: &ResolvedArtifact,
138        flags: OpenHclServicingFlags,
139    ) -> anyhow::Result<()> {
140        Self::save_openhcl(self, new_openhcl, flags).await
141    }
142
143    async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
144        Self::restore_openhcl(self).await
145    }
146
147    fn inspector(&self) -> Option<OpenVmmInspector> {
148        Some(OpenVmmInspector {
149            worker: self.inner.worker.clone(),
150        })
151    }
152
153    fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
154        self.inner
155            .framebuffer_view
156            .take()
157            .map(|view| OpenVmmFramebufferAccess { view })
158    }
159
160    async fn reset(&mut self) -> anyhow::Result<()> {
161        Self::reset(self).await
162    }
163}
164
/// The VM's worker, mesh, and associated resources and channels.
// NOTE(review): field declaration order also fixes drop order; confirm nothing
// depends on it before reordering.
pub(super) struct PetriVmInner {
    /// Channels, listeners, and other per-VM resources (firmware events,
    /// IC senders, GED sender, pipette listeners, log tasks, ...).
    pub(super) resources: PetriVmResourcesOpenVmm,
    /// The mesh hosting the worker; shut down after the worker in teardown.
    pub(super) mesh: Mesh,
    /// The OpenVMM worker. Shared with any `OpenVmmInspector`s handed out;
    /// teardown requires this to be the last remaining reference.
    pub(super) worker: Arc<Worker>,
    /// The framebuffer view, if any; taken at most once by
    /// `take_framebuffer_access`.
    pub(super) framebuffer_view: Option<View>,
}
171
/// Receives VM halt notifications and remembers one observed early.
struct PetriVmHaltReceiver {
    /// Channel carrying halt reasons; a receive error is reported as the VM
    /// having disappeared.
    halt_notif: Receiver<HaltReason>,
    /// A halt result received while racing another future, stashed so the
    /// next `wait_for_halt` returns it instead of waiting again.
    already_received: Option<Result<HaltReason, RecvError>>,
}
176
// Generates a `PetriVmOpenVmm` method that forwards to the `PetriVmInner`
// method of the same name, racing it against the halt channel via
// [`PetriVmOpenVmm::wait_for_halt_or_internal`] so the call does not hang
// indefinitely if the VM halts or the worker disappears.
macro_rules! petri_vm_fn {
    (
        $(#[$($attr:tt)*])*
        $visibility:vis async fn $name:ident (&mut self $(, $param:ident : $param_ty:ty)*) $(-> $output:ty)?
    ) => {
        $(#[$($attr)*])*
        $visibility async fn $name(&mut self, $($param: $param_ty,)*) $(-> $output)? {
            Self::wait_for_halt_or_internal(&mut self.halt, self.inner.$name($($param,)*)).await
        }
    };
}
187
188// TODO: Add all runtime functions that are not backend specific
189// to the `PetriVmRuntime` trait
190impl PetriVmOpenVmm {
191    pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
192        Self {
193            inner,
194            halt: PetriVmHaltReceiver {
195                halt_notif,
196                already_received: None,
197            },
198        }
199    }
200
201    /// Get the path to the VTL 2 vsock socket, if the VM is configured with OpenHCL.
202    pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
203        self.inner
204            .resources
205            .vtl2_vsock_path
206            .as_deref()
207            .context("VM is not configured with OpenHCL")
208    }
209
210    petri_vm_fn!(
211        /// Waits for an event emitted by the firmware about its boot status, and
212        /// returns that status.
213        pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
214    );
215    petri_vm_fn!(
216        /// Waits for the Hyper-V shutdown IC to be ready, returning a receiver
217        /// that will be closed when it is no longer ready.
218        pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<mesh::OneshotReceiver<()>>
219    );
220    petri_vm_fn!(
221        /// Instruct the guest to shutdown via the Hyper-V shutdown IC.
222        pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
223    );
224    petri_vm_fn!(
225        /// Waits for the KVP IC to be ready, returning a sender that can be used
226        /// to send requests to it.
227        pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
228    );
229    petri_vm_fn!(
230        /// Stages the new OpenHCL file and saves the existing state.
231        pub async fn save_openhcl(
232            &mut self,
233            new_openhcl: &ResolvedArtifact,
234            flags: OpenHclServicingFlags
235        ) -> anyhow::Result<()>
236    );
237    petri_vm_fn!(
238        /// Restores OpenHCL from a previously saved state.
239        pub async fn restore_openhcl(
240            &mut self
241        ) -> anyhow::Result<()>
242    );
243    petri_vm_fn!(
244        /// Resets the hardware state of the VM, simulating a power cycle.
245        pub async fn reset(&mut self) -> anyhow::Result<()>
246    );
247    petri_vm_fn!(
248        /// Wait for a connection from a pipette agent
249        pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
250    );
251    petri_vm_fn!(
252        /// Modifies OpenHCL VTL2 settings.
253        pub async fn modify_vtl2_settings(&mut self, f: impl FnOnce(&mut Vtl2Settings)) -> anyhow::Result<()>
254    );
255
256    petri_vm_fn!(pub(crate) async fn resume(&mut self) -> anyhow::Result<()>);
257    petri_vm_fn!(pub(crate) async fn verify_save_restore(&mut self) -> anyhow::Result<()>);
258    petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);
259
260    /// Wrap the provided future in a race with the worker process's halt
261    /// notification channel. This is useful for preventing a future from
262    /// waiting indefinitely if the VM dies for any reason. If the worker
263    /// process crashes the halt notification channel will return an error, and
264    /// if the VM halts for any other reason the future will complete with that
265    /// reason.
266    pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
267        &mut self,
268        future: F,
269    ) -> anyhow::Result<T> {
270        Self::wait_for_halt_or_internal(&mut self.halt, future).await
271    }
272
273    async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
274        halt: &mut PetriVmHaltReceiver,
275        future: F,
276    ) -> anyhow::Result<T> {
277        let future = &mut std::pin::pin!(future);
278        enum Either<T> {
279            Future(anyhow::Result<T>),
280            Halt(Result<HaltReason, RecvError>),
281        }
282        let res = (
283            future.map(Either::Future),
284            halt.halt_notif.recv().map(Either::Halt),
285        )
286            .race()
287            .await;
288
289        match res {
290            Either::Future(Ok(success)) => Ok(success),
291            Either::Future(Err(e)) => {
292                tracing::warn!(
293                    ?e,
294                    "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
295                );
296                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
297                c.cancelled().await;
298                Err(e)
299            }
300            Either::Halt(halt_result) => {
301                tracing::warn!(
302                    halt_result = format_args!("{:x?}", halt_result),
303                    "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
304                );
305                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
306                let try_again = c.until_cancelled(future).await;
307
308                match try_again {
309                    Ok(fut_result) => {
310                        halt.already_received = Some(halt_result);
311                        if let Err(e) = &fut_result {
312                            tracing::warn!(
313                                ?e,
314                                "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
315                            );
316                            let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
317                            c.cancelled().await;
318                        }
319                        fut_result
320                    }
321                    Err(_cancel) => match halt_result {
322                        Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
323                        Err(e) => Err(e).context("VM disappeared"),
324                    },
325                }
326            }
327        }
328    }
329}
330
331impl PetriVmInner {
332    async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
333        self.resources
334            .firmware_event_recv
335            .recv()
336            .await
337            .context("Failed to get firmware boot event")
338    }
339
340    async fn wait_for_enlightened_shutdown_ready(
341        &mut self,
342    ) -> anyhow::Result<mesh::OneshotReceiver<()>> {
343        let recv = self
344            .resources
345            .shutdown_ic_send
346            .call(ShutdownRpc::WaitReady, ())
347            .await?;
348
349        Ok(recv)
350    }
351
352    async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
353        let shutdown_result = self
354            .resources
355            .shutdown_ic_send
356            .call(
357                ShutdownRpc::Shutdown,
358                hyperv_ic_resources::shutdown::ShutdownParams {
359                    shutdown_type: match kind {
360                        ShutdownKind::Shutdown => {
361                            hyperv_ic_resources::shutdown::ShutdownType::PowerOff
362                        }
363                        ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
364                    },
365                    force: false,
366                },
367            )
368            .await?;
369
370        tracing::info!(?shutdown_result, "Shutdown sent");
371        anyhow::ensure!(
372            shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
373            "Got non-Ok shutdown response"
374        );
375
376        Ok(())
377    }
378
379    async fn wait_for_kvp(
380        &mut self,
381    ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
382        tracing::info!("Waiting for KVP IC");
383        let (send, _) = self
384            .resources
385            .kvp_ic_send
386            .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
387            .await
388            .context("failed to connect to KVP IC")?;
389
390        Ok(send)
391    }
392
393    async fn save_openhcl(
394        &self,
395        new_openhcl: &ResolvedArtifact,
396        flags: OpenHclServicingFlags,
397    ) -> anyhow::Result<()> {
398        let ged_send = self
399            .resources
400            .ged_send
401            .as_ref()
402            .context("openhcl not configured")?;
403
404        let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
405        self.worker
406            .save_openhcl(ged_send, flags, igvm_file.into())
407            .await
408    }
409
410    async fn restore_openhcl(&self) -> anyhow::Result<()> {
411        let ged_send = self
412            .resources
413            .ged_send
414            .as_ref()
415            .context("openhcl not configured")?;
416
417        self.worker.restore_openhcl(ged_send).await
418    }
419
420    async fn modify_vtl2_settings(
421        &mut self,
422        f: impl FnOnce(&mut Vtl2Settings),
423    ) -> anyhow::Result<()> {
424        f(self.resources.vtl2_settings.as_mut().unwrap());
425
426        let ged_send = self
427            .resources
428            .ged_send
429            .as_ref()
430            .context("openhcl not configured")?;
431
432        ged_send
433            .call_failable(
434                get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
435                prost::Message::encode_to_vec(self.resources.vtl2_settings.as_ref().unwrap()),
436            )
437            .await?;
438
439        Ok(())
440    }
441
442    async fn reset(&mut self) -> anyhow::Result<()> {
443        tracing::info!("Resetting VM");
444        self.worker.reset().await?;
445        // On linux direct pipette won't auto start, start it over serial
446        if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
447            agent.reset();
448
449            if self
450                .resources
451                .agent_image
452                .as_ref()
453                .is_some_and(|x| x.contains_pipette())
454            {
455                self.launch_linux_direct_pipette().await?;
456            }
457        }
458        Ok(())
459    }
460
461    async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
462        let listener = if set_high_vtl {
463            self.resources
464                .vtl2_pipette_listener
465                .as_mut()
466                .context("VM is not configured with VTL 2")?
467        } else {
468            &mut self.resources.pipette_listener
469        };
470
471        tracing::info!(set_high_vtl, "listening for pipette connection");
472        let (conn, _) = listener
473            .accept()
474            .await
475            .context("failed to accept pipette connection")?;
476        tracing::info!(set_high_vtl, "handshaking with pipette");
477        let client = PipetteClient::new(
478            &self.resources.driver,
479            PolledSocket::new(&self.resources.driver, conn)?,
480            &self.resources.output_dir,
481        )
482        .await
483        .context("failed to connect to pipette");
484        tracing::info!(set_high_vtl, "completed pipette handshake");
485        client
486    }
487
488    async fn resume(&self) -> anyhow::Result<()> {
489        self.worker.resume().await?;
490        Ok(())
491    }
492
493    async fn verify_save_restore(&self) -> anyhow::Result<()> {
494        for i in 0..2 {
495            let result = self.worker.pulse_save_restore().await;
496            match result {
497                Ok(()) => {}
498                Err(RpcError::Channel(err)) => return Err(err.into()),
499                Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
500                    tracing::warn!("Reset not supported, could not test save + restore.");
501                    break;
502                }
503                Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
504                    return Err(anyhow::Error::from(err))
505                        .context(format!("Save + restore {i} failed."));
506                }
507            }
508        }
509
510        Ok(())
511    }
512
513    async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
514        // Start pipette through serial on linux direct.
515        self.resources
516            .linux_direct_serial_agent
517            .as_mut()
518            .unwrap()
519            .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
520            .await?;
521        Ok(())
522    }
523}
524
525/// Interface for inspecting OpenVMM
526pub struct OpenVmmInspector {
527    worker: Arc<Worker>,
528}
529
530#[async_trait]
531impl PetriVmInspector for OpenVmmInspector {
532    async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
533        Ok(self.worker.inspect_all().await)
534    }
535}
536
537/// Interface to the OpenVMM framebuffer
538pub struct OpenVmmFramebufferAccess {
539    view: View,
540}
541
542#[async_trait]
543impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
544    async fn screenshot(
545        &mut self,
546        image: &mut Vec<u8>,
547    ) -> anyhow::Result<Option<VmScreenshotMeta>> {
548        // Our framebuffer uses 4 bytes per pixel, approximating an
549        // BGRA image, however it only actually contains BGR data.
550        // The fourth byte is effectively noise. We can set the 'alpha'
551        // value to 0xFF to make the image opaque.
552        const BYTES_PER_PIXEL: usize = 4;
553        let (width, height) = self.view.resolution();
554        let (widthsize, heightsize) = (width as usize, height as usize);
555        let len = widthsize * heightsize * BYTES_PER_PIXEL;
556
557        image.resize(len, 0);
558        for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
559            self.view.read_line(i, line);
560            for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
561                pixel.swap(0, 2);
562                pixel[3] = 0xFF;
563            }
564        }
565
566        Ok(Some(VmScreenshotMeta {
567            color: image::ExtendedColorType::Rgba8,
568            width,
569            height,
570        }))
571    }
572}