1use super::PetriVmResourcesOpenVmm;
7use crate::OpenHclServicingFlags;
8use crate::PetriHaltReason;
9use crate::PetriVmFramebufferAccess;
10use crate::PetriVmInspector;
11use crate::PetriVmRuntime;
12use crate::ShutdownKind;
13use crate::VmScreenshotMeta;
14use crate::openhcl_diag::OpenHclDiagHandler;
15use crate::worker::Worker;
16use anyhow::Context;
17use async_trait::async_trait;
18use framebuffer::View;
19use futures::FutureExt;
20use futures_concurrency::future::Race;
21use get_resources::ged::FirmwareEvent;
22use hyperv_ic_resources::shutdown::ShutdownRpc;
23use mesh::CancelContext;
24use mesh::Receiver;
25use mesh::RecvError;
26use mesh::rpc::RpcError;
27use mesh::rpc::RpcSend;
28use mesh_process::Mesh;
29use openvmm_defs::rpc::PulseSaveRestoreError;
30use pal_async::socket::PolledSocket;
31use petri_artifacts_core::ResolvedArtifact;
32use pipette_client::PipetteClient;
33use std::future::Future;
34use std::path::Path;
35use std::sync::Arc;
36use std::time::Duration;
37use vmm_core_defs::HaltReason;
38use vtl2_settings_proto::Vtl2Settings;
39
/// A running VM driven through the OpenVMM backend.
///
/// Pairs the VM's worker/resources (`inner`) with the receiver used to
/// observe halt notifications (`halt`), so that operations on the VM can be
/// raced against the VM halting.
pub struct PetriVmOpenVmm {
    /// Worker handle, mesh, and per-VM resources; the `petri_vm_fn!` wrappers
    /// forward to the same-named methods on this value.
    inner: PetriVmInner,
    /// Halt notification channel, plus any halt result that arrived while
    /// another operation was still in flight.
    halt: PetriVmHaltReceiver,
}
48
49#[async_trait]
50impl PetriVmRuntime for PetriVmOpenVmm {
51 type VmInspector = OpenVmmInspector;
52 type VmFramebufferAccess = OpenVmmFramebufferAccess;
53
54 async fn teardown(self) -> anyhow::Result<()> {
55 tracing::info!("waiting for worker");
56 let worker = Arc::into_inner(self.inner.worker)
57 .context("all references to the OpenVMM worker have not been closed")?;
58 worker.shutdown().await?;
59
60 tracing::info!("Worker quit, waiting for mesh");
61 self.inner.mesh.shutdown().await;
62
63 tracing::info!("Mesh shutdown, waiting for logging tasks");
64 for t in self.inner.resources.log_stream_tasks {
65 t.await?;
66 }
67
68 Ok(())
69 }
70
71 async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReason> {
72 let halt_reason = if let Some(already) = self.halt.already_received.take() {
73 already.map_err(anyhow::Error::from)
74 } else {
75 self.halt
76 .halt_notif
77 .recv()
78 .await
79 .context("Failed to get halt reason")
80 }?;
81
82 tracing::info!(?halt_reason, "Got halt reason");
83
84 let halt_reason = match halt_reason {
85 HaltReason::PowerOff => PetriHaltReason::PowerOff,
86 HaltReason::Reset => PetriHaltReason::Reset,
87 HaltReason::Hibernate => PetriHaltReason::Hibernate,
88 HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
89 _ => PetriHaltReason::Other,
90 };
91
92 if allow_reset && halt_reason == PetriHaltReason::Reset {
93 self.reset().await?
94 }
95
96 Ok(halt_reason)
97 }
98
99 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
100 Self::wait_for_agent(self, set_high_vtl).await
101 }
102
103 fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
104 self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
105 OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
106 self.inner.resources.driver.clone(),
107 path,
108 ))
109 })
110 }
111
112 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
113 Self::wait_for_boot_event(self).await
114 }
115
116 async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
117 Self::wait_for_enlightened_shutdown_ready(self)
118 .await
119 .map(|_| ())
120 }
121
122 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
123 Self::send_enlightened_shutdown(self, kind).await
124 }
125
126 async fn restart_openhcl(
127 &mut self,
128 new_openhcl: &ResolvedArtifact,
129 flags: OpenHclServicingFlags,
130 ) -> anyhow::Result<()> {
131 Self::save_openhcl(self, new_openhcl, flags).await?;
132 Self::restore_openhcl(self).await
133 }
134
135 async fn save_openhcl(
136 &mut self,
137 new_openhcl: &ResolvedArtifact,
138 flags: OpenHclServicingFlags,
139 ) -> anyhow::Result<()> {
140 Self::save_openhcl(self, new_openhcl, flags).await
141 }
142
143 async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
144 Self::restore_openhcl(self).await
145 }
146
147 async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
148 Self::update_command_line(self, command_line).await
149 }
150
151 fn inspector(&self) -> Option<OpenVmmInspector> {
152 Some(OpenVmmInspector {
153 worker: self.inner.worker.clone(),
154 })
155 }
156
157 fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
158 self.inner
159 .framebuffer_view
160 .take()
161 .map(|view| OpenVmmFramebufferAccess { view })
162 }
163
164 async fn reset(&mut self) -> anyhow::Result<()> {
165 Self::reset(self).await
166 }
167
168 async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()> {
169 Self::set_vtl2_settings(self, settings).await
170 }
171
172 async fn set_vmbus_drive(
173 &mut self,
174 _disk: &crate::Drive,
175 _controller_id: &guid::Guid,
176 _controller_location: u32,
177 ) -> anyhow::Result<()> {
178 todo!("openvmm set vmbus drive")
179 }
180
181 async fn add_pcie_device(
182 &mut self,
183 port_name: String,
184 resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>,
185 ) -> anyhow::Result<()> {
186 Self::add_pcie_device(self, port_name, resource).await
187 }
188
189 async fn remove_pcie_device(&mut self, port_name: String) -> anyhow::Result<()> {
190 Self::remove_pcie_device(self, port_name).await
191 }
192}
193
/// State backing a [`PetriVmOpenVmm`]: the worker handle plus everything the
/// per-operation methods need.
pub(super) struct PetriVmInner {
    /// Per-VM resources (channels, listeners, driver, log tasks, ...) used by
    /// the methods on this type.
    pub(super) resources: PetriVmResourcesOpenVmm,
    /// Mesh instance; shut down during teardown after the worker has quit.
    pub(super) mesh: Mesh,
    /// Shared handle to the OpenVMM worker. Also cloned into
    /// `OpenVmmInspector`, so teardown requires those clones to be dropped.
    pub(super) worker: Arc<Worker>,
    /// Framebuffer view, if any; moved out by `take_framebuffer_access`.
    pub(super) framebuffer_view: Option<View>,
    /// Whether the CIDATA disk has been mounted via pipette since the last
    /// reset (set in `wait_for_agent`, cleared in `reset`).
    pub(super) cidata_mounted: bool,
    /// Process id exposed through `PetriVmOpenVmm::pid`.
    /// NOTE(review): presumably the OpenVMM process's pid - confirm at the
    /// construction site.
    pub(super) pid: i32,
}
205
/// Halt notification state for a VM.
struct PetriVmHaltReceiver {
    /// Channel on which halt reasons arrive.
    halt_notif: Receiver<HaltReason>,
    /// A halt result that was received while racing against another
    /// operation which then completed; consumed by `wait_for_halt`.
    already_received: Option<Result<HaltReason, RecvError>>,
}
210
/// Generates a `&mut self` async method that forwards to the method of the
/// same name on `self.inner`, racing it against the halt channel through
/// `Self::wait_for_halt_or_internal`.
///
/// Accepts optional attributes (including doc comments), a visibility, the
/// method name, any number of `name: Type` parameters after `&mut self`
/// (no trailing comma), and an optional return type.
macro_rules! petri_vm_fn {
    (
        $(#[$($meta:tt)*])*
        $visibility:vis async fn $name:ident (&mut self $(, $param:ident : $param_ty:ty)*)
        $(-> $output:ty)?
    ) => {
        $(#[$($meta)*])*
        $visibility async fn $name(&mut self $(, $param: $param_ty)*) $(-> $output)? {
            Self::wait_for_halt_or_internal(
                &mut self.halt,
                self.inner.$name($($param),*),
            )
            .await
        }
    };
}
221
222impl PetriVmOpenVmm {
225 pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
226 Self {
227 inner,
228 halt: PetriVmHaltReceiver {
229 halt_notif,
230 already_received: None,
231 },
232 }
233 }
234
235 pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
237 self.inner
238 .resources
239 .vtl2_vsock_path
240 .as_deref()
241 .context("VM is not configured with OpenHCL")
242 }
243
244 pub fn pid(&self) -> i32 {
246 self.inner.pid
247 }
248
249 petri_vm_fn!(
250 pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
253 );
254 petri_vm_fn!(
255 pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<mesh::OneshotReceiver<()>>
258 );
259 petri_vm_fn!(
260 pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
262 );
263 petri_vm_fn!(
264 pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
267 );
268 petri_vm_fn!(
269 pub async fn save_openhcl(
271 &mut self,
272 new_openhcl: &ResolvedArtifact,
273 flags: OpenHclServicingFlags
274 ) -> anyhow::Result<()>
275 );
276 petri_vm_fn!(
277 pub async fn restore_openhcl(
279 &mut self
280 ) -> anyhow::Result<()>
281 );
282 petri_vm_fn!(
283 pub async fn update_command_line(
285 &mut self,
286 command_line: &str
287 ) -> anyhow::Result<()>
288 );
289
290 petri_vm_fn!(
291 pub async fn add_pcie_device(
293 &mut self,
294 port_name: String,
295 resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>
296 ) -> anyhow::Result<()>
297 );
298 petri_vm_fn!(
299 pub async fn remove_pcie_device(
301 &mut self,
302 port_name: String
303 ) -> anyhow::Result<()>
304 );
305 petri_vm_fn!(
306 pub async fn reset(&mut self) -> anyhow::Result<()>
308 );
309 petri_vm_fn!(
310 pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
312 );
313 petri_vm_fn!(
314 pub async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()>
316 );
317
318 petri_vm_fn!(
319 pub async fn pause(&mut self) -> anyhow::Result<()>
321 );
322 petri_vm_fn!(
323 pub async fn save_state(&mut self) -> anyhow::Result<Vec<u8>>
326 );
327 petri_vm_fn!(
328 pub async fn resume(&mut self) -> anyhow::Result<()>
330 );
331 petri_vm_fn!(
332 pub async fn verify_save_restore(&mut self) -> anyhow::Result<()>
336 );
337 petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);
338
339 pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
346 &mut self,
347 future: F,
348 ) -> anyhow::Result<T> {
349 Self::wait_for_halt_or_internal(&mut self.halt, future).await
350 }
351
352 async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
353 halt: &mut PetriVmHaltReceiver,
354 future: F,
355 ) -> anyhow::Result<T> {
356 let future = &mut std::pin::pin!(future);
357 enum Either<T> {
358 Future(anyhow::Result<T>),
359 Halt(Result<HaltReason, RecvError>),
360 }
361 let res = (
362 future.map(Either::Future),
363 halt.halt_notif.recv().map(Either::Halt),
364 )
365 .race()
366 .await;
367
368 match res {
369 Either::Future(Ok(success)) => Ok(success),
370 Either::Future(Err(e)) => {
371 tracing::warn!(
372 ?e,
373 "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
374 );
375 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
376 c.cancelled().await;
377 Err(e)
378 }
379 Either::Halt(halt_result) => {
380 tracing::warn!(
381 halt_result = format_args!("{:x?}", halt_result),
382 "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
383 );
384 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
385 let try_again = c.until_cancelled(future).await;
386
387 match try_again {
388 Ok(fut_result) => {
389 halt.already_received = Some(halt_result);
390 if let Err(e) = &fut_result {
391 tracing::warn!(
392 ?e,
393 "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
394 );
395 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
396 c.cancelled().await;
397 }
398 fut_result
399 }
400 Err(_cancel) => match halt_result {
401 Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
402 Err(e) => Err(e).context("VM disappeared"),
403 },
404 }
405 }
406 }
407 }
408}
409
410impl PetriVmInner {
411 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
412 self.resources
413 .firmware_event_recv
414 .recv()
415 .await
416 .context("Failed to get firmware boot event")
417 }
418
419 async fn wait_for_enlightened_shutdown_ready(
420 &mut self,
421 ) -> anyhow::Result<mesh::OneshotReceiver<()>> {
422 let recv = self
423 .resources
424 .shutdown_ic_send
425 .call(ShutdownRpc::WaitReady, ())
426 .await?;
427
428 Ok(recv)
429 }
430
431 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
432 let shutdown_result = self
433 .resources
434 .shutdown_ic_send
435 .call(
436 ShutdownRpc::Shutdown,
437 hyperv_ic_resources::shutdown::ShutdownParams {
438 shutdown_type: match kind {
439 ShutdownKind::Shutdown => {
440 hyperv_ic_resources::shutdown::ShutdownType::PowerOff
441 }
442 ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
443 },
444 force: false,
445 },
446 )
447 .await?;
448
449 tracing::info!(?shutdown_result, "Shutdown sent");
450 anyhow::ensure!(
451 shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
452 "Got non-Ok shutdown response"
453 );
454
455 Ok(())
456 }
457
458 async fn wait_for_kvp(
459 &mut self,
460 ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
461 tracing::info!("Waiting for KVP IC");
462 let (send, _) = self
463 .resources
464 .kvp_ic_send
465 .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
466 .await
467 .context("failed to connect to KVP IC")?;
468
469 Ok(send)
470 }
471
472 async fn save_openhcl(
473 &self,
474 new_openhcl: &ResolvedArtifact,
475 flags: OpenHclServicingFlags,
476 ) -> anyhow::Result<()> {
477 let ged_send = self
478 .resources
479 .ged_send
480 .as_ref()
481 .context("openhcl not configured")?;
482
483 let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
484 self.worker
485 .save_openhcl(ged_send, flags, igvm_file.into())
486 .await
487 }
488
489 async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
490 self.worker.update_command_line(command_line).await
491 }
492
493 async fn add_pcie_device(
494 &mut self,
495 port_name: String,
496 resource: vm_resource::Resource<vm_resource::kind::PciDeviceHandleKind>,
497 ) -> anyhow::Result<()> {
498 self.worker.add_pcie_device(port_name, resource).await
499 }
500
501 async fn remove_pcie_device(&mut self, port_name: String) -> anyhow::Result<()> {
502 self.worker.remove_pcie_device(port_name).await
503 }
504
505 async fn restore_openhcl(&self) -> anyhow::Result<()> {
506 let ged_send = self
507 .resources
508 .ged_send
509 .as_ref()
510 .context("openhcl not configured")?;
511
512 self.worker.restore_openhcl(ged_send).await
513 }
514
515 async fn set_vtl2_settings(&self, settings: &Vtl2Settings) -> anyhow::Result<()> {
516 let ged_send = self
517 .resources
518 .ged_send
519 .as_ref()
520 .context("openhcl not configured")?;
521
522 ged_send
523 .call_failable(
524 get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
525 prost::Message::encode_to_vec(settings),
526 )
527 .await?;
528
529 Ok(())
530 }
531
532 async fn reset(&mut self) -> anyhow::Result<()> {
533 tracing::info!("Resetting VM");
534 self.worker.reset().await?;
535 self.cidata_mounted = false;
537 if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
542 agent.reset();
543
544 if self.resources.properties.using_vtl0_pipette {
545 self.launch_linux_direct_pipette().await?;
546 }
547 }
548 Ok(())
549 }
550
551 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
552 let listener = if set_high_vtl {
553 self.resources
554 .vtl2_pipette_listener
555 .as_mut()
556 .context("VM is not configured with VTL 2")?
557 } else {
558 &mut self.resources.pipette_listener
559 };
560
561 tracing::info!(set_high_vtl, "listening for pipette connection");
562 let (conn, _) = listener
563 .accept()
564 .await
565 .context("failed to accept pipette connection")?;
566 tracing::info!(set_high_vtl, "handshaking with pipette");
567 let client = PipetteClient::new(
568 &self.resources.driver,
569 PolledSocket::new(&self.resources.driver, conn)?,
570 &self.resources.output_dir,
571 )
572 .await
573 .context("failed to connect to pipette")?;
574 tracing::info!(set_high_vtl, "completed pipette handshake");
575
576 if !set_high_vtl
581 && self.resources.properties.uses_pipette_as_init
582 && self.resources.properties.has_agent_disk
583 && !self.cidata_mounted
584 {
585 tracing::info!("mounting CIDATA agent disk via pipette");
586 client
587 .unix_shell()
588 .cmd("mkdir")
589 .arg("-p")
590 .arg("/cidata")
591 .run()
592 .await
593 .context("failed to create /cidata mount point")?;
594 client
595 .unix_shell()
596 .cmd("mount")
597 .arg("LABEL=cidata")
598 .arg("/cidata")
599 .run()
600 .await
601 .context("failed to mount CIDATA disk")?;
602 self.cidata_mounted = true;
603 }
604
605 Ok(client)
606 }
607
608 async fn pause(&self) -> anyhow::Result<()> {
609 self.worker.pause().await?;
610 Ok(())
611 }
612
613 async fn save_state(&self) -> anyhow::Result<Vec<u8>> {
614 let state_msg = self.worker.save().await?;
615 Ok(mesh::payload::encode(state_msg))
616 }
617
618 async fn resume(&self) -> anyhow::Result<()> {
619 self.worker.resume().await?;
620 Ok(())
621 }
622
623 async fn verify_save_restore(&self) -> anyhow::Result<()> {
624 for i in 0..2 {
625 let result = self.worker.pulse_save_restore().await;
626 match result {
627 Ok(()) => {}
628 Err(RpcError::Channel(err)) => return Err(err.into()),
629 Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
630 tracing::warn!("Reset not supported, could not test save + restore.");
631 break;
632 }
633 Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
634 return Err(anyhow::Error::from(err))
635 .context(format!("Save + restore {i} failed."));
636 }
637 }
638 }
639
640 Ok(())
641 }
642
643 async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
644 self.resources
646 .linux_direct_serial_agent
647 .as_mut()
648 .unwrap()
649 .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
650 .await?;
651 Ok(())
652 }
653}
654
/// Inspector for an OpenVMM-backed VM; answers inspect requests via the
/// worker.
pub struct OpenVmmInspector {
    /// Shared handle to the VM's worker (a clone of the one held by the VM).
    worker: Arc<Worker>,
}
659
660#[async_trait]
661impl PetriVmInspector for OpenVmmInspector {
662 async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
663 Ok(self.worker.inspect_all().await)
664 }
665}
666
/// Screenshot access to an OpenVMM VM's framebuffer.
pub struct OpenVmmFramebufferAccess {
    /// Framebuffer view that screenshots are read from.
    view: View,
}
671
672#[async_trait]
673impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
674 async fn screenshot(
675 &mut self,
676 image: &mut Vec<u8>,
677 ) -> anyhow::Result<Option<VmScreenshotMeta>> {
678 const BYTES_PER_PIXEL: usize = 4;
683 let (width, height) = self.view.resolution();
684 let (widthsize, heightsize) = (width as usize, height as usize);
685 let len = widthsize * heightsize * BYTES_PER_PIXEL;
686
687 image.resize(len, 0);
688 for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
689 self.view.read_line(i, line);
690 for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
691 pixel.swap(0, 2);
692 pixel[3] = 0xFF;
693 }
694 }
695
696 Ok(Some(VmScreenshotMeta {
697 color: image::ExtendedColorType::Rgba8,
698 width,
699 height,
700 }))
701 }
702}