1use super::PetriVmResourcesOpenVmm;
7use crate::OpenHclServicingFlags;
8use crate::PetriHaltReason;
9use crate::PetriHaltReasonDetail;
10use crate::PetriVmFramebufferAccess;
11use crate::PetriVmInspector;
12use crate::PetriVmRuntime;
13use crate::ShutdownKind;
14use crate::VmScreenshotMeta;
15use crate::openhcl_diag::OpenHclDiagHandler;
16use crate::worker::Worker;
17use anyhow::Context;
18use async_trait::async_trait;
19use framebuffer::View;
20use futures::FutureExt;
21use futures_concurrency::future::Race;
22use get_resources::ged::FirmwareEvent;
23use hyperv_ic_resources::shutdown::ShutdownRpc;
24use mesh::CancelContext;
25use mesh::Receiver;
26use mesh::RecvError;
27use mesh::rpc::RpcError;
28use mesh::rpc::RpcSend;
29use mesh_process::Mesh;
30use openvmm_defs::rpc::PulseSaveRestoreError;
31use pal_async::socket::PolledSocket;
32use petri_artifacts_core::ResolvedArtifact;
33use pipette_client::PipetteClient;
34use std::future::Future;
35use std::path::Path;
36use std::sync::Arc;
37use std::time::Duration;
38use vmm_core_defs::HaltReason;
39use vtl2_settings_proto::Vtl2Settings;
40
/// A running OpenVMM-backed petri VM.
///
/// Most operations are raced against halt notifications from the VM, so a VM
/// that halts mid-operation surfaces as an error (after a short grace period)
/// rather than leaving the caller waiting forever.
pub struct PetriVmOpenVmm {
    /// State and handles used to drive the VM.
    inner: PetriVmInner,
    /// Halt notification channel, plus any halt that was observed while another
    /// operation was in flight and has not been consumed yet.
    halt: PetriVmHaltReceiver,
}
49
50#[async_trait]
51impl PetriVmRuntime for PetriVmOpenVmm {
52 type VmInspector = OpenVmmInspector;
53 type VmFramebufferAccess = OpenVmmFramebufferAccess;
54
55 async fn teardown(self) -> anyhow::Result<()> {
56 tracing::info!("waiting for worker");
57 let worker = Arc::into_inner(self.inner.worker)
58 .context("all references to the OpenVMM worker have not been closed")?;
59 worker.shutdown().await?;
60
61 tracing::info!("Worker quit, waiting for mesh");
62 self.inner.mesh.shutdown().await;
63
64 tracing::info!("Mesh shutdown, waiting for logging tasks");
65 for t in self.inner.resources.log_stream_tasks {
66 t.await?;
67 }
68
69 Ok(())
70 }
71
72 async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReasonDetail> {
73 let halt_reason = if let Some(already) = self.halt.already_received.take() {
74 already.map_err(anyhow::Error::from)
75 } else {
76 self.halt
77 .halt_notif
78 .recv()
79 .await
80 .context("Failed to get halt reason")
81 }?;
82
83 tracing::info!(?halt_reason, "Got halt reason");
84
85 let reason = match halt_reason {
86 HaltReason::PowerOff => PetriHaltReason::PowerOff,
87 HaltReason::Reset => PetriHaltReason::Reset,
88 HaltReason::Hibernate => PetriHaltReason::Hibernate,
89 HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
90 _ => PetriHaltReason::Other,
91 };
92
93 if allow_reset && reason == PetriHaltReason::Reset {
94 self.reset().await?
95 }
96
97 Ok(PetriHaltReasonDetail {
98 reason,
99 detail: format!("{halt_reason:?}"),
100 })
101 }
102
103 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
104 Self::wait_for_agent(self, set_high_vtl).await
105 }
106
107 fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
108 self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
109 OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
110 self.inner.resources.driver.clone(),
111 path,
112 ))
113 })
114 }
115
116 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
117 Self::wait_for_boot_event(self).await
118 }
119
120 async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
121 Self::wait_for_enlightened_shutdown_ready(self)
122 .await
123 .map(|_| ())
124 }
125
126 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
127 Self::send_enlightened_shutdown(self, kind).await
128 }
129
130 async fn restart_openhcl(
131 &mut self,
132 new_openhcl: &ResolvedArtifact,
133 flags: OpenHclServicingFlags,
134 ) -> anyhow::Result<()> {
135 Self::save_openhcl(self, new_openhcl, flags).await?;
136 Self::restore_openhcl(self).await
137 }
138
139 async fn save_openhcl(
140 &mut self,
141 new_openhcl: &ResolvedArtifact,
142 flags: OpenHclServicingFlags,
143 ) -> anyhow::Result<()> {
144 Self::save_openhcl(self, new_openhcl, flags).await
145 }
146
147 async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
148 Self::restore_openhcl(self).await
149 }
150
151 async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
152 Self::update_command_line(self, command_line).await
153 }
154
155 fn inspector(&self) -> Option<OpenVmmInspector> {
156 Some(OpenVmmInspector {
157 worker: self.inner.worker.clone(),
158 })
159 }
160
161 fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
162 self.inner
163 .framebuffer_view
164 .take()
165 .map(|view| OpenVmmFramebufferAccess { view })
166 }
167
168 async fn reset(&mut self) -> anyhow::Result<()> {
169 Self::reset(self).await
170 }
171
172 async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()> {
173 Self::set_vtl2_settings(self, settings).await
174 }
175
176 async fn set_vmbus_drive(
177 &mut self,
178 _disk: &crate::Drive,
179 _controller_id: &guid::Guid,
180 _controller_location: u32,
181 ) -> anyhow::Result<()> {
182 todo!("openvmm set vmbus drive")
183 }
184
185 async fn add_pcie_device(
186 &mut self,
187 port_name: String,
188 resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>,
189 ) -> anyhow::Result<()> {
190 Self::add_pcie_device(self, port_name, resource).await
191 }
192
193 async fn remove_pcie_device(&mut self, port_name: String) -> anyhow::Result<()> {
194 Self::remove_pcie_device(self, port_name).await
195 }
196}
197
/// State and handles for a running OpenVMM VM, driven by `PetriVmOpenVmm`.
pub(super) struct PetriVmInner {
    /// Resources created alongside the VM (event channels, pipette listeners,
    /// IC senders, log streaming tasks, ...).
    pub(super) resources: PetriVmResourcesOpenVmm,
    /// Mesh hosting the worker; shut down during teardown after the worker.
    pub(super) mesh: Mesh,
    /// Handle to the VM worker. Cloned into inspectors; teardown requires this
    /// to be the last remaining reference.
    pub(super) worker: Arc<Worker>,
    /// Framebuffer view, handed out at most once via `take_framebuffer_access`.
    pub(super) framebuffer_view: Option<View>,
    /// Whether the CIDATA agent disk has been mounted through pipette since the
    /// last reset.
    pub(super) cidata_mounted: bool,
    /// When set, pipette is reached over TCP on this localhost port instead of
    /// through the pipette listener.
    pub(super) tcp_pipette_port: Option<u16>,
    /// NOTE(review): presumably the OpenVMM process id — confirm where it is set.
    pub(super) pid: i32,
}
212
/// Halt notification state for a `PetriVmOpenVmm`.
struct PetriVmHaltReceiver {
    /// Channel on which the VM reports why it halted.
    halt_notif: Receiver<HaltReason>,
    /// A halt result received while racing another operation, kept so that the
    /// next `wait_for_halt` returns it instead of waiting on `halt_notif`.
    already_received: Option<Result<HaltReason, RecvError>>,
}
217
/// Defines a method on `PetriVmOpenVmm` that forwards to the identically named
/// method on `self.inner`, racing it against VM halt notifications through
/// `wait_for_halt_or_internal`.
///
/// Attributes on the invocation (including `///` doc comments) are copied onto
/// the generated method. Only a `&mut self` receiver followed by `name: Type`
/// parameters is accepted.
macro_rules! petri_vm_fn {
    (
        $(#[$($meta:tt)*])*
        $visibility:vis async fn $method:ident (&mut self $(, $param:ident : $param_ty:ty)*) $(-> $output:ty)?
    ) => {
        $(#[$($meta)*])*
        $visibility async fn $method(&mut self, $($param: $param_ty,)*) $(-> $output)? {
            let forwarded = self.inner.$method($($param,)*);
            Self::wait_for_halt_or_internal(&mut self.halt, forwarded).await
        }
    };
}
228
229impl PetriVmOpenVmm {
232 pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
233 Self {
234 inner,
235 halt: PetriVmHaltReceiver {
236 halt_notif,
237 already_received: None,
238 },
239 }
240 }
241
242 pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
244 self.inner
245 .resources
246 .vtl2_vsock_path
247 .as_deref()
248 .context("VM is not configured with OpenHCL")
249 }
250
251 pub fn pid(&self) -> i32 {
253 self.inner.pid
254 }
255
256 petri_vm_fn!(
257 pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
260 );
261 petri_vm_fn!(
262 pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<Option<mesh::OneshotReceiver<()>>>
266 );
267 petri_vm_fn!(
268 pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
270 );
271 petri_vm_fn!(
272 pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
275 );
276 petri_vm_fn!(
277 pub async fn save_openhcl(
279 &mut self,
280 new_openhcl: &ResolvedArtifact,
281 flags: OpenHclServicingFlags
282 ) -> anyhow::Result<()>
283 );
284 petri_vm_fn!(
285 pub async fn restore_openhcl(
287 &mut self
288 ) -> anyhow::Result<()>
289 );
290 petri_vm_fn!(
291 pub async fn update_command_line(
293 &mut self,
294 command_line: &str
295 ) -> anyhow::Result<()>
296 );
297
298 petri_vm_fn!(
299 pub async fn add_pcie_device(
301 &mut self,
302 port_name: String,
303 resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>
304 ) -> anyhow::Result<()>
305 );
306 petri_vm_fn!(
307 pub async fn remove_pcie_device(
309 &mut self,
310 port_name: String
311 ) -> anyhow::Result<()>
312 );
313 petri_vm_fn!(
314 pub async fn reset(&mut self) -> anyhow::Result<()>
316 );
317 petri_vm_fn!(
318 pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
320 );
321 petri_vm_fn!(
322 pub async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()>
324 );
325
326 petri_vm_fn!(
327 pub async fn pause(&mut self) -> anyhow::Result<()>
329 );
330 petri_vm_fn!(
331 pub async fn save_state(&mut self) -> anyhow::Result<Vec<u8>>
334 );
335 petri_vm_fn!(
336 pub async fn resume(&mut self) -> anyhow::Result<()>
338 );
339 petri_vm_fn!(
340 pub async fn verify_save_restore(&mut self) -> anyhow::Result<()>
344 );
345 petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);
346
347 pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
354 &mut self,
355 future: F,
356 ) -> anyhow::Result<T> {
357 Self::wait_for_halt_or_internal(&mut self.halt, future).await
358 }
359
360 async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
361 halt: &mut PetriVmHaltReceiver,
362 future: F,
363 ) -> anyhow::Result<T> {
364 let future = &mut std::pin::pin!(future);
365 enum Either<T> {
366 Future(anyhow::Result<T>),
367 Halt(Result<HaltReason, RecvError>),
368 }
369 let res = (
370 future.map(Either::Future),
371 halt.halt_notif.recv().map(Either::Halt),
372 )
373 .race()
374 .await;
375
376 match res {
377 Either::Future(Ok(success)) => Ok(success),
378 Either::Future(Err(e)) => {
379 tracing::warn!(
380 ?e,
381 "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
382 );
383 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
384 c.cancelled().await;
385 Err(e)
386 }
387 Either::Halt(halt_result) => {
388 tracing::warn!(
389 halt_result = format_args!("{:x?}", halt_result),
390 "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
391 );
392 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
393 let try_again = c.until_cancelled(future).await;
394
395 match try_again {
396 Ok(fut_result) => {
397 halt.already_received = Some(halt_result);
398 if let Err(e) = &fut_result {
399 tracing::warn!(
400 ?e,
401 "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
402 );
403 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
404 c.cancelled().await;
405 }
406 fut_result
407 }
408 Err(_cancel) => match halt_result {
409 Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
410 Err(e) => Err(e).context("VM disappeared"),
411 },
412 }
413 }
414 }
415 }
416}
417
418impl PetriVmInner {
419 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
420 self.resources
421 .firmware_event_recv
422 .recv()
423 .await
424 .context("Failed to get firmware boot event")
425 }
426
427 async fn wait_for_enlightened_shutdown_ready(
428 &mut self,
429 ) -> anyhow::Result<Option<mesh::OneshotReceiver<()>>> {
430 let Some(send) = self.resources.shutdown_ic_send.as_ref() else {
431 return Ok(None);
432 };
433 let recv = send
434 .call(ShutdownRpc::WaitReady, ())
435 .await
436 .context("waiting for shutdown IC to be ready")?;
437 Ok(Some(recv))
438 }
439
440 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
441 let send = self
442 .resources
443 .shutdown_ic_send
444 .as_ref()
445 .context("shutdown IC not configured")?;
446 let shutdown_result = send
447 .call(
448 ShutdownRpc::Shutdown,
449 hyperv_ic_resources::shutdown::ShutdownParams {
450 shutdown_type: match kind {
451 ShutdownKind::Shutdown => {
452 hyperv_ic_resources::shutdown::ShutdownType::PowerOff
453 }
454 ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
455 },
456 force: false,
457 },
458 )
459 .await?;
460
461 tracing::info!(?shutdown_result, "Shutdown sent");
462 anyhow::ensure!(
463 shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
464 "Got non-Ok shutdown response"
465 );
466
467 Ok(())
468 }
469
470 async fn wait_for_kvp(
471 &mut self,
472 ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
473 tracing::info!("Waiting for KVP IC");
474 let send = self
475 .resources
476 .kvp_ic_send
477 .as_ref()
478 .context("KVP IC not configured")?;
479 let (send, _) = send
480 .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
481 .await
482 .context("failed to connect to KVP IC")?;
483
484 Ok(send)
485 }
486
487 async fn save_openhcl(
488 &self,
489 new_openhcl: &ResolvedArtifact,
490 flags: OpenHclServicingFlags,
491 ) -> anyhow::Result<()> {
492 let ged_send = self
493 .resources
494 .ged_send
495 .as_ref()
496 .context("openhcl not configured")?;
497
498 let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
499 self.worker
500 .save_openhcl(ged_send, flags, igvm_file.into())
501 .await
502 }
503
504 async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
505 self.worker.update_command_line(command_line).await
506 }
507
508 async fn add_pcie_device(
509 &mut self,
510 port_name: String,
511 resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>,
512 ) -> anyhow::Result<()> {
513 self.worker.add_pcie_device(port_name, resource).await
514 }
515
516 async fn remove_pcie_device(&mut self, port_name: String) -> anyhow::Result<()> {
517 self.worker.remove_pcie_device(port_name).await
518 }
519
520 async fn restore_openhcl(&self) -> anyhow::Result<()> {
521 let ged_send = self
522 .resources
523 .ged_send
524 .as_ref()
525 .context("openhcl not configured")?;
526
527 self.worker.restore_openhcl(ged_send).await
528 }
529
530 async fn set_vtl2_settings(&self, settings: &Vtl2Settings) -> anyhow::Result<()> {
531 let ged_send = self
532 .resources
533 .ged_send
534 .as_ref()
535 .context("openhcl not configured")?;
536
537 ged_send
538 .call_failable(
539 get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
540 prost::Message::encode_to_vec(settings),
541 )
542 .await?;
543
544 Ok(())
545 }
546
547 async fn reset(&mut self) -> anyhow::Result<()> {
548 tracing::info!("Resetting VM");
549 self.worker.reset().await?;
550 self.cidata_mounted = false;
552 if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
557 agent.reset();
558
559 if self.resources.properties.using_vtl0_pipette {
560 self.launch_linux_direct_pipette().await?;
561 }
562 }
563 Ok(())
564 }
565
566 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
567 if let Some(port) = self.tcp_pipette_port {
569 assert!(!set_high_vtl, "TCP pipette transport does not support VTL2");
570 return self.wait_for_agent_tcp(port).await;
571 }
572
573 let listener = if set_high_vtl {
574 self.resources
575 .vtl2_pipette_listener
576 .as_mut()
577 .context("VM is not configured with VTL 2")?
578 } else {
579 &mut self.resources.pipette_listener
580 };
581
582 tracing::info!(set_high_vtl, "listening for pipette connection");
583 let client = loop {
584 let (conn, _) = listener
585 .accept()
586 .await
587 .context("failed to accept pipette connection")?;
588 tracing::info!(set_high_vtl, "handshaking with pipette");
589 let socket = PolledSocket::new(&self.resources.driver, conn)?;
590 match PipetteClient::new(&self.resources.driver, socket, &self.resources.output_dir)
591 .await
592 {
593 Ok(client) => break client,
594 Err(e) => {
595 tracing::warn!(
601 error = e.as_ref() as &dyn std::error::Error,
602 "pipette connection not live, retrying"
603 );
604 }
605 }
606 };
607 tracing::info!(set_high_vtl, "completed pipette handshake");
608
609 if !set_high_vtl
614 && self.resources.properties.uses_pipette_as_init
615 && self.resources.properties.has_agent_disk
616 && !self.cidata_mounted
617 {
618 tracing::info!("mounting CIDATA agent disk via pipette");
619 client
620 .unix_shell()
621 .cmd("mkdir")
622 .arg("-p")
623 .arg("/cidata")
624 .run()
625 .await
626 .context("failed to create /cidata mount point")?;
627 client
628 .unix_shell()
629 .cmd("mount")
630 .arg("LABEL=cidata")
631 .arg("/cidata")
632 .run()
633 .await
634 .context("failed to mount CIDATA disk")?;
635 self.cidata_mounted = true;
636 }
637
638 Ok(client)
639 }
640
641 async fn wait_for_agent_tcp(&mut self, port: u16) -> anyhow::Result<PipetteClient> {
647 tracing::info!(port, "connecting to pipette via TCP");
648 let addr = std::net::SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, port));
649 let client = loop {
650 match PolledSocket::connect_tcp(&self.resources.driver, addr).await {
651 Ok(socket) => {
652 socket
653 .get()
654 .set_nodelay(true)
655 .context("failed to set TCP_NODELAY")?;
656 tracing::info!("TCP connected, handshaking with pipette");
657 match PipetteClient::new(
658 &self.resources.driver,
659 socket,
660 &self.resources.output_dir,
661 )
662 .await
663 {
664 Ok(client) => break client,
665 Err(e) => {
666 tracing::warn!(
667 error = e.as_ref() as &dyn std::error::Error,
668 "pipette TCP connection failed, retrying"
669 );
670 }
671 }
672 }
673 Err(e) => {
674 tracing::debug!(
675 error = &e as &dyn std::error::Error,
676 "TCP connect failed, guest not ready yet"
677 );
678 }
679 }
680 pal_async::timer::PolledTimer::new(&self.resources.driver)
682 .sleep(Duration::from_secs(1))
683 .await;
684 };
685 tracing::info!("completed pipette TCP handshake");
686 Ok(client)
687 }
688
689 async fn pause(&self) -> anyhow::Result<()> {
690 self.worker.pause().await?;
691 Ok(())
692 }
693
694 async fn save_state(&self) -> anyhow::Result<Vec<u8>> {
695 let state_msg = self.worker.save().await?;
696 Ok(mesh::payload::encode(state_msg))
697 }
698
699 async fn resume(&self) -> anyhow::Result<()> {
700 self.worker.resume().await?;
701 Ok(())
702 }
703
704 async fn verify_save_restore(&self) -> anyhow::Result<()> {
705 for i in 0..2 {
706 let result = self.worker.pulse_save_restore().await;
707 match result {
708 Ok(()) => {}
709 Err(RpcError::Channel(err)) => return Err(err.into()),
710 Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
711 tracing::warn!("Reset not supported, could not test save + restore.");
712 break;
713 }
714 Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
715 return Err(anyhow::Error::from(err))
716 .context(format!("Save + restore {i} failed."));
717 }
718 }
719 }
720
721 Ok(())
722 }
723
724 async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
725 self.resources
727 .linux_direct_serial_agent
728 .as_mut()
729 .unwrap()
730 .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
731 .await?;
732 Ok(())
733 }
734}
735
/// Inspect access to an OpenVMM VM, backed by a shared handle to its worker.
pub struct OpenVmmInspector {
    /// Clone of the VM's worker handle. VM teardown fails while this is alive.
    worker: Arc<Worker>,
}
740
741#[async_trait]
742impl PetriVmInspector for OpenVmmInspector {
743 async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
744 Ok(self.worker.inspect_all().await)
745 }
746}
747
/// Framebuffer access for an OpenVMM VM, used to capture screenshots.
pub struct OpenVmmFramebufferAccess {
    /// View of the VM's framebuffer.
    view: View,
}
752
753#[async_trait]
754impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
755 async fn screenshot(
756 &mut self,
757 image: &mut Vec<u8>,
758 ) -> anyhow::Result<Option<VmScreenshotMeta>> {
759 const BYTES_PER_PIXEL: usize = 4;
764 let (width, height) = self.view.resolution();
765 let (widthsize, heightsize) = (width as usize, height as usize);
766 let len = widthsize * heightsize * BYTES_PER_PIXEL;
767
768 image.resize(len, 0);
769 for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
770 self.view.read_line(i, line);
771 for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
772 pixel.swap(0, 2);
773 pixel[3] = 0xFF;
774 }
775 }
776
777 Ok(Some(VmScreenshotMeta {
778 color: image::ExtendedColorType::Rgba8,
779 width,
780 height,
781 }))
782 }
783}