1use super::PetriVmResourcesOpenVmm;
7use crate::OpenHclServicingFlags;
8use crate::PetriHaltReason;
9use crate::PetriVmFramebufferAccess;
10use crate::PetriVmInspector;
11use crate::PetriVmRuntime;
12use crate::ShutdownKind;
13use crate::VmScreenshotMeta;
14use crate::openhcl_diag::OpenHclDiagHandler;
15use crate::worker::Worker;
16use anyhow::Context;
17use async_trait::async_trait;
18use framebuffer::View;
19use futures::FutureExt;
20use futures_concurrency::future::Race;
21use get_resources::ged::FirmwareEvent;
22use hvlite_defs::rpc::PulseSaveRestoreError;
23use hyperv_ic_resources::shutdown::ShutdownRpc;
24use mesh::CancelContext;
25use mesh::Receiver;
26use mesh::RecvError;
27use mesh::rpc::RpcError;
28use mesh::rpc::RpcSend;
29use mesh_process::Mesh;
30use pal_async::socket::PolledSocket;
31use petri_artifacts_core::ResolvedArtifact;
32use pipette_client::PipetteClient;
33use std::future::Future;
34use std::path::Path;
35use std::sync::Arc;
36use std::time::Duration;
37use vmm_core_defs::HaltReason;
38use vtl2_settings_proto::Vtl2Settings;
39
/// An OpenVMM-backed petri VM.
///
/// The VM state is split into `inner` and `halt` so that an operation on
/// `inner` can be raced against the halt channel while both are borrowed
/// mutably. Fields are dropped in declaration order: `inner`, then `halt`.
pub struct PetriVmOpenVmm {
    /// Resources, mesh, worker handle and framebuffer view of the VM.
    inner: PetriVmInner,
    /// Halt notification channel plus any halt already observed.
    halt: PetriVmHaltReceiver,
}
48
49#[async_trait]
50impl PetriVmRuntime for PetriVmOpenVmm {
51 type VmInspector = OpenVmmInspector;
52 type VmFramebufferAccess = OpenVmmFramebufferAccess;
53
54 async fn teardown(self) -> anyhow::Result<()> {
55 tracing::info!("waiting for worker");
56 let worker = Arc::into_inner(self.inner.worker)
57 .context("all references to the OpenVMM worker have not been closed")?;
58 worker.shutdown().await?;
59
60 tracing::info!("Worker quit, waiting for mesh");
61 self.inner.mesh.shutdown().await;
62
63 tracing::info!("Mesh shutdown, waiting for logging tasks");
64 for t in self.inner.resources.log_stream_tasks {
65 t.await?;
66 }
67
68 Ok(())
69 }
70
71 async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReason> {
72 let halt_reason = if let Some(already) = self.halt.already_received.take() {
73 already.map_err(anyhow::Error::from)
74 } else {
75 self.halt
76 .halt_notif
77 .recv()
78 .await
79 .context("Failed to get halt reason")
80 }?;
81
82 tracing::info!(?halt_reason, "Got halt reason");
83
84 let halt_reason = match halt_reason {
85 HaltReason::PowerOff => PetriHaltReason::PowerOff,
86 HaltReason::Reset => PetriHaltReason::Reset,
87 HaltReason::Hibernate => PetriHaltReason::Hibernate,
88 HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
89 _ => PetriHaltReason::Other,
90 };
91
92 if allow_reset && halt_reason == PetriHaltReason::Reset {
93 self.reset().await?
94 }
95
96 Ok(halt_reason)
97 }
98
99 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
100 Self::wait_for_agent(self, set_high_vtl).await
101 }
102
103 fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
104 self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
105 OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
106 self.inner.resources.driver.clone(),
107 path,
108 ))
109 })
110 }
111
112 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
113 Self::wait_for_boot_event(self).await
114 }
115
116 async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
117 Self::wait_for_enlightened_shutdown_ready(self)
118 .await
119 .map(|_| ())
120 }
121
122 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
123 Self::send_enlightened_shutdown(self, kind).await
124 }
125
126 async fn restart_openhcl(
127 &mut self,
128 new_openhcl: &ResolvedArtifact,
129 flags: OpenHclServicingFlags,
130 ) -> anyhow::Result<()> {
131 Self::save_openhcl(self, new_openhcl, flags).await?;
132 Self::restore_openhcl(self).await
133 }
134
135 async fn save_openhcl(
136 &mut self,
137 new_openhcl: &ResolvedArtifact,
138 flags: OpenHclServicingFlags,
139 ) -> anyhow::Result<()> {
140 Self::save_openhcl(self, new_openhcl, flags).await
141 }
142
143 async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
144 Self::restore_openhcl(self).await
145 }
146
147 async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
148 Self::update_command_line(self, command_line).await
149 }
150
151 fn inspector(&self) -> Option<OpenVmmInspector> {
152 Some(OpenVmmInspector {
153 worker: self.inner.worker.clone(),
154 })
155 }
156
157 fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
158 self.inner
159 .framebuffer_view
160 .take()
161 .map(|view| OpenVmmFramebufferAccess { view })
162 }
163
164 async fn reset(&mut self) -> anyhow::Result<()> {
165 Self::reset(self).await
166 }
167}
168
/// The parts of an OpenVMM petri VM that operations run against.
///
/// This is kept separate from the halt receiver so that a `PetriVmInner`
/// method's future and `&mut PetriVmHaltReceiver` can be borrowed at the same
/// time (see `petri_vm_fn!`).
pub(super) struct PetriVmInner {
    /// Channels, listeners and paths set up when the VM was configured.
    pub(super) resources: PetriVmResourcesOpenVmm,
    /// The mesh hosting the OpenVMM worker; shut down during teardown.
    pub(super) mesh: Mesh,
    /// Shared handle to the OpenVMM worker. Teardown requires this to be the
    /// only remaining reference.
    pub(super) worker: Arc<Worker>,
    /// Framebuffer view, if any; taken by `take_framebuffer_access`.
    pub(super) framebuffer_view: Option<View>,
}
175
/// Tracks halt notifications coming from the VM.
struct PetriVmHaltReceiver {
    /// Channel on which the VM reports halt reasons.
    halt_notif: Receiver<HaltReason>,
    /// A halt result received while racing another future that has not yet
    /// been consumed by `wait_for_halt`.
    already_received: Option<Result<HaltReason, RecvError>>,
}
180
/// Generates a `PetriVmOpenVmm` method that calls the same-named method on
/// `PetriVmInner` and races its future against the VM's halt channel via
/// `wait_for_halt_or_internal`.
///
/// Attributes (including doc comments) written on the invocation are copied
/// onto the generated method.
macro_rules! petri_vm_fn {
    ($(#[$($attr:tt)*])* $vis:vis async fn $name:ident (&mut self $(,$param:ident: $param_ty:ty)*) $(-> $ret:ty)?) => {
        $(#[$($attr)*])*
        $vis async fn $name(&mut self, $($param: $param_ty,)*) $(-> $ret)? {
            let inner_fut = self.inner.$name($($param),*);
            Self::wait_for_halt_or_internal(&mut self.halt, inner_fut).await
        }
    };
}
191
impl PetriVmOpenVmm {
    /// Wraps VM state and its halt channel. No halt has been observed yet, so
    /// `already_received` starts as `None`.
    pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
        Self {
            inner,
            halt: PetriVmHaltReceiver {
                halt_notif,
                already_received: None,
            },
        }
    }

    /// Returns the path of the VTL2 hybrid vsock socket.
    ///
    /// Errors ("VM is not configured with OpenHCL") when no VTL2 vsock path
    /// was configured.
    pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
        self.inner
            .resources
            .vtl2_vsock_path
            .as_deref()
            .context("VM is not configured with OpenHCL")
    }

    petri_vm_fn!(
        /// Waits for the next firmware event, failing if the VM halts first.
        pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
    );
    petri_vm_fn!(
        /// Asks the shutdown IC to report readiness and returns the receiver
        /// it replies with, failing if the VM halts first.
        pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<mesh::OneshotReceiver<()>>
    );
    petri_vm_fn!(
        /// Sends a shutdown or reboot request through the shutdown IC.
        pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Waits for the KVP IC to connect and returns a sender for KVP requests.
        pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
    );
    petri_vm_fn!(
        /// Saves OpenHCL state, supplying `new_openhcl` as the IGVM file to use.
        pub async fn save_openhcl(
            &mut self,
            new_openhcl: &ResolvedArtifact,
            flags: OpenHclServicingFlags
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Restores OpenHCL after a previous `save_openhcl`.
        pub async fn restore_openhcl(
            &mut self
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Forwards a new command line to the worker.
        pub async fn update_command_line(
            &mut self,
            command_line: &str
        ) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Resets the VM (and, for Linux-direct VMs, the serial agent).
        pub async fn reset(&mut self) -> anyhow::Result<()>
    );
    petri_vm_fn!(
        /// Accepts and handshakes a pipette connection (VTL2 listener when
        /// `set_high_vtl` is true).
        pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
    );
    petri_vm_fn!(
        /// Applies `f` to the cached VTL2 settings and sends them to the guest.
        pub async fn modify_vtl2_settings(&mut self, f: impl FnOnce(&mut Vtl2Settings)) -> anyhow::Result<()>
    );

    petri_vm_fn!(pub(crate) async fn resume(&mut self) -> anyhow::Result<()>);
    petri_vm_fn!(pub(crate) async fn verify_save_restore(&mut self) -> anyhow::Result<()>);
    petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);

    /// Runs `future` while watching for a VM halt.
    ///
    /// See `wait_for_halt_or_internal` for how a halt that arrives first is
    /// handled.
    pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
        &mut self,
        future: F,
    ) -> anyhow::Result<T> {
        Self::wait_for_halt_or_internal(&mut self.halt, future).await
    }

    /// Races `future` against the next message on the halt channel.
    ///
    /// - If `future` finishes first with `Ok`, its value is returned.
    /// - If `future` finishes first with `Err`, this waits 5 seconds (a
    ///   `CancelContext` timeout) and then returns the error.
    /// - If the halt channel yields first, `future` is given up to 5 more
    ///   seconds:
    ///   - If it completes in time, the halt result is stashed in
    ///     `already_received` so a later `wait_for_halt` sees it, and the
    ///     future's result is returned (after another 5 seconds if it is an
    ///     error).
    ///   - Otherwise an error describing the halt ("VM halted: ..."), or
    ///     "VM disappeared" on a channel error, is returned.
    ///
    /// NOTE(review): on that timeout path the halt result is consumed without
    /// being stashed in `already_received`, so a subsequent `wait_for_halt`
    /// would wait for a further halt message — confirm this is intended.
    async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
        halt: &mut PetriVmHaltReceiver,
        future: F,
    ) -> anyhow::Result<T> {
        // Pin the future so it can be polled by reference in the race and then
        // polled again (not restarted) if the halt channel wins.
        let future = &mut std::pin::pin!(future);
        enum Either<T> {
            Future(anyhow::Result<T>),
            Halt(Result<HaltReason, RecvError>),
        }
        let res = (
            future.map(Either::Future),
            halt.halt_notif.recv().map(Either::Halt),
        )
            .race()
            .await;

        match res {
            Either::Future(Ok(success)) => Ok(success),
            Either::Future(Err(e)) => {
                tracing::warn!(
                    ?e,
                    "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
                );
                // A context that is never explicitly cancelled: `cancelled()`
                // resolves only when the 5 second timeout expires.
                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                c.cancelled().await;
                Err(e)
            }
            Either::Halt(halt_result) => {
                tracing::warn!(
                    halt_result = format_args!("{:x?}", halt_result),
                    "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
                );
                // Give the in-flight future a bounded grace period to finish.
                let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                let try_again = c.until_cancelled(future).await;

                match try_again {
                    Ok(fut_result) => {
                        // Keep the halt so the next `wait_for_halt` returns it
                        // instead of blocking on the channel.
                        halt.already_received = Some(halt_result);
                        if let Err(e) = &fut_result {
                            tracing::warn!(
                                ?e,
                                "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
                            );
                            let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
                            c.cancelled().await;
                        }
                        fut_result
                    }
                    Err(_cancel) => match halt_result {
                        Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
                        Err(e) => Err(e).context("VM disappeared"),
                    },
                }
            }
        }
    }
}
341
342impl PetriVmInner {
343 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
344 self.resources
345 .firmware_event_recv
346 .recv()
347 .await
348 .context("Failed to get firmware boot event")
349 }
350
351 async fn wait_for_enlightened_shutdown_ready(
352 &mut self,
353 ) -> anyhow::Result<mesh::OneshotReceiver<()>> {
354 let recv = self
355 .resources
356 .shutdown_ic_send
357 .call(ShutdownRpc::WaitReady, ())
358 .await?;
359
360 Ok(recv)
361 }
362
363 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
364 let shutdown_result = self
365 .resources
366 .shutdown_ic_send
367 .call(
368 ShutdownRpc::Shutdown,
369 hyperv_ic_resources::shutdown::ShutdownParams {
370 shutdown_type: match kind {
371 ShutdownKind::Shutdown => {
372 hyperv_ic_resources::shutdown::ShutdownType::PowerOff
373 }
374 ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
375 },
376 force: false,
377 },
378 )
379 .await?;
380
381 tracing::info!(?shutdown_result, "Shutdown sent");
382 anyhow::ensure!(
383 shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
384 "Got non-Ok shutdown response"
385 );
386
387 Ok(())
388 }
389
390 async fn wait_for_kvp(
391 &mut self,
392 ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
393 tracing::info!("Waiting for KVP IC");
394 let (send, _) = self
395 .resources
396 .kvp_ic_send
397 .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
398 .await
399 .context("failed to connect to KVP IC")?;
400
401 Ok(send)
402 }
403
404 async fn save_openhcl(
405 &self,
406 new_openhcl: &ResolvedArtifact,
407 flags: OpenHclServicingFlags,
408 ) -> anyhow::Result<()> {
409 let ged_send = self
410 .resources
411 .ged_send
412 .as_ref()
413 .context("openhcl not configured")?;
414
415 let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
416 self.worker
417 .save_openhcl(ged_send, flags, igvm_file.into())
418 .await
419 }
420
421 async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
422 self.worker.update_command_line(command_line).await
423 }
424
425 async fn restore_openhcl(&self) -> anyhow::Result<()> {
426 let ged_send = self
427 .resources
428 .ged_send
429 .as_ref()
430 .context("openhcl not configured")?;
431
432 self.worker.restore_openhcl(ged_send).await
433 }
434
435 async fn modify_vtl2_settings(
436 &mut self,
437 f: impl FnOnce(&mut Vtl2Settings),
438 ) -> anyhow::Result<()> {
439 f(self.resources.vtl2_settings.as_mut().unwrap());
440
441 let ged_send = self
442 .resources
443 .ged_send
444 .as_ref()
445 .context("openhcl not configured")?;
446
447 ged_send
448 .call_failable(
449 get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
450 prost::Message::encode_to_vec(self.resources.vtl2_settings.as_ref().unwrap()),
451 )
452 .await?;
453
454 Ok(())
455 }
456
457 async fn reset(&mut self) -> anyhow::Result<()> {
458 tracing::info!("Resetting VM");
459 self.worker.reset().await?;
460 if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
462 agent.reset();
463
464 if self
465 .resources
466 .agent_image
467 .as_ref()
468 .is_some_and(|x| x.contains_pipette())
469 {
470 self.launch_linux_direct_pipette().await?;
471 }
472 }
473 Ok(())
474 }
475
476 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
477 let listener = if set_high_vtl {
478 self.resources
479 .vtl2_pipette_listener
480 .as_mut()
481 .context("VM is not configured with VTL 2")?
482 } else {
483 &mut self.resources.pipette_listener
484 };
485
486 tracing::info!(set_high_vtl, "listening for pipette connection");
487 let (conn, _) = listener
488 .accept()
489 .await
490 .context("failed to accept pipette connection")?;
491 tracing::info!(set_high_vtl, "handshaking with pipette");
492 let client = PipetteClient::new(
493 &self.resources.driver,
494 PolledSocket::new(&self.resources.driver, conn)?,
495 &self.resources.output_dir,
496 )
497 .await
498 .context("failed to connect to pipette");
499 tracing::info!(set_high_vtl, "completed pipette handshake");
500 client
501 }
502
503 async fn resume(&self) -> anyhow::Result<()> {
504 self.worker.resume().await?;
505 Ok(())
506 }
507
508 async fn verify_save_restore(&self) -> anyhow::Result<()> {
509 for i in 0..2 {
510 let result = self.worker.pulse_save_restore().await;
511 match result {
512 Ok(()) => {}
513 Err(RpcError::Channel(err)) => return Err(err.into()),
514 Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
515 tracing::warn!("Reset not supported, could not test save + restore.");
516 break;
517 }
518 Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
519 return Err(anyhow::Error::from(err))
520 .context(format!("Save + restore {i} failed."));
521 }
522 }
523 }
524
525 Ok(())
526 }
527
528 async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
529 self.resources
531 .linux_direct_serial_agent
532 .as_mut()
533 .unwrap()
534 .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
535 .await?;
536 Ok(())
537 }
538}
539
/// Inspect handle for an OpenVMM VM.
///
/// It shares the worker `Arc` with the VM, so while one is alive `teardown`
/// cannot take sole ownership of the worker.
pub struct OpenVmmInspector {
    /// Shared handle to the OpenVMM worker being inspected.
    worker: Arc<Worker>,
}
544
545#[async_trait]
546impl PetriVmInspector for OpenVmmInspector {
547 async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
548 Ok(self.worker.inspect_all().await)
549 }
550}
551
/// Screenshot access backed by the framebuffer `View` taken from the VM.
pub struct OpenVmmFramebufferAccess {
    /// View onto the VM's framebuffer, read line by line when capturing.
    view: View,
}
556
557#[async_trait]
558impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
559 async fn screenshot(
560 &mut self,
561 image: &mut Vec<u8>,
562 ) -> anyhow::Result<Option<VmScreenshotMeta>> {
563 const BYTES_PER_PIXEL: usize = 4;
568 let (width, height) = self.view.resolution();
569 let (widthsize, heightsize) = (width as usize, height as usize);
570 let len = widthsize * heightsize * BYTES_PER_PIXEL;
571
572 image.resize(len, 0);
573 for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
574 self.view.read_line(i, line);
575 for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
576 pixel.swap(0, 2);
577 pixel[3] = 0xFF;
578 }
579 }
580
581 Ok(Some(VmScreenshotMeta {
582 color: image::ExtendedColorType::Rgba8,
583 width,
584 height,
585 }))
586 }
587}