use super::PetriVmResourcesOpenVmm;
use crate::OpenHclServicingFlags;
use crate::PetriHaltReason;
use crate::PetriVmFramebufferAccess;
use crate::PetriVmInspector;
use crate::PetriVmRuntime;
use crate::ShutdownKind;
use crate::VmScreenshotMeta;
use crate::openhcl_diag::OpenHclDiagHandler;
use crate::worker::Worker;
use anyhow::Context;
use async_trait::async_trait;
use framebuffer::View;
use futures::FutureExt;
use futures_concurrency::future::Race;
use get_resources::ged::FirmwareEvent;
use hvlite_defs::rpc::PulseSaveRestoreError;
use hyperv_ic_resources::shutdown::ShutdownRpc;
use mesh::CancelContext;
use mesh::Receiver;
use mesh::RecvError;
use mesh::rpc::RpcError;
use mesh::rpc::RpcSend;
use mesh_process::Mesh;
use pal_async::DefaultDriver;
use pal_async::socket::PolledSocket;
use petri_artifacts_core::ResolvedArtifact;
use pipette_client::PipetteClient;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use unix_socket::UnixListener;
use vmm_core_defs::HaltReason;
use vtl2_settings_proto::Vtl2Settings;

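/// A running VM launched by the OpenVMM backend that tests can interact with.
/// Pairs the inner VM state with the halt notification channel so operations
/// can be raced against an unexpected halt.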
pub struct PetriVmOpenVmm {
    inner: PetriVmInner,
    halt: PetriVmHaltReceiver,
}

#[async_trait]
impl PetriVmRuntime for PetriVmOpenVmm {
    type VmInspector = OpenVmmInspector;
    type VmFramebufferAccess = OpenVmmFramebufferAccess;

    async fn teardown(self) -> anyhow::Result<()> {
        tracing::info!("waiting for worker");
        let worker = Arc::into_inner(self.inner.worker)
            .context("not all references to the OpenVMM worker have been dropped")?;
        worker.shutdown().await?;

        tracing::info!("Worker quit, waiting for mesh");
        self.inner.mesh.shutdown().await;

        tracing::info!("Mesh shutdown, waiting for logging tasks");
        for t in self.inner.resources.log_stream_tasks {
            t.await?;
        }

        Ok(())
    }

    async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReason> {
        let halt_reason = if let Some(already) = self.halt.already_received.take() {
            already.map_err(anyhow::Error::from)
        } else {
            self.halt
                .halt_notif
                .recv()
                .await
                .context("Failed to get halt reason")
        }?;

        tracing::info!(?halt_reason, "Got halt reason");

        let halt_reason = match halt_reason {
            HaltReason::PowerOff => PetriHaltReason::PowerOff,
            HaltReason::Reset => PetriHaltReason::Reset,
            HaltReason::Hibernate => PetriHaltReason::Hibernate,
            HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
            _ => PetriHaltReason::Other,
        };

        if allow_reset && halt_reason == PetriHaltReason::Reset {
            self.reset().await?
        }

        Ok(halt_reason)
    }

    async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
        Self::wait_for_agent(self, set_high_vtl).await
    }

    fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
        self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
            OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
                self.inner.resources.driver.clone(),
                path,
            ))
        })
    }

    async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
        Self::wait_for_boot_event(self).await
    }

    async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
        Self::wait_for_enlightened_shutdown_ready(self)
            .await
            .map(|_| ())
    }

    async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
        Self::send_enlightened_shutdown(self, kind).await
    }

    async fn restart_openhcl(
        &mut self,
        new_openhcl: &ResolvedArtifact,
        flags: OpenHclServicingFlags,
    ) -> anyhow::Result<()> {
        Self::save_openhcl(self, new_openhcl, flags).await?;
        Self::restore_openhcl(self).await
    }

    async fn save_openhcl(
        &mut self,
        new_openhcl: &ResolvedArtifact,
        flags: OpenHclServicingFlags,
    ) -> anyhow::Result<()> {
        Self::save_openhcl(self, new_openhcl, flags).await
    }

    async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
        Self::restore_openhcl(self).await
    }

    fn inspector(&self) -> Option<OpenVmmInspector> {
        Some(OpenVmmInspector {
            worker: self.inner.worker.clone(),
        })
    }

    fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
        self.inner
            .framebuffer_view
            .take()
            .map(|view| OpenVmmFramebufferAccess { view })
    }

    async fn reset(&mut self) -> anyhow::Result<()> {
        Self::reset(self).await
    }
}

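/// The OpenVMM-specific state backing a running [`PetriVmOpenVmm`].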
pub(super) struct PetriVmInner {
    pub(super) resources: PetriVmResourcesOpenVmm,
    pub(super) mesh: Mesh,
    pub(super) worker: Arc<Worker>,
    pub(super) framebuffer_view: Option<View>,
}

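/// Receiver for VM halt notifications, plus a slot for a halt that arrived
/// while some other operation was in flight.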
struct PetriVmHaltReceiver {
    halt_notif: Receiver<HaltReason>,
    already_received: Option<Result<HaltReason, RecvError>>,
}

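/// Forwards a method to [`PetriVmInner`] while racing it against the halt
/// notification channel, so an unexpected halt is reported as an error
/// instead of leaving the test hanging.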
macro_rules! petri_vm_fn {
    ($(#[$($attrss:tt)*])* $vis:vis async fn $fn_name:ident (&mut self $(,$arg:ident: $ty:ty)*) $(-> $ret:ty)?) => {
        $(#[$($attrss)*])*
        $vis async fn $fn_name(&mut self, $($arg:$ty,)*) $(-> $ret)? {
            Self::wait_for_halt_or_internal(&mut self.halt, self.inner.$fn_name($($arg,)*)).await
        }
    };
}

impl PetriVmOpenVmm {
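    /// Wraps the inner VM state and halt notification channel into a new
    /// `PetriVmOpenVmm`.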
    pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
        Self {
            inner,
            halt: PetriVmHaltReceiver {
                halt_notif,
                already_received: None,
            },
        }
    }

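    /// Returns the path of the VTL 2 vsock socket, if the VM is configured
    /// with OpenHCL.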
    pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
        self.inner
            .resources
            .vtl2_vsock_path
            .as_deref()
            .context("VM is not configured with OpenHCL")
    }

    petri_vm_fn!(
        pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
    );
    petri_vm_fn!(
        pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<mesh::OneshotReceiver<()>>
    );
    petri_vm_fn!(
        pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
    );
    petri_vm_fn!(
        pub async fn save_openhcl(
            &mut self,
            new_openhcl: &ResolvedArtifact,
            flags: OpenHclServicingFlags
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        pub async fn restore_openhcl(
            &mut self
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        pub async fn reset(&mut self) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
    );
    petri_vm_fn!(
        pub async fn modify_vtl2_settings(&mut self, f: impl FnOnce(&mut Vtl2Settings)) -> anyhow::Result<()>
    );

    petri_vm_fn!(pub(crate) async fn resume(&mut self) -> anyhow::Result<()>);
    petri_vm_fn!(pub(crate) async fn verify_save_restore(&mut self) -> anyhow::Result<()>);
    petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);
    pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
        &mut self,
        future: F,
    ) -> anyhow::Result<T> {
        Self::wait_for_halt_or_internal(&mut self.halt, future).await
    }

    async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
        halt: &mut PetriVmHaltReceiver,
        future: F,
    ) -> anyhow::Result<T> {
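        // Race the operation against the halt notification. If the VM halts
        // first, give the operation a short grace period to finish before
        // reporting the halt as an error.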
        let future = &mut std::pin::pin!(future);
        enum Either<T> {
            Future(anyhow::Result<T>),
            Halt(Result<HaltReason, RecvError>),
        }
        let res = (
            future.map(Either::Future),
            halt.halt_notif.recv().map(Either::Halt),
        )
            .race()
            .await;

        match res {
            Either::Future(Ok(success)) => Ok(success),
            Either::Future(Err(e)) => {
                tracing::warn!(
                    ?e,
                    "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
                );
                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                c.cancelled().await;
                Err(e)
            }
            Either::Halt(halt_result) => {
                tracing::warn!(
                    halt_result = format_args!("{:x?}", halt_result),
                    "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
                );
                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                let try_again = c.until_cancelled(future).await;

                match try_again {
                    Ok(fut_result) => {
                        halt.already_received = Some(halt_result);
                        if let Err(e) = &fut_result {
                            tracing::warn!(
                                ?e,
                                "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
                            );
                            let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                            c.cancelled().await;
                        }
                        fut_result
                    }
                    Err(_cancel) => match halt_result {
                        Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
                        Err(e) => Err(e).context("VM disappeared"),
                    },
                }
            }
        }
    }
}

impl PetriVmInner {
    async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
        self.resources
            .firmware_event_recv
            .recv()
            .await
            .context("Failed to get firmware boot event")
    }

    async fn wait_for_enlightened_shutdown_ready(
        &mut self,
    ) -> anyhow::Result<mesh::OneshotReceiver<()>> {
        let recv = self
            .resources
            .shutdown_ic_send
            .call(ShutdownRpc::WaitReady, ())
            .await?;

        Ok(recv)
    }

    async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
        let shutdown_result = self
            .resources
            .shutdown_ic_send
            .call(
                ShutdownRpc::Shutdown,
                hyperv_ic_resources::shutdown::ShutdownParams {
                    shutdown_type: match kind {
                        ShutdownKind::Shutdown => {
                            hyperv_ic_resources::shutdown::ShutdownType::PowerOff
                        }
                        ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
                    },
                    force: false,
                },
            )
            .await?;

        tracing::info!(?shutdown_result, "Shutdown sent");
        anyhow::ensure!(
            shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
            "Got non-Ok shutdown response"
        );

        Ok(())
    }

    async fn wait_for_kvp(
        &mut self,
    ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
        tracing::info!("Waiting for KVP IC");
        let (send, _) = self
            .resources
            .kvp_ic_send
            .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
            .await
            .context("failed to connect to KVP IC")?;

        Ok(send)
    }

    async fn save_openhcl(
        &self,
        new_openhcl: &ResolvedArtifact,
        flags: OpenHclServicingFlags,
    ) -> anyhow::Result<()> {
        let ged_send = self
            .resources
            .ged_send
            .as_ref()
            .context("openhcl not configured")?;

        let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
        self.worker
            .save_openhcl(ged_send, flags, igvm_file.into())
            .await
    }

    async fn restore_openhcl(&self) -> anyhow::Result<()> {
        let ged_send = self
            .resources
            .ged_send
            .as_ref()
            .context("openhcl not configured")?;

        self.worker.restore_openhcl(ged_send).await
    }

    async fn modify_vtl2_settings(
        &mut self,
        f: impl FnOnce(&mut Vtl2Settings),
    ) -> anyhow::Result<()> {
        f(self.resources.vtl2_settings.as_mut().unwrap());

        let ged_send = self
            .resources
            .ged_send
            .as_ref()
            .context("openhcl not configured")?;

        ged_send
            .call_failable(
                get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
                prost::Message::encode_to_vec(self.resources.vtl2_settings.as_ref().unwrap()),
            )
            .await?;

        Ok(())
    }

    async fn reset(&mut self) -> anyhow::Result<()> {
        tracing::info!("Resetting VM");
        self.worker.reset().await?;
        if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
            agent.reset();

            if self
                .resources
                .agent_image
                .as_ref()
                .is_some_and(|x| x.contains_pipette())
            {
                self.launch_linux_direct_pipette().await?;
            }
        }
        Ok(())
    }

    async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
        Self::wait_for_agent_core(
            &self.resources.driver,
            if set_high_vtl {
                self.resources
                    .vtl2_pipette_listener
                    .as_mut()
                    .context("VM is not configured with VTL 2")?
            } else {
                &mut self.resources.pipette_listener
            },
            &self.resources.output_dir,
        )
        .await
    }

    async fn wait_for_agent_core(
        driver: &DefaultDriver,
        listener: &mut PolledSocket<UnixListener>,
        output_dir: &Path,
    ) -> anyhow::Result<PipetteClient> {
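        // Wait for the guest's pipette agent to connect over the listener socket.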
        tracing::info!("listening for pipette connection");
        let (conn, _) = listener
            .accept()
            .await
            .context("failed to accept pipette connection")?;

        tracing::info!("handshaking with pipette");
        let client = PipetteClient::new(driver, PolledSocket::new(driver, conn)?, output_dir)
            .await
            .context("failed to connect to pipette");

        tracing::info!("completed pipette handshake");
        client
    }

    async fn resume(&self) -> anyhow::Result<()> {
        self.worker.resume().await?;
        Ok(())
    }

    async fn verify_save_restore(&self) -> anyhow::Result<()> {
        for i in 0..2 {
            let result = self.worker.pulse_save_restore().await;
            match result {
                Ok(()) => {}
                Err(RpcError::Channel(err)) => return Err(err.into()),
                Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
                    tracing::warn!("Reset not supported, could not test save + restore.");
                    break;
                }
                Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
                    return Err(anyhow::Error::from(err))
                        .context(format!("Save + restore {i} failed."));
                }
            }
        }

        Ok(())
    }

    async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
        self.resources
            .linux_direct_serial_agent
            .as_mut()
            .unwrap()
            .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
            .await?;
        Ok(())
    }
}

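/// Provides `inspect` access to the running OpenVMM worker.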
pub struct OpenVmmInspector {
    worker: Arc<Worker>,
}

#[async_trait]
impl PetriVmInspector for OpenVmmInspector {
    async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
        Ok(self.worker.inspect_all().await)
    }
}

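/// Provides access to the VM's framebuffer for taking screenshots.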
pub struct OpenVmmFramebufferAccess {
    view: View,
}

#[async_trait]
impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
    async fn screenshot(
        &mut self,
        image: &mut Vec<u8>,
    ) -> anyhow::Result<Option<VmScreenshotMeta>> {
        const BYTES_PER_PIXEL: usize = 4;
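        // Read the framebuffer line by line, swapping the first and third
        // channels of each pixel and forcing alpha to opaque so the output
        // matches the RGBA metadata returned below.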
        let (width, height) = self.view.resolution();
        let (widthsize, heightsize) = (width as usize, height as usize);
        let len = widthsize * heightsize * BYTES_PER_PIXEL;

        image.resize(len, 0);
        for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
            self.view.read_line(i, line);
            for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
                pixel.swap(0, 2);
                pixel[3] = 0xFF;
            }
        }

        Ok(Some(VmScreenshotMeta {
            color: image::ExtendedColorType::Rgba8,
            width,
            height,
        }))
    }
}