1use super::PetriVmResourcesOpenVmm;
7use crate::OpenHclServicingFlags;
8use crate::PetriHaltReason;
9use crate::PetriVmFramebufferAccess;
10use crate::PetriVmInspector;
11use crate::PetriVmRuntime;
12use crate::ShutdownKind;
13use crate::VmScreenshotMeta;
14use crate::openhcl_diag::OpenHclDiagHandler;
15use crate::worker::Worker;
16use anyhow::Context;
17use async_trait::async_trait;
18use framebuffer::View;
19use futures::FutureExt;
20use futures_concurrency::future::Race;
21use get_resources::ged::FirmwareEvent;
22use hvlite_defs::rpc::PulseSaveRestoreError;
23use hyperv_ic_resources::shutdown::ShutdownRpc;
24use mesh::CancelContext;
25use mesh::Receiver;
26use mesh::RecvError;
27use mesh::rpc::RpcError;
28use mesh::rpc::RpcSend;
29use mesh_process::Mesh;
30use pal_async::socket::PolledSocket;
31use petri_artifacts_core::ResolvedArtifact;
32use pipette_client::PipetteClient;
33use std::future::Future;
34use std::path::Path;
35use std::sync::Arc;
36use std::time::Duration;
37use vmm_core_defs::HaltReason;
38use vtl2_settings_proto::Vtl2Settings;
39
40pub struct PetriVmOpenVmm {
45 inner: PetriVmInner,
46 halt: PetriVmHaltReceiver,
47}
48
49#[async_trait]
50impl PetriVmRuntime for PetriVmOpenVmm {
51 type VmInspector = OpenVmmInspector;
52 type VmFramebufferAccess = OpenVmmFramebufferAccess;
53
54 async fn teardown(self) -> anyhow::Result<()> {
55 tracing::info!("waiting for worker");
56 let worker = Arc::into_inner(self.inner.worker)
57 .context("all references to the OpenVMM worker have not been closed")?;
58 worker.shutdown().await?;
59
60 tracing::info!("Worker quit, waiting for mesh");
61 self.inner.mesh.shutdown().await;
62
63 tracing::info!("Mesh shutdown, waiting for logging tasks");
64 for t in self.inner.resources.log_stream_tasks {
65 t.await?;
66 }
67
68 Ok(())
69 }
70
71 async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReason> {
72 let halt_reason = if let Some(already) = self.halt.already_received.take() {
73 already.map_err(anyhow::Error::from)
74 } else {
75 self.halt
76 .halt_notif
77 .recv()
78 .await
79 .context("Failed to get halt reason")
80 }?;
81
82 tracing::info!(?halt_reason, "Got halt reason");
83
84 let halt_reason = match halt_reason {
85 HaltReason::PowerOff => PetriHaltReason::PowerOff,
86 HaltReason::Reset => PetriHaltReason::Reset,
87 HaltReason::Hibernate => PetriHaltReason::Hibernate,
88 HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
89 _ => PetriHaltReason::Other,
90 };
91
92 if allow_reset && halt_reason == PetriHaltReason::Reset {
93 self.reset().await?
94 }
95
96 Ok(halt_reason)
97 }
98
99 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
100 Self::wait_for_agent(self, set_high_vtl).await
101 }
102
103 fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
104 self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
105 OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
106 self.inner.resources.driver.clone(),
107 path,
108 ))
109 })
110 }
111
112 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
113 Self::wait_for_boot_event(self).await
114 }
115
116 async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
117 Self::wait_for_enlightened_shutdown_ready(self)
118 .await
119 .map(|_| ())
120 }
121
122 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
123 Self::send_enlightened_shutdown(self, kind).await
124 }
125
126 async fn restart_openhcl(
127 &mut self,
128 new_openhcl: &ResolvedArtifact,
129 flags: OpenHclServicingFlags,
130 ) -> anyhow::Result<()> {
131 Self::save_openhcl(self, new_openhcl, flags).await?;
132 Self::restore_openhcl(self).await
133 }
134
135 async fn save_openhcl(
136 &mut self,
137 new_openhcl: &ResolvedArtifact,
138 flags: OpenHclServicingFlags,
139 ) -> anyhow::Result<()> {
140 Self::save_openhcl(self, new_openhcl, flags).await
141 }
142
143 async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
144 Self::restore_openhcl(self).await
145 }
146
147 fn inspector(&self) -> Option<OpenVmmInspector> {
148 Some(OpenVmmInspector {
149 worker: self.inner.worker.clone(),
150 })
151 }
152
153 fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
154 self.inner
155 .framebuffer_view
156 .take()
157 .map(|view| OpenVmmFramebufferAccess { view })
158 }
159
160 async fn reset(&mut self) -> anyhow::Result<()> {
161 Self::reset(self).await
162 }
163}
164
165pub(super) struct PetriVmInner {
166 pub(super) resources: PetriVmResourcesOpenVmm,
167 pub(super) mesh: Mesh,
168 pub(super) worker: Arc<Worker>,
169 pub(super) framebuffer_view: Option<View>,
170}
171
172struct PetriVmHaltReceiver {
173 halt_notif: Receiver<HaltReason>,
174 already_received: Option<Result<HaltReason, RecvError>>,
175}
176
/// Generates an `async fn` on the outer VM type that forwards to the method of
/// the same name on `self.inner`, raced against the halt channel through
/// `Self::wait_for_halt_or_internal`.
///
/// The input is a method signature without a body: optional attributes (doc
/// comments included), a visibility, `async fn name(&mut self, arg: Ty, ...)`
/// and an optional return type. A trailing comma after the last parameter is
/// accepted so multi-line signatures can be written in the usual style.
macro_rules! petri_vm_fn {
    ($(#[$($attrss:tt)*])* $vis:vis async fn $fn_name:ident (&mut self $(,$arg:ident: $ty:ty)* $(,)?) $(-> $ret:ty)?) => {
        $(#[$($attrss)*])*
        $vis async fn $fn_name(&mut self, $($arg:$ty,)*) $(-> $ret)? {
            Self::wait_for_halt_or_internal(&mut self.halt, self.inner.$fn_name($($arg,)*)).await
        }
    };
}
187
188impl PetriVmOpenVmm {
191 pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
192 Self {
193 inner,
194 halt: PetriVmHaltReceiver {
195 halt_notif,
196 already_received: None,
197 },
198 }
199 }
200
201 pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
203 self.inner
204 .resources
205 .vtl2_vsock_path
206 .as_deref()
207 .context("VM is not configured with OpenHCL")
208 }
209
210 petri_vm_fn!(
211 pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
214 );
215 petri_vm_fn!(
216 pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<mesh::OneshotReceiver<()>>
219 );
220 petri_vm_fn!(
221 pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
223 );
224 petri_vm_fn!(
225 pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
228 );
229 petri_vm_fn!(
230 pub async fn save_openhcl(
232 &mut self,
233 new_openhcl: &ResolvedArtifact,
234 flags: OpenHclServicingFlags
235 ) -> anyhow::Result<()>
236 );
237 petri_vm_fn!(
238 pub async fn restore_openhcl(
240 &mut self
241 ) -> anyhow::Result<()>
242 );
243 petri_vm_fn!(
244 pub async fn reset(&mut self) -> anyhow::Result<()>
246 );
247 petri_vm_fn!(
248 pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
250 );
251 petri_vm_fn!(
252 pub async fn modify_vtl2_settings(&mut self, f: impl FnOnce(&mut Vtl2Settings)) -> anyhow::Result<()>
254 );
255
256 petri_vm_fn!(pub(crate) async fn resume(&mut self) -> anyhow::Result<()>);
257 petri_vm_fn!(pub(crate) async fn verify_save_restore(&mut self) -> anyhow::Result<()>);
258 petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);
259
260 pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
267 &mut self,
268 future: F,
269 ) -> anyhow::Result<T> {
270 Self::wait_for_halt_or_internal(&mut self.halt, future).await
271 }
272
273 async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
274 halt: &mut PetriVmHaltReceiver,
275 future: F,
276 ) -> anyhow::Result<T> {
277 let future = &mut std::pin::pin!(future);
278 enum Either<T> {
279 Future(anyhow::Result<T>),
280 Halt(Result<HaltReason, RecvError>),
281 }
282 let res = (
283 future.map(Either::Future),
284 halt.halt_notif.recv().map(Either::Halt),
285 )
286 .race()
287 .await;
288
289 match res {
290 Either::Future(Ok(success)) => Ok(success),
291 Either::Future(Err(e)) => {
292 tracing::warn!(
293 ?e,
294 "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
295 );
296 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
297 c.cancelled().await;
298 Err(e)
299 }
300 Either::Halt(halt_result) => {
301 tracing::warn!(
302 halt_result = format_args!("{:x?}", halt_result),
303 "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
304 );
305 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
306 let try_again = c.until_cancelled(future).await;
307
308 match try_again {
309 Ok(fut_result) => {
310 halt.already_received = Some(halt_result);
311 if let Err(e) = &fut_result {
312 tracing::warn!(
313 ?e,
314 "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
315 );
316 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
317 c.cancelled().await;
318 }
319 fut_result
320 }
321 Err(_cancel) => match halt_result {
322 Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
323 Err(e) => Err(e).context("VM disappeared"),
324 },
325 }
326 }
327 }
328 }
329}
330
331impl PetriVmInner {
332 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
333 self.resources
334 .firmware_event_recv
335 .recv()
336 .await
337 .context("Failed to get firmware boot event")
338 }
339
340 async fn wait_for_enlightened_shutdown_ready(
341 &mut self,
342 ) -> anyhow::Result<mesh::OneshotReceiver<()>> {
343 let recv = self
344 .resources
345 .shutdown_ic_send
346 .call(ShutdownRpc::WaitReady, ())
347 .await?;
348
349 Ok(recv)
350 }
351
352 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
353 let shutdown_result = self
354 .resources
355 .shutdown_ic_send
356 .call(
357 ShutdownRpc::Shutdown,
358 hyperv_ic_resources::shutdown::ShutdownParams {
359 shutdown_type: match kind {
360 ShutdownKind::Shutdown => {
361 hyperv_ic_resources::shutdown::ShutdownType::PowerOff
362 }
363 ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
364 },
365 force: false,
366 },
367 )
368 .await?;
369
370 tracing::info!(?shutdown_result, "Shutdown sent");
371 anyhow::ensure!(
372 shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
373 "Got non-Ok shutdown response"
374 );
375
376 Ok(())
377 }
378
379 async fn wait_for_kvp(
380 &mut self,
381 ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
382 tracing::info!("Waiting for KVP IC");
383 let (send, _) = self
384 .resources
385 .kvp_ic_send
386 .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
387 .await
388 .context("failed to connect to KVP IC")?;
389
390 Ok(send)
391 }
392
393 async fn save_openhcl(
394 &self,
395 new_openhcl: &ResolvedArtifact,
396 flags: OpenHclServicingFlags,
397 ) -> anyhow::Result<()> {
398 let ged_send = self
399 .resources
400 .ged_send
401 .as_ref()
402 .context("openhcl not configured")?;
403
404 let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
405 self.worker
406 .save_openhcl(ged_send, flags, igvm_file.into())
407 .await
408 }
409
410 async fn restore_openhcl(&self) -> anyhow::Result<()> {
411 let ged_send = self
412 .resources
413 .ged_send
414 .as_ref()
415 .context("openhcl not configured")?;
416
417 self.worker.restore_openhcl(ged_send).await
418 }
419
420 async fn modify_vtl2_settings(
421 &mut self,
422 f: impl FnOnce(&mut Vtl2Settings),
423 ) -> anyhow::Result<()> {
424 f(self.resources.vtl2_settings.as_mut().unwrap());
425
426 let ged_send = self
427 .resources
428 .ged_send
429 .as_ref()
430 .context("openhcl not configured")?;
431
432 ged_send
433 .call_failable(
434 get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
435 prost::Message::encode_to_vec(self.resources.vtl2_settings.as_ref().unwrap()),
436 )
437 .await?;
438
439 Ok(())
440 }
441
442 async fn reset(&mut self) -> anyhow::Result<()> {
443 tracing::info!("Resetting VM");
444 self.worker.reset().await?;
445 if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
447 agent.reset();
448
449 if self
450 .resources
451 .agent_image
452 .as_ref()
453 .is_some_and(|x| x.contains_pipette())
454 {
455 self.launch_linux_direct_pipette().await?;
456 }
457 }
458 Ok(())
459 }
460
461 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
462 let listener = if set_high_vtl {
463 self.resources
464 .vtl2_pipette_listener
465 .as_mut()
466 .context("VM is not configured with VTL 2")?
467 } else {
468 &mut self.resources.pipette_listener
469 };
470
471 tracing::info!(set_high_vtl, "listening for pipette connection");
472 let (conn, _) = listener
473 .accept()
474 .await
475 .context("failed to accept pipette connection")?;
476 tracing::info!(set_high_vtl, "handshaking with pipette");
477 let client = PipetteClient::new(
478 &self.resources.driver,
479 PolledSocket::new(&self.resources.driver, conn)?,
480 &self.resources.output_dir,
481 )
482 .await
483 .context("failed to connect to pipette");
484 tracing::info!(set_high_vtl, "completed pipette handshake");
485 client
486 }
487
488 async fn resume(&self) -> anyhow::Result<()> {
489 self.worker.resume().await?;
490 Ok(())
491 }
492
493 async fn verify_save_restore(&self) -> anyhow::Result<()> {
494 for i in 0..2 {
495 let result = self.worker.pulse_save_restore().await;
496 match result {
497 Ok(()) => {}
498 Err(RpcError::Channel(err)) => return Err(err.into()),
499 Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
500 tracing::warn!("Reset not supported, could not test save + restore.");
501 break;
502 }
503 Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
504 return Err(anyhow::Error::from(err))
505 .context(format!("Save + restore {i} failed."));
506 }
507 }
508 }
509
510 Ok(())
511 }
512
513 async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
514 self.resources
516 .linux_direct_serial_agent
517 .as_mut()
518 .unwrap()
519 .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
520 .await?;
521 Ok(())
522 }
523}
524
525pub struct OpenVmmInspector {
527 worker: Arc<Worker>,
528}
529
530#[async_trait]
531impl PetriVmInspector for OpenVmmInspector {
532 async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
533 Ok(self.worker.inspect_all().await)
534 }
535}
536
537pub struct OpenVmmFramebufferAccess {
539 view: View,
540}
541
542#[async_trait]
543impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
544 async fn screenshot(
545 &mut self,
546 image: &mut Vec<u8>,
547 ) -> anyhow::Result<Option<VmScreenshotMeta>> {
548 const BYTES_PER_PIXEL: usize = 4;
553 let (width, height) = self.view.resolution();
554 let (widthsize, heightsize) = (width as usize, height as usize);
555 let len = widthsize * heightsize * BYTES_PER_PIXEL;
556
557 image.resize(len, 0);
558 for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
559 self.view.read_line(i, line);
560 for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
561 pixel.swap(0, 2);
562 pixel[3] = 0xFF;
563 }
564 }
565
566 Ok(Some(VmScreenshotMeta {
567 color: image::ExtendedColorType::Rgba8,
568 width,
569 height,
570 }))
571 }
572}