1use super::PetriVmResourcesOpenVmm;
7use crate::OpenHclServicingFlags;
8use crate::PetriHaltReason;
9use crate::PetriHaltReasonDetail;
10use crate::PetriVmFramebufferAccess;
11use crate::PetriVmInspector;
12use crate::PetriVmRuntime;
13use crate::ShutdownKind;
14use crate::VmScreenshotMeta;
15use crate::openhcl_diag::OpenHclDiagHandler;
16use crate::worker::Worker;
17use anyhow::Context;
18use async_trait::async_trait;
19use framebuffer::View;
20use futures::FutureExt;
21use futures_concurrency::future::Race;
22use get_resources::ged::FirmwareEvent;
23use hyperv_ic_resources::shutdown::ShutdownRpc;
24use mesh::CancelContext;
25use mesh::Receiver;
26use mesh::RecvError;
27use mesh::rpc::RpcError;
28use mesh::rpc::RpcSend;
29use mesh_process::Mesh;
30use openvmm_defs::rpc::PulseSaveRestoreError;
31use pal_async::socket::PolledSocket;
32use petri_artifacts_core::ResolvedArtifact;
33use pipette_client::PipetteClient;
34use std::future::Future;
35use std::path::Path;
36use std::sync::Arc;
37use std::time::Duration;
38use vmm_core_defs::HaltReason;
39use vtl2_settings_proto::Vtl2Settings;
40
41pub struct PetriVmOpenVmm {
46 inner: PetriVmInner,
47 halt: PetriVmHaltReceiver,
48}
49
50#[async_trait]
51impl PetriVmRuntime for PetriVmOpenVmm {
52 type VmInspector = OpenVmmInspector;
53 type VmFramebufferAccess = OpenVmmFramebufferAccess;
54
55 async fn teardown(self) -> anyhow::Result<()> {
56 tracing::info!("waiting for worker");
57 let worker = Arc::into_inner(self.inner.worker)
58 .context("all references to the OpenVMM worker have not been closed")?;
59 worker.shutdown().await?;
60
61 tracing::info!("Worker quit, waiting for mesh");
62 self.inner.mesh.shutdown().await;
63
64 tracing::info!("Mesh shutdown, waiting for logging tasks");
65 for t in self.inner.resources.log_stream_tasks {
66 t.await?;
67 }
68
69 Ok(())
70 }
71
72 async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReasonDetail> {
73 let halt_reason = if let Some(already) = self.halt.already_received.take() {
74 already.map_err(anyhow::Error::from)
75 } else {
76 self.halt
77 .halt_notif
78 .recv()
79 .await
80 .context("Failed to get halt reason")
81 }?;
82
83 tracing::info!(?halt_reason, "Got halt reason");
84
85 let reason = match halt_reason {
86 HaltReason::PowerOff => PetriHaltReason::PowerOff,
87 HaltReason::Reset => PetriHaltReason::Reset,
88 HaltReason::Hibernate => PetriHaltReason::Hibernate,
89 HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
90 _ => PetriHaltReason::Other,
91 };
92
93 if allow_reset && reason == PetriHaltReason::Reset {
94 self.reset().await?
95 }
96
97 Ok(PetriHaltReasonDetail {
98 reason,
99 detail: format!("{halt_reason:?}"),
100 })
101 }
102
103 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
104 Self::wait_for_agent(self, set_high_vtl).await
105 }
106
107 fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
108 self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
109 OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
110 self.inner.resources.driver.clone(),
111 path,
112 ))
113 })
114 }
115
116 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
117 Self::wait_for_boot_event(self).await
118 }
119
120 async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
121 Self::wait_for_enlightened_shutdown_ready(self)
122 .await
123 .map(|_| ())
124 }
125
126 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
127 Self::send_enlightened_shutdown(self, kind).await
128 }
129
130 async fn restart_openhcl(
131 &mut self,
132 new_openhcl: &ResolvedArtifact,
133 flags: OpenHclServicingFlags,
134 ) -> anyhow::Result<()> {
135 Self::save_openhcl(self, new_openhcl, flags).await?;
136 Self::restore_openhcl(self).await
137 }
138
139 async fn save_openhcl(
140 &mut self,
141 new_openhcl: &ResolvedArtifact,
142 flags: OpenHclServicingFlags,
143 ) -> anyhow::Result<()> {
144 Self::save_openhcl(self, new_openhcl, flags).await
145 }
146
147 async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
148 Self::restore_openhcl(self).await
149 }
150
151 async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
152 Self::update_command_line(self, command_line).await
153 }
154
155 fn inspector(&self) -> Option<OpenVmmInspector> {
156 Some(OpenVmmInspector {
157 worker: self.inner.worker.clone(),
158 })
159 }
160
161 fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
162 self.inner
163 .framebuffer_view
164 .take()
165 .map(|view| OpenVmmFramebufferAccess { view })
166 }
167
168 async fn reset(&mut self) -> anyhow::Result<()> {
169 Self::reset(self).await
170 }
171
172 async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()> {
173 Self::set_vtl2_settings(self, settings).await
174 }
175
176 async fn set_vmbus_drive(
177 &mut self,
178 _disk: &crate::Drive,
179 _controller_id: &guid::Guid,
180 _controller_location: u32,
181 ) -> anyhow::Result<()> {
182 todo!("openvmm set vmbus drive")
183 }
184
185 async fn add_pcie_device(
186 &mut self,
187 port_name: String,
188 resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>,
189 ) -> anyhow::Result<()> {
190 Self::add_pcie_device(self, port_name, resource).await
191 }
192
193 async fn remove_pcie_device(&mut self, port_name: String) -> anyhow::Result<()> {
194 Self::remove_pcie_device(self, port_name).await
195 }
196}
197
198pub(super) struct PetriVmInner {
199 pub(super) resources: PetriVmResourcesOpenVmm,
200 pub(super) mesh: Mesh,
201 pub(super) worker: Arc<Worker>,
202 pub(super) framebuffer_view: Option<View>,
203 pub(super) cidata_mounted: bool,
207 pub(super) pid: i32,
208}
209
210struct PetriVmHaltReceiver {
211 halt_notif: Receiver<HaltReason>,
212 already_received: Option<Result<HaltReason, RecvError>>,
213}
214
/// Generates a `PetriVmOpenVmm` method that forwards to the `PetriVmInner`
/// method of the same name, raced against the halt channel through
/// `Self::wait_for_halt_or_internal`.
///
/// Attributes (including doc comments) written before the signature are
/// copied onto the generated method. The parameter list must not end with a
/// trailing comma.
macro_rules! petri_vm_fn {
    (
        $(#[$($meta:tt)*])*
        $vis:vis async fn $name:ident (&mut self $(, $param:ident : $param_ty:ty)*)
        $(-> $output:ty)?
    ) => {
        $(#[$($meta)*])*
        $vis async fn $name(&mut self, $($param: $param_ty,)*) $(-> $output)? {
            Self::wait_for_halt_or_internal(
                &mut self.halt,
                self.inner.$name($($param,)*),
            )
            .await
        }
    };
}
225
226impl PetriVmOpenVmm {
229 pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
230 Self {
231 inner,
232 halt: PetriVmHaltReceiver {
233 halt_notif,
234 already_received: None,
235 },
236 }
237 }
238
239 pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
241 self.inner
242 .resources
243 .vtl2_vsock_path
244 .as_deref()
245 .context("VM is not configured with OpenHCL")
246 }
247
248 pub fn pid(&self) -> i32 {
250 self.inner.pid
251 }
252
253 petri_vm_fn!(
254 pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
257 );
258 petri_vm_fn!(
259 pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<mesh::OneshotReceiver<()>>
262 );
263 petri_vm_fn!(
264 pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
266 );
267 petri_vm_fn!(
268 pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
271 );
272 petri_vm_fn!(
273 pub async fn save_openhcl(
275 &mut self,
276 new_openhcl: &ResolvedArtifact,
277 flags: OpenHclServicingFlags
278 ) -> anyhow::Result<()>
279 );
280 petri_vm_fn!(
281 pub async fn restore_openhcl(
283 &mut self
284 ) -> anyhow::Result<()>
285 );
286 petri_vm_fn!(
287 pub async fn update_command_line(
289 &mut self,
290 command_line: &str
291 ) -> anyhow::Result<()>
292 );
293
294 petri_vm_fn!(
295 pub async fn add_pcie_device(
297 &mut self,
298 port_name: String,
299 resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>
300 ) -> anyhow::Result<()>
301 );
302 petri_vm_fn!(
303 pub async fn remove_pcie_device(
305 &mut self,
306 port_name: String
307 ) -> anyhow::Result<()>
308 );
309 petri_vm_fn!(
310 pub async fn reset(&mut self) -> anyhow::Result<()>
312 );
313 petri_vm_fn!(
314 pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
316 );
317 petri_vm_fn!(
318 pub async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()>
320 );
321
322 petri_vm_fn!(
323 pub async fn pause(&mut self) -> anyhow::Result<()>
325 );
326 petri_vm_fn!(
327 pub async fn save_state(&mut self) -> anyhow::Result<Vec<u8>>
330 );
331 petri_vm_fn!(
332 pub async fn resume(&mut self) -> anyhow::Result<()>
334 );
335 petri_vm_fn!(
336 pub async fn verify_save_restore(&mut self) -> anyhow::Result<()>
340 );
341 petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);
342
343 pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
350 &mut self,
351 future: F,
352 ) -> anyhow::Result<T> {
353 Self::wait_for_halt_or_internal(&mut self.halt, future).await
354 }
355
356 async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
357 halt: &mut PetriVmHaltReceiver,
358 future: F,
359 ) -> anyhow::Result<T> {
360 let future = &mut std::pin::pin!(future);
361 enum Either<T> {
362 Future(anyhow::Result<T>),
363 Halt(Result<HaltReason, RecvError>),
364 }
365 let res = (
366 future.map(Either::Future),
367 halt.halt_notif.recv().map(Either::Halt),
368 )
369 .race()
370 .await;
371
372 match res {
373 Either::Future(Ok(success)) => Ok(success),
374 Either::Future(Err(e)) => {
375 tracing::warn!(
376 ?e,
377 "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
378 );
379 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
380 c.cancelled().await;
381 Err(e)
382 }
383 Either::Halt(halt_result) => {
384 tracing::warn!(
385 halt_result = format_args!("{:x?}", halt_result),
386 "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
387 );
388 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
389 let try_again = c.until_cancelled(future).await;
390
391 match try_again {
392 Ok(fut_result) => {
393 halt.already_received = Some(halt_result);
394 if let Err(e) = &fut_result {
395 tracing::warn!(
396 ?e,
397 "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
398 );
399 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
400 c.cancelled().await;
401 }
402 fut_result
403 }
404 Err(_cancel) => match halt_result {
405 Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
406 Err(e) => Err(e).context("VM disappeared"),
407 },
408 }
409 }
410 }
411 }
412}
413
414impl PetriVmInner {
415 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
416 self.resources
417 .firmware_event_recv
418 .recv()
419 .await
420 .context("Failed to get firmware boot event")
421 }
422
423 async fn wait_for_enlightened_shutdown_ready(
424 &mut self,
425 ) -> anyhow::Result<mesh::OneshotReceiver<()>> {
426 let recv = self
427 .resources
428 .shutdown_ic_send
429 .call(ShutdownRpc::WaitReady, ())
430 .await?;
431
432 Ok(recv)
433 }
434
435 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
436 let shutdown_result = self
437 .resources
438 .shutdown_ic_send
439 .call(
440 ShutdownRpc::Shutdown,
441 hyperv_ic_resources::shutdown::ShutdownParams {
442 shutdown_type: match kind {
443 ShutdownKind::Shutdown => {
444 hyperv_ic_resources::shutdown::ShutdownType::PowerOff
445 }
446 ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
447 },
448 force: false,
449 },
450 )
451 .await?;
452
453 tracing::info!(?shutdown_result, "Shutdown sent");
454 anyhow::ensure!(
455 shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
456 "Got non-Ok shutdown response"
457 );
458
459 Ok(())
460 }
461
462 async fn wait_for_kvp(
463 &mut self,
464 ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
465 tracing::info!("Waiting for KVP IC");
466 let (send, _) = self
467 .resources
468 .kvp_ic_send
469 .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
470 .await
471 .context("failed to connect to KVP IC")?;
472
473 Ok(send)
474 }
475
476 async fn save_openhcl(
477 &self,
478 new_openhcl: &ResolvedArtifact,
479 flags: OpenHclServicingFlags,
480 ) -> anyhow::Result<()> {
481 let ged_send = self
482 .resources
483 .ged_send
484 .as_ref()
485 .context("openhcl not configured")?;
486
487 let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
488 self.worker
489 .save_openhcl(ged_send, flags, igvm_file.into())
490 .await
491 }
492
493 async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
494 self.worker.update_command_line(command_line).await
495 }
496
497 async fn add_pcie_device(
498 &mut self,
499 port_name: String,
500 resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>,
501 ) -> anyhow::Result<()> {
502 self.worker.add_pcie_device(port_name, resource).await
503 }
504
505 async fn remove_pcie_device(&mut self, port_name: String) -> anyhow::Result<()> {
506 self.worker.remove_pcie_device(port_name).await
507 }
508
509 async fn restore_openhcl(&self) -> anyhow::Result<()> {
510 let ged_send = self
511 .resources
512 .ged_send
513 .as_ref()
514 .context("openhcl not configured")?;
515
516 self.worker.restore_openhcl(ged_send).await
517 }
518
519 async fn set_vtl2_settings(&self, settings: &Vtl2Settings) -> anyhow::Result<()> {
520 let ged_send = self
521 .resources
522 .ged_send
523 .as_ref()
524 .context("openhcl not configured")?;
525
526 ged_send
527 .call_failable(
528 get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
529 prost::Message::encode_to_vec(settings),
530 )
531 .await?;
532
533 Ok(())
534 }
535
536 async fn reset(&mut self) -> anyhow::Result<()> {
537 tracing::info!("Resetting VM");
538 self.worker.reset().await?;
539 self.cidata_mounted = false;
541 if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
546 agent.reset();
547
548 if self.resources.properties.using_vtl0_pipette {
549 self.launch_linux_direct_pipette().await?;
550 }
551 }
552 Ok(())
553 }
554
555 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
556 let listener = if set_high_vtl {
557 self.resources
558 .vtl2_pipette_listener
559 .as_mut()
560 .context("VM is not configured with VTL 2")?
561 } else {
562 &mut self.resources.pipette_listener
563 };
564
565 tracing::info!(set_high_vtl, "listening for pipette connection");
566 let client = loop {
567 let (conn, _) = listener
568 .accept()
569 .await
570 .context("failed to accept pipette connection")?;
571 tracing::info!(set_high_vtl, "handshaking with pipette");
572 let socket = PolledSocket::new(&self.resources.driver, conn)?;
573 match PipetteClient::new(&self.resources.driver, socket, &self.resources.output_dir)
574 .await
575 {
576 Ok(client) => break client,
577 Err(e) => {
578 tracing::warn!(
584 error = &e as &dyn std::error::Error,
585 "pipette handshake failed, retrying"
586 );
587 }
588 }
589 };
590 tracing::info!(set_high_vtl, "completed pipette handshake");
591
592 if !set_high_vtl
597 && self.resources.properties.uses_pipette_as_init
598 && self.resources.properties.has_agent_disk
599 && !self.cidata_mounted
600 {
601 tracing::info!("mounting CIDATA agent disk via pipette");
602 client
603 .unix_shell()
604 .cmd("mkdir")
605 .arg("-p")
606 .arg("/cidata")
607 .run()
608 .await
609 .context("failed to create /cidata mount point")?;
610 client
611 .unix_shell()
612 .cmd("mount")
613 .arg("LABEL=cidata")
614 .arg("/cidata")
615 .run()
616 .await
617 .context("failed to mount CIDATA disk")?;
618 self.cidata_mounted = true;
619 }
620
621 Ok(client)
622 }
623
624 async fn pause(&self) -> anyhow::Result<()> {
625 self.worker.pause().await?;
626 Ok(())
627 }
628
629 async fn save_state(&self) -> anyhow::Result<Vec<u8>> {
630 let state_msg = self.worker.save().await?;
631 Ok(mesh::payload::encode(state_msg))
632 }
633
634 async fn resume(&self) -> anyhow::Result<()> {
635 self.worker.resume().await?;
636 Ok(())
637 }
638
639 async fn verify_save_restore(&self) -> anyhow::Result<()> {
640 for i in 0..2 {
641 let result = self.worker.pulse_save_restore().await;
642 match result {
643 Ok(()) => {}
644 Err(RpcError::Channel(err)) => return Err(err.into()),
645 Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
646 tracing::warn!("Reset not supported, could not test save + restore.");
647 break;
648 }
649 Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
650 return Err(anyhow::Error::from(err))
651 .context(format!("Save + restore {i} failed."));
652 }
653 }
654 }
655
656 Ok(())
657 }
658
659 async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
660 self.resources
662 .linux_direct_serial_agent
663 .as_mut()
664 .unwrap()
665 .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
666 .await?;
667 Ok(())
668 }
669}
670
671pub struct OpenVmmInspector {
673 worker: Arc<Worker>,
674}
675
676#[async_trait]
677impl PetriVmInspector for OpenVmmInspector {
678 async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
679 Ok(self.worker.inspect_all().await)
680 }
681}
682
683pub struct OpenVmmFramebufferAccess {
685 view: View,
686}
687
688#[async_trait]
689impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
690 async fn screenshot(
691 &mut self,
692 image: &mut Vec<u8>,
693 ) -> anyhow::Result<Option<VmScreenshotMeta>> {
694 const BYTES_PER_PIXEL: usize = 4;
699 let (width, height) = self.view.resolution();
700 let (widthsize, heightsize) = (width as usize, height as usize);
701 let len = widthsize * heightsize * BYTES_PER_PIXEL;
702
703 image.resize(len, 0);
704 for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
705 self.view.read_line(i, line);
706 for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
707 pixel.swap(0, 2);
708 pixel[3] = 0xFF;
709 }
710 }
711
712 Ok(Some(VmScreenshotMeta {
713 color: image::ExtendedColorType::Rgba8,
714 width,
715 height,
716 }))
717 }
718}