Skip to main content

petri/vm/openvmm/
runtime.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Methods to interact with a running [`PetriVmOpenVmm`].
5
6use super::PetriVmResourcesOpenVmm;
7use crate::OpenHclServicingFlags;
8use crate::PetriHaltReason;
9use crate::PetriHaltReasonDetail;
10use crate::PetriVmFramebufferAccess;
11use crate::PetriVmInspector;
12use crate::PetriVmRuntime;
13use crate::ShutdownKind;
14use crate::VmScreenshotMeta;
15use crate::openhcl_diag::OpenHclDiagHandler;
16use crate::worker::Worker;
17use anyhow::Context;
18use async_trait::async_trait;
19use framebuffer::View;
20use futures::FutureExt;
21use futures_concurrency::future::Race;
22use get_resources::ged::FirmwareEvent;
23use hyperv_ic_resources::shutdown::ShutdownRpc;
24use mesh::CancelContext;
25use mesh::Receiver;
26use mesh::RecvError;
27use mesh::rpc::RpcError;
28use mesh::rpc::RpcSend;
29use mesh_process::Mesh;
30use openvmm_defs::rpc::PulseSaveRestoreError;
31use pal_async::socket::PolledSocket;
32use petri_artifacts_core::ResolvedArtifact;
33use pipette_client::PipetteClient;
34use std::future::Future;
35use std::path::Path;
36use std::sync::Arc;
37use std::time::Duration;
38use vmm_core_defs::HaltReason;
39use vtl2_settings_proto::Vtl2Settings;
40
41/// A running VM that tests can interact with.
42// DEVNOTE: Really the PetriVmInner is the actual VM and channels that we interact
43// with. This struct exists as a wrapper to provide error handling, such as not
44// hanging indefinitely when waiting on certain channels if the VM crashes.
45pub struct PetriVmOpenVmm {
46    inner: PetriVmInner,
47    halt: PetriVmHaltReceiver,
48}
49
50#[async_trait]
51impl PetriVmRuntime for PetriVmOpenVmm {
52    type VmInspector = OpenVmmInspector;
53    type VmFramebufferAccess = OpenVmmFramebufferAccess;
54
55    async fn teardown(self) -> anyhow::Result<()> {
56        tracing::info!("waiting for worker");
57        let worker = Arc::into_inner(self.inner.worker)
58            .context("all references to the OpenVMM worker have not been closed")?;
59        worker.shutdown().await?;
60
61        tracing::info!("Worker quit, waiting for mesh");
62        self.inner.mesh.shutdown().await;
63
64        tracing::info!("Mesh shutdown, waiting for logging tasks");
65        for t in self.inner.resources.log_stream_tasks {
66            t.await?;
67        }
68
69        Ok(())
70    }
71
72    async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReasonDetail> {
73        let halt_reason = if let Some(already) = self.halt.already_received.take() {
74            already.map_err(anyhow::Error::from)
75        } else {
76            self.halt
77                .halt_notif
78                .recv()
79                .await
80                .context("Failed to get halt reason")
81        }?;
82
83        tracing::info!(?halt_reason, "Got halt reason");
84
85        let reason = match halt_reason {
86            HaltReason::PowerOff => PetriHaltReason::PowerOff,
87            HaltReason::Reset => PetriHaltReason::Reset,
88            HaltReason::Hibernate => PetriHaltReason::Hibernate,
89            HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
90            _ => PetriHaltReason::Other,
91        };
92
93        if allow_reset && reason == PetriHaltReason::Reset {
94            self.reset().await?
95        }
96
97        Ok(PetriHaltReasonDetail {
98            reason,
99            detail: format!("{halt_reason:?}"),
100        })
101    }
102
103    async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
104        Self::wait_for_agent(self, set_high_vtl).await
105    }
106
107    fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
108        self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
109            OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
110                self.inner.resources.driver.clone(),
111                path,
112            ))
113        })
114    }
115
116    async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
117        Self::wait_for_boot_event(self).await
118    }
119
120    async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
121        Self::wait_for_enlightened_shutdown_ready(self)
122            .await
123            .map(|_| ())
124    }
125
126    async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
127        Self::send_enlightened_shutdown(self, kind).await
128    }
129
130    async fn restart_openhcl(
131        &mut self,
132        new_openhcl: &ResolvedArtifact,
133        flags: OpenHclServicingFlags,
134    ) -> anyhow::Result<()> {
135        Self::save_openhcl(self, new_openhcl, flags).await?;
136        Self::restore_openhcl(self).await
137    }
138
139    async fn save_openhcl(
140        &mut self,
141        new_openhcl: &ResolvedArtifact,
142        flags: OpenHclServicingFlags,
143    ) -> anyhow::Result<()> {
144        Self::save_openhcl(self, new_openhcl, flags).await
145    }
146
147    async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
148        Self::restore_openhcl(self).await
149    }
150
151    async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
152        Self::update_command_line(self, command_line).await
153    }
154
155    fn inspector(&self) -> Option<OpenVmmInspector> {
156        Some(OpenVmmInspector {
157            worker: self.inner.worker.clone(),
158        })
159    }
160
161    fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
162        self.inner
163            .framebuffer_view
164            .take()
165            .map(|view| OpenVmmFramebufferAccess { view })
166    }
167
168    async fn reset(&mut self) -> anyhow::Result<()> {
169        Self::reset(self).await
170    }
171
172    async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()> {
173        Self::set_vtl2_settings(self, settings).await
174    }
175
176    async fn set_vmbus_drive(
177        &mut self,
178        _disk: &crate::Drive,
179        _controller_id: &guid::Guid,
180        _controller_location: u32,
181    ) -> anyhow::Result<()> {
182        todo!("openvmm set vmbus drive")
183    }
184
185    async fn add_pcie_device(
186        &mut self,
187        port_name: String,
188        resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>,
189    ) -> anyhow::Result<()> {
190        Self::add_pcie_device(self, port_name, resource).await
191    }
192
193    async fn remove_pcie_device(&mut self, port_name: String) -> anyhow::Result<()> {
194        Self::remove_pcie_device(self, port_name).await
195    }
196}
197
198pub(super) struct PetriVmInner {
199    pub(super) resources: PetriVmResourcesOpenVmm,
200    pub(super) mesh: Mesh,
201    pub(super) worker: Arc<Worker>,
202    pub(super) framebuffer_view: Option<View>,
203    /// Whether CIDATA has already been mounted inside the guest.
204    /// Used to skip re-mounting after save/restore (where guest state is
205    /// preserved) while still mounting after a full reset/reboot.
206    pub(super) cidata_mounted: bool,
207    /// Resolved TCP pipette port for no-vmbus Windows guests. Set once
208    /// during startup and reused across reconnections (e.g. after reset).
209    pub(super) tcp_pipette_port: Option<u16>,
210    pub(super) pid: i32,
211}
212
213struct PetriVmHaltReceiver {
214    halt_notif: Receiver<HaltReason>,
215    already_received: Option<Result<HaltReason, RecvError>>,
216}
217
// Generates a public method that forwards to the `PetriVmInner` method of the
// same name, wrapped in `PetriVmOpenVmm::wait_for_halt_or_internal` so the
// call fails instead of hanging if the VM halts or its worker disappears.
macro_rules! petri_vm_fn {
    (
        $(#[$($meta:tt)*])*
        $visibility:vis async fn $method:ident(&mut self $(, $param:ident: $param_ty:ty)*)
        $(-> $output:ty)?
    ) => {
        $(#[$($meta)*])*
        $visibility async fn $method(&mut self, $($param: $param_ty,)*) $(-> $output)? {
            Self::wait_for_halt_or_internal(&mut self.halt, self.inner.$method($($param,)*)).await
        }
    };
}
228
// TODO: Add all runtime functions that are not backend specific
// to the `PetriVmRuntime` trait
//
// Most methods below are generated by `petri_vm_fn!`: each forwards to the
// `PetriVmInner` method of the same name, wrapped in
// `wait_for_halt_or_internal` so it fails rather than hangs if the VM halts.
impl PetriVmOpenVmm {
    /// Wraps `inner` together with the channel on which VM halts are
    /// reported. No halt has been observed yet, so `already_received` starts
    /// out as `None`.
    pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
        Self {
            inner,
            halt: PetriVmHaltReceiver {
                halt_notif,
                already_received: None,
            },
        }
    }

    /// Get the path to the VTL 2 vsock socket, if the VM is configured with OpenHCL.
    ///
    /// Returns an error if the VM has no VTL 2 vsock path (not configured
    /// with OpenHCL).
    pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
        self.inner
            .resources
            .vtl2_vsock_path
            .as_deref()
            .context("VM is not configured with OpenHCL")
    }

    /// Get the PID of the openvmm child process.
    pub fn pid(&self) -> i32 {
        self.inner.pid
    }

    petri_vm_fn!(
        /// Waits for an event emitted by the firmware about its boot status, and
        /// returns that status.
        pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
    );
    petri_vm_fn!(
        /// Waits for the Hyper-V shutdown IC to be ready, returning a receiver
        /// that will be closed when it is no longer ready. Returns `None` if
        /// the shutdown IC is not configured.
        pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<Option<mesh::OneshotReceiver<()>>>
    );
    petri_vm_fn!(
        /// Instruct the guest to shutdown via the Hyper-V shutdown IC.
        pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Waits for the KVP IC to be ready, returning a sender that can be used
        /// to send requests to it.
        pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
    );
    petri_vm_fn!(
        /// Stages the new OpenHCL file and saves the existing state.
        pub async fn save_openhcl(
            &mut self,
            new_openhcl: &ResolvedArtifact,
            flags: OpenHclServicingFlags
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Restores OpenHCL from a previously saved state.
        pub async fn restore_openhcl(
            &mut self
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Updates the command line parameters of the running VM.
        pub async fn update_command_line(
            &mut self,
            command_line: &str
        ) -> anyhow::Result<()>
    );

    petri_vm_fn!(
        /// Hot-add a PCIe device to a named port at runtime.
        pub async fn add_pcie_device(
            &mut self,
            port_name: String,
            resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Hot-remove a PCIe device from a named port at runtime.
        pub async fn remove_pcie_device(
            &mut self,
            port_name: String
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Resets the hardware state of the VM, simulating a power cycle.
        pub async fn reset(&mut self) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Wait for a connection from a pipette agent
        pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
    );
    petri_vm_fn!(
        /// Set the OpenHCL VTL2 settings.
        pub async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()>
    );

    petri_vm_fn!(
        /// Pause the VM. Call [`resume`](Self::resume) to continue execution.
        pub async fn pause(&mut self) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Save the VM's device and processor state, returning the serialized
        /// bytes. The VM should be paused before calling this.
        pub async fn save_state(&mut self) -> anyhow::Result<Vec<u8>>
    );
    petri_vm_fn!(
        /// Resume a paused VM.
        pub async fn resume(&mut self) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Perform a pulse save/restore cycle: pause the VM, save all state,
        /// reset, restore, and resume. Useful for verifying that device state
        /// survives a save/restore round-trip.
        pub async fn verify_save_restore(&mut self) -> anyhow::Result<()>
    );
    // Crate-internal: restarts pipette over the linux direct serial agent.
    petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);

    /// Wrap the provided future in a race with the worker process's halt
    /// notification channel. This is useful for preventing a future from
    /// waiting indefinitely if the VM dies for any reason. If the worker
    /// process crashes the halt notification channel will return an error, and
    /// if the VM halts for any other reason the future will complete with that
    /// reason.
    ///
    /// # Errors
    ///
    /// Returns the future's own error, or an error describing the halt (or
    /// the closed halt channel) if the future does not finish in time.
    pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
        &mut self,
        future: F,
    ) -> anyhow::Result<T> {
        Self::wait_for_halt_or_internal(&mut self.halt, future).await
    }

    /// Races `future` against the next halt notification on `halt`.
    ///
    /// - If `future` finishes first, its result is returned; an error result
    ///   is returned only after a 5 second delay.
    /// - If the halt channel yields first, `future` gets up to 5 more seconds
    ///   to finish. If it does, the halt result is stashed in
    ///   `halt.already_received` (so `wait_for_halt` can report it later) and
    ///   the future's result is returned. Otherwise an error describing the
    ///   halt reason, or "VM disappeared" if the channel failed, is returned.
    ///
    /// NOTE(review): a halt already stashed in `already_received` is not
    /// consulted here, and on the timeout path the halt result is consumed
    /// without being stashed — confirm both are intended.
    async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
        halt: &mut PetriVmHaltReceiver,
        future: F,
    ) -> anyhow::Result<T> {
        // Pin the future on the stack so it can be polled again after the
        // race if the halt channel wins.
        let future = &mut std::pin::pin!(future);
        // Records which side of the race completed first.
        enum Either<T> {
            Future(anyhow::Result<T>),
            Halt(Result<HaltReason, RecvError>),
        }
        let res = (
            future.map(Either::Future),
            halt.halt_notif.recv().map(Either::Halt),
        )
            .race()
            .await;

        match res {
            Either::Future(Ok(success)) => Ok(success),
            Either::Future(Err(e)) => {
                // Delay before surfacing the error so outstanding work gets a
                // chance to finish (per the log message below).
                tracing::warn!(
                    ?e,
                    "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
                );
                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                c.cancelled().await;
                Err(e)
            }
            Either::Halt(halt_result) => {
                tracing::warn!(
                    halt_result = format_args!("{:x?}", halt_result),
                    "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
                );
                // Give the original future a bounded grace period; it may
                // still complete even though the halt channel fired.
                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                let try_again = c.until_cancelled(future).await;

                match try_again {
                    Ok(fut_result) => {
                        // The future finished after all; keep the halt result
                        // so a later `wait_for_halt` returns it instead of
                        // waiting for a new notification.
                        halt.already_received = Some(halt_result);
                        if let Err(e) = &fut_result {
                            tracing::warn!(
                                ?e,
                                "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
                            );
                            let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                            c.cancelled().await;
                        }
                        fut_result
                    }
                    // Grace period expired: report why the VM went away.
                    Err(_cancel) => match halt_result {
                        Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
                        Err(e) => Err(e).context("VM disappeared"),
                    },
                }
            }
        }
    }
}
417
418impl PetriVmInner {
419    async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
420        self.resources
421            .firmware_event_recv
422            .recv()
423            .await
424            .context("Failed to get firmware boot event")
425    }
426
427    async fn wait_for_enlightened_shutdown_ready(
428        &mut self,
429    ) -> anyhow::Result<Option<mesh::OneshotReceiver<()>>> {
430        let Some(send) = self.resources.shutdown_ic_send.as_ref() else {
431            return Ok(None);
432        };
433        let recv = send
434            .call(ShutdownRpc::WaitReady, ())
435            .await
436            .context("waiting for shutdown IC to be ready")?;
437        Ok(Some(recv))
438    }
439
440    async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
441        let send = self
442            .resources
443            .shutdown_ic_send
444            .as_ref()
445            .context("shutdown IC not configured")?;
446        let shutdown_result = send
447            .call(
448                ShutdownRpc::Shutdown,
449                hyperv_ic_resources::shutdown::ShutdownParams {
450                    shutdown_type: match kind {
451                        ShutdownKind::Shutdown => {
452                            hyperv_ic_resources::shutdown::ShutdownType::PowerOff
453                        }
454                        ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
455                    },
456                    force: false,
457                },
458            )
459            .await?;
460
461        tracing::info!(?shutdown_result, "Shutdown sent");
462        anyhow::ensure!(
463            shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
464            "Got non-Ok shutdown response"
465        );
466
467        Ok(())
468    }
469
470    async fn wait_for_kvp(
471        &mut self,
472    ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
473        tracing::info!("Waiting for KVP IC");
474        let send = self
475            .resources
476            .kvp_ic_send
477            .as_ref()
478            .context("KVP IC not configured")?;
479        let (send, _) = send
480            .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
481            .await
482            .context("failed to connect to KVP IC")?;
483
484        Ok(send)
485    }
486
487    async fn save_openhcl(
488        &self,
489        new_openhcl: &ResolvedArtifact,
490        flags: OpenHclServicingFlags,
491    ) -> anyhow::Result<()> {
492        let ged_send = self
493            .resources
494            .ged_send
495            .as_ref()
496            .context("openhcl not configured")?;
497
498        let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
499        self.worker
500            .save_openhcl(ged_send, flags, igvm_file.into())
501            .await
502    }
503
504    async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
505        self.worker.update_command_line(command_line).await
506    }
507
508    async fn add_pcie_device(
509        &mut self,
510        port_name: String,
511        resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>,
512    ) -> anyhow::Result<()> {
513        self.worker.add_pcie_device(port_name, resource).await
514    }
515
516    async fn remove_pcie_device(&mut self, port_name: String) -> anyhow::Result<()> {
517        self.worker.remove_pcie_device(port_name).await
518    }
519
520    async fn restore_openhcl(&self) -> anyhow::Result<()> {
521        let ged_send = self
522            .resources
523            .ged_send
524            .as_ref()
525            .context("openhcl not configured")?;
526
527        self.worker.restore_openhcl(ged_send).await
528    }
529
530    async fn set_vtl2_settings(&self, settings: &Vtl2Settings) -> anyhow::Result<()> {
531        let ged_send = self
532            .resources
533            .ged_send
534            .as_ref()
535            .context("openhcl not configured")?;
536
537        ged_send
538            .call_failable(
539                get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
540                prost::Message::encode_to_vec(settings),
541            )
542            .await?;
543
544        Ok(())
545    }
546
547    async fn reset(&mut self) -> anyhow::Result<()> {
548        tracing::info!("Resetting VM");
549        self.worker.reset().await?;
550        // Guest state is lost on reset, so CIDATA needs to be remounted.
551        self.cidata_mounted = false;
552        // On linux direct, pipette won't auto-start unless it is the init
553        // process. When it isn't, restart it over serial. (When pipette runs
554        // as PID 1 via rdinit=/pipette, linux_direct_serial_agent is None, so
555        // this block is skipped and pipette restarts automatically on reboot.)
556        if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
557            agent.reset();
558
559            if self.resources.properties.using_vtl0_pipette {
560                self.launch_linux_direct_pipette().await?;
561            }
562        }
563        Ok(())
564    }
565
566    async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
567        // Use TCP transport if configured (Windows no-vmbus guests).
568        if let Some(port) = self.tcp_pipette_port {
569            assert!(!set_high_vtl, "TCP pipette transport does not support VTL2");
570            return self.wait_for_agent_tcp(port).await;
571        }
572
573        let listener = if set_high_vtl {
574            self.resources
575                .vtl2_pipette_listener
576                .as_mut()
577                .context("VM is not configured with VTL 2")?
578        } else {
579            &mut self.resources.pipette_listener
580        };
581
582        tracing::info!(set_high_vtl, "listening for pipette connection");
583        let client = loop {
584            let (conn, _) = listener
585                .accept()
586                .await
587                .context("failed to accept pipette connection")?;
588            tracing::info!(set_high_vtl, "handshaking with pipette");
589            let socket = PolledSocket::new(&self.resources.driver, conn)?;
590            match PipetteClient::new(&self.resources.driver, socket, &self.resources.output_dir)
591                .await
592            {
593                Ok(client) => break client,
594                Err(e) => {
595                    // During save/restore cycles, stale connections from
596                    // previous hvsock relay sessions can accumulate in the
597                    // listener backlog. These are already-closed sockets
598                    // that fail during the mesh handshake. Drain them and
599                    // retry until we get a live connection.
600                    tracing::warn!(
601                        error = e.as_ref() as &dyn std::error::Error,
602                        "pipette connection not live, retrying"
603                    );
604                }
605            }
606        };
607        tracing::info!(set_high_vtl, "completed pipette handshake");
608
609        // When pipette runs as PID 1 init and a CIDATA agent disk is
610        // attached, mount it so test files are available at /cidata.
611        // Skip if already mounted (e.g. reconnecting after save/restore
612        // where guest state is preserved).
613        if !set_high_vtl
614            && self.resources.properties.uses_pipette_as_init
615            && self.resources.properties.has_agent_disk
616            && !self.cidata_mounted
617        {
618            tracing::info!("mounting CIDATA agent disk via pipette");
619            client
620                .unix_shell()
621                .cmd("mkdir")
622                .arg("-p")
623                .arg("/cidata")
624                .run()
625                .await
626                .context("failed to create /cidata mount point")?;
627            client
628                .unix_shell()
629                .cmd("mount")
630                .arg("LABEL=cidata")
631                .arg("/cidata")
632                .run()
633                .await
634                .context("failed to mount CIDATA disk")?;
635            self.cidata_mounted = true;
636        }
637
638        Ok(client)
639    }
640
641    /// Connect to pipette via TCP through consomme port forwarding.
642    ///
643    /// The guest pipette agent listens on `0.0.0.0:{port}` and consomme
644    /// forwards connections from `localhost:{port}` on the host into the
645    /// guest. We retry until the guest's network stack and pipette are up.
646    async fn wait_for_agent_tcp(&mut self, port: u16) -> anyhow::Result<PipetteClient> {
647        tracing::info!(port, "connecting to pipette via TCP");
648        let addr = std::net::SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, port));
649        let client = loop {
650            match PolledSocket::connect_tcp(&self.resources.driver, addr).await {
651                Ok(socket) => {
652                    socket
653                        .get()
654                        .set_nodelay(true)
655                        .context("failed to set TCP_NODELAY")?;
656                    tracing::info!("TCP connected, handshaking with pipette");
657                    match PipetteClient::new(
658                        &self.resources.driver,
659                        socket,
660                        &self.resources.output_dir,
661                    )
662                    .await
663                    {
664                        Ok(client) => break client,
665                        Err(e) => {
666                            tracing::warn!(
667                                error = e.as_ref() as &dyn std::error::Error,
668                                "pipette TCP connection failed, retrying"
669                            );
670                        }
671                    }
672                }
673                Err(e) => {
674                    tracing::debug!(
675                        error = &e as &dyn std::error::Error,
676                        "TCP connect failed, guest not ready yet"
677                    );
678                }
679            }
680            // Wait before retrying — guest network stack may not be up yet.
681            pal_async::timer::PolledTimer::new(&self.resources.driver)
682                .sleep(Duration::from_secs(1))
683                .await;
684        };
685        tracing::info!("completed pipette TCP handshake");
686        Ok(client)
687    }
688
689    async fn pause(&self) -> anyhow::Result<()> {
690        self.worker.pause().await?;
691        Ok(())
692    }
693
694    async fn save_state(&self) -> anyhow::Result<Vec<u8>> {
695        let state_msg = self.worker.save().await?;
696        Ok(mesh::payload::encode(state_msg))
697    }
698
699    async fn resume(&self) -> anyhow::Result<()> {
700        self.worker.resume().await?;
701        Ok(())
702    }
703
704    async fn verify_save_restore(&self) -> anyhow::Result<()> {
705        for i in 0..2 {
706            let result = self.worker.pulse_save_restore().await;
707            match result {
708                Ok(()) => {}
709                Err(RpcError::Channel(err)) => return Err(err.into()),
710                Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
711                    tracing::warn!("Reset not supported, could not test save + restore.");
712                    break;
713                }
714                Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
715                    return Err(anyhow::Error::from(err))
716                        .context(format!("Save + restore {i} failed."));
717                }
718            }
719        }
720
721        Ok(())
722    }
723
724    async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
725        // Start pipette through serial on linux direct.
726        self.resources
727            .linux_direct_serial_agent
728            .as_mut()
729            .unwrap()
730            .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
731            .await?;
732        Ok(())
733    }
734}
735
736/// Interface for inspecting OpenVMM
737pub struct OpenVmmInspector {
738    worker: Arc<Worker>,
739}
740
741#[async_trait]
742impl PetriVmInspector for OpenVmmInspector {
743    async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
744        Ok(self.worker.inspect_all().await)
745    }
746}
747
748/// Interface to the OpenVMM framebuffer
749pub struct OpenVmmFramebufferAccess {
750    view: View,
751}
752
753#[async_trait]
754impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
755    async fn screenshot(
756        &mut self,
757        image: &mut Vec<u8>,
758    ) -> anyhow::Result<Option<VmScreenshotMeta>> {
759        // Our framebuffer uses 4 bytes per pixel, approximating an
760        // BGRA image, however it only actually contains BGR data.
761        // The fourth byte is effectively noise. We can set the 'alpha'
762        // value to 0xFF to make the image opaque.
763        const BYTES_PER_PIXEL: usize = 4;
764        let (width, height) = self.view.resolution();
765        let (widthsize, heightsize) = (width as usize, height as usize);
766        let len = widthsize * heightsize * BYTES_PER_PIXEL;
767
768        image.resize(len, 0);
769        for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
770            self.view.read_line(i, line);
771            for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
772                pixel.swap(0, 2);
773                pixel[3] = 0xFF;
774            }
775        }
776
777        Ok(Some(VmScreenshotMeta {
778            color: image::ExtendedColorType::Rgba8,
779            width,
780            height,
781        }))
782    }
783}