1use super::PetriVmResourcesOpenVmm;
7use crate::OpenHclServicingFlags;
8use crate::PetriHaltReason;
9use crate::PetriVmFramebufferAccess;
10use crate::PetriVmInspector;
11use crate::PetriVmRuntime;
12use crate::ShutdownKind;
13use crate::VmScreenshotMeta;
14use crate::openhcl_diag::OpenHclDiagHandler;
15use crate::worker::Worker;
16use anyhow::Context;
17use async_trait::async_trait;
18use framebuffer::View;
19use futures::FutureExt;
20use futures_concurrency::future::Race;
21use get_resources::ged::FirmwareEvent;
22use hyperv_ic_resources::shutdown::ShutdownRpc;
23use mesh::CancelContext;
24use mesh::Receiver;
25use mesh::RecvError;
26use mesh::rpc::RpcError;
27use mesh::rpc::RpcSend;
28use mesh_process::Mesh;
29use openvmm_defs::rpc::PulseSaveRestoreError;
30use pal_async::socket::PolledSocket;
31use petri_artifacts_core::ResolvedArtifact;
32use pipette_client::PipetteClient;
33use std::future::Future;
34use std::path::Path;
35use std::sync::Arc;
36use std::time::Duration;
37use vmm_core_defs::HaltReason;
38use vtl2_settings_proto::Vtl2Settings;
39
40pub struct PetriVmOpenVmm {
45 inner: PetriVmInner,
46 halt: PetriVmHaltReceiver,
47}
48
49#[async_trait]
50impl PetriVmRuntime for PetriVmOpenVmm {
51 type VmInspector = OpenVmmInspector;
52 type VmFramebufferAccess = OpenVmmFramebufferAccess;
53
54 async fn teardown(self) -> anyhow::Result<()> {
55 tracing::info!("waiting for worker");
56 let worker = Arc::into_inner(self.inner.worker)
57 .context("all references to the OpenVMM worker have not been closed")?;
58 worker.shutdown().await?;
59
60 tracing::info!("Worker quit, waiting for mesh");
61 self.inner.mesh.shutdown().await;
62
63 tracing::info!("Mesh shutdown, waiting for logging tasks");
64 for t in self.inner.resources.log_stream_tasks {
65 t.await?;
66 }
67
68 Ok(())
69 }
70
71 async fn wait_for_halt(&mut self, allow_reset: bool) -> anyhow::Result<PetriHaltReason> {
72 let halt_reason = if let Some(already) = self.halt.already_received.take() {
73 already.map_err(anyhow::Error::from)
74 } else {
75 self.halt
76 .halt_notif
77 .recv()
78 .await
79 .context("Failed to get halt reason")
80 }?;
81
82 tracing::info!(?halt_reason, "Got halt reason");
83
84 let halt_reason = match halt_reason {
85 HaltReason::PowerOff => PetriHaltReason::PowerOff,
86 HaltReason::Reset => PetriHaltReason::Reset,
87 HaltReason::Hibernate => PetriHaltReason::Hibernate,
88 HaltReason::TripleFault { .. } => PetriHaltReason::TripleFault,
89 _ => PetriHaltReason::Other,
90 };
91
92 if allow_reset && halt_reason == PetriHaltReason::Reset {
93 self.reset().await?
94 }
95
96 Ok(halt_reason)
97 }
98
99 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
100 Self::wait_for_agent(self, set_high_vtl).await
101 }
102
103 fn openhcl_diag(&self) -> Option<OpenHclDiagHandler> {
104 self.inner.resources.vtl2_vsock_path.as_ref().map(|path| {
105 OpenHclDiagHandler::new(diag_client::DiagClient::from_hybrid_vsock(
106 self.inner.resources.driver.clone(),
107 path,
108 ))
109 })
110 }
111
112 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
113 Self::wait_for_boot_event(self).await
114 }
115
116 async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<()> {
117 Self::wait_for_enlightened_shutdown_ready(self)
118 .await
119 .map(|_| ())
120 }
121
122 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
123 Self::send_enlightened_shutdown(self, kind).await
124 }
125
126 async fn restart_openhcl(
127 &mut self,
128 new_openhcl: &ResolvedArtifact,
129 flags: OpenHclServicingFlags,
130 ) -> anyhow::Result<()> {
131 Self::save_openhcl(self, new_openhcl, flags).await?;
132 Self::restore_openhcl(self).await
133 }
134
135 async fn save_openhcl(
136 &mut self,
137 new_openhcl: &ResolvedArtifact,
138 flags: OpenHclServicingFlags,
139 ) -> anyhow::Result<()> {
140 Self::save_openhcl(self, new_openhcl, flags).await
141 }
142
143 async fn restore_openhcl(&mut self) -> anyhow::Result<()> {
144 Self::restore_openhcl(self).await
145 }
146
147 async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
148 Self::update_command_line(self, command_line).await
149 }
150
151 fn inspector(&self) -> Option<OpenVmmInspector> {
152 Some(OpenVmmInspector {
153 worker: self.inner.worker.clone(),
154 })
155 }
156
157 fn take_framebuffer_access(&mut self) -> Option<OpenVmmFramebufferAccess> {
158 self.inner
159 .framebuffer_view
160 .take()
161 .map(|view| OpenVmmFramebufferAccess { view })
162 }
163
164 async fn reset(&mut self) -> anyhow::Result<()> {
165 Self::reset(self).await
166 }
167
168 async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()> {
169 Self::set_vtl2_settings(self, settings).await
170 }
171
172 async fn set_vmbus_drive(
173 &mut self,
174 _disk: &crate::Drive,
175 _controller_id: &guid::Guid,
176 _controller_location: u32,
177 ) -> anyhow::Result<()> {
178 todo!("openvmm set vmbus drive")
179 }
180}
181
182pub(super) struct PetriVmInner {
183 pub(super) resources: PetriVmResourcesOpenVmm,
184 pub(super) mesh: Mesh,
185 pub(super) worker: Arc<Worker>,
186 pub(super) framebuffer_view: Option<View>,
187}
188
189struct PetriVmHaltReceiver {
190 halt_notif: Receiver<HaltReason>,
191 already_received: Option<Result<HaltReason, RecvError>>,
192}
193
194macro_rules! petri_vm_fn {
197 ($(#[$($attrss:tt)*])* $vis:vis async fn $fn_name:ident (&mut self $(,$arg:ident: $ty:ty)*) $(-> $ret:ty)?) => {
198 $(#[$($attrss)*])*
199 $vis async fn $fn_name(&mut self, $($arg:$ty,)*) $(-> $ret)? {
200 Self::wait_for_halt_or_internal(&mut self.halt, self.inner.$fn_name($($arg,)*)).await
201 }
202 };
203}
204
205impl PetriVmOpenVmm {
208 pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
209 Self {
210 inner,
211 halt: PetriVmHaltReceiver {
212 halt_notif,
213 already_received: None,
214 },
215 }
216 }
217
218 pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
220 self.inner
221 .resources
222 .vtl2_vsock_path
223 .as_deref()
224 .context("VM is not configured with OpenHCL")
225 }
226
227 petri_vm_fn!(
228 pub async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent>
231 );
232 petri_vm_fn!(
233 pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<mesh::OneshotReceiver<()>>
236 );
237 petri_vm_fn!(
238 pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
240 );
241 petri_vm_fn!(
242 pub async fn wait_for_kvp(&mut self) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>>
245 );
246 petri_vm_fn!(
247 pub async fn save_openhcl(
249 &mut self,
250 new_openhcl: &ResolvedArtifact,
251 flags: OpenHclServicingFlags
252 ) -> anyhow::Result<()>
253 );
254 petri_vm_fn!(
255 pub async fn restore_openhcl(
257 &mut self
258 ) -> anyhow::Result<()>
259 );
260 petri_vm_fn!(
261 pub async fn update_command_line(
263 &mut self,
264 command_line: &str
265 ) -> anyhow::Result<()>
266 );
267 petri_vm_fn!(
268 pub async fn reset(&mut self) -> anyhow::Result<()>
270 );
271 petri_vm_fn!(
272 pub async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient>
274 );
275 petri_vm_fn!(
276 pub async fn set_vtl2_settings(&mut self, settings: &Vtl2Settings) -> anyhow::Result<()>
278 );
279
280 petri_vm_fn!(pub(crate) async fn resume(&mut self) -> anyhow::Result<()>);
281 petri_vm_fn!(pub(crate) async fn verify_save_restore(&mut self) -> anyhow::Result<()>);
282 petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);
283
284 pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
291 &mut self,
292 future: F,
293 ) -> anyhow::Result<T> {
294 Self::wait_for_halt_or_internal(&mut self.halt, future).await
295 }
296
297 async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
298 halt: &mut PetriVmHaltReceiver,
299 future: F,
300 ) -> anyhow::Result<T> {
301 let future = &mut std::pin::pin!(future);
302 enum Either<T> {
303 Future(anyhow::Result<T>),
304 Halt(Result<HaltReason, RecvError>),
305 }
306 let res = (
307 future.map(Either::Future),
308 halt.halt_notif.recv().map(Either::Halt),
309 )
310 .race()
311 .await;
312
313 match res {
314 Either::Future(Ok(success)) => Ok(success),
315 Either::Future(Err(e)) => {
316 tracing::warn!(
317 ?e,
318 "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
319 );
320 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
321 c.cancelled().await;
322 Err(e)
323 }
324 Either::Halt(halt_result) => {
325 tracing::warn!(
326 halt_result = format_args!("{:x?}", halt_result),
327 "Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
328 );
329 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
330 let try_again = c.until_cancelled(future).await;
331
332 match try_again {
333 Ok(fut_result) => {
334 halt.already_received = Some(halt_result);
335 if let Err(e) = &fut_result {
336 tracing::warn!(
337 ?e,
338 "Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
339 );
340 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
341 c.cancelled().await;
342 }
343 fut_result
344 }
345 Err(_cancel) => match halt_result {
346 Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:x?}", halt_reason)),
347 Err(e) => Err(e).context("VM disappeared"),
348 },
349 }
350 }
351 }
352 }
353}
354
355impl PetriVmInner {
356 async fn wait_for_boot_event(&mut self) -> anyhow::Result<FirmwareEvent> {
357 self.resources
358 .firmware_event_recv
359 .recv()
360 .await
361 .context("Failed to get firmware boot event")
362 }
363
364 async fn wait_for_enlightened_shutdown_ready(
365 &mut self,
366 ) -> anyhow::Result<mesh::OneshotReceiver<()>> {
367 let recv = self
368 .resources
369 .shutdown_ic_send
370 .call(ShutdownRpc::WaitReady, ())
371 .await?;
372
373 Ok(recv)
374 }
375
376 async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
377 let shutdown_result = self
378 .resources
379 .shutdown_ic_send
380 .call(
381 ShutdownRpc::Shutdown,
382 hyperv_ic_resources::shutdown::ShutdownParams {
383 shutdown_type: match kind {
384 ShutdownKind::Shutdown => {
385 hyperv_ic_resources::shutdown::ShutdownType::PowerOff
386 }
387 ShutdownKind::Reboot => hyperv_ic_resources::shutdown::ShutdownType::Reboot,
388 },
389 force: false,
390 },
391 )
392 .await?;
393
394 tracing::info!(?shutdown_result, "Shutdown sent");
395 anyhow::ensure!(
396 shutdown_result == hyperv_ic_resources::shutdown::ShutdownResult::Ok,
397 "Got non-Ok shutdown response"
398 );
399
400 Ok(())
401 }
402
403 async fn wait_for_kvp(
404 &mut self,
405 ) -> anyhow::Result<mesh::Sender<hyperv_ic_resources::kvp::KvpRpc>> {
406 tracing::info!("Waiting for KVP IC");
407 let (send, _) = self
408 .resources
409 .kvp_ic_send
410 .call_failable(hyperv_ic_resources::kvp::KvpConnectRpc::WaitForGuest, ())
411 .await
412 .context("failed to connect to KVP IC")?;
413
414 Ok(send)
415 }
416
417 async fn save_openhcl(
418 &self,
419 new_openhcl: &ResolvedArtifact,
420 flags: OpenHclServicingFlags,
421 ) -> anyhow::Result<()> {
422 let ged_send = self
423 .resources
424 .ged_send
425 .as_ref()
426 .context("openhcl not configured")?;
427
428 let igvm_file = fs_err::File::open(new_openhcl).context("failed to open igvm file")?;
429 self.worker
430 .save_openhcl(ged_send, flags, igvm_file.into())
431 .await
432 }
433
434 async fn update_command_line(&mut self, command_line: &str) -> anyhow::Result<()> {
435 self.worker.update_command_line(command_line).await
436 }
437
438 async fn restore_openhcl(&self) -> anyhow::Result<()> {
439 let ged_send = self
440 .resources
441 .ged_send
442 .as_ref()
443 .context("openhcl not configured")?;
444
445 self.worker.restore_openhcl(ged_send).await
446 }
447
448 async fn set_vtl2_settings(&self, settings: &Vtl2Settings) -> anyhow::Result<()> {
449 let ged_send = self
450 .resources
451 .ged_send
452 .as_ref()
453 .context("openhcl not configured")?;
454
455 ged_send
456 .call_failable(
457 get_resources::ged::GuestEmulationRequest::ModifyVtl2Settings,
458 prost::Message::encode_to_vec(settings),
459 )
460 .await?;
461
462 Ok(())
463 }
464
465 async fn reset(&mut self) -> anyhow::Result<()> {
466 tracing::info!("Resetting VM");
467 self.worker.reset().await?;
468 if let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() {
470 agent.reset();
471
472 if self.resources.properties.using_vtl0_pipette {
473 self.launch_linux_direct_pipette().await?;
474 }
475 }
476 Ok(())
477 }
478
479 async fn wait_for_agent(&mut self, set_high_vtl: bool) -> anyhow::Result<PipetteClient> {
480 let listener = if set_high_vtl {
481 self.resources
482 .vtl2_pipette_listener
483 .as_mut()
484 .context("VM is not configured with VTL 2")?
485 } else {
486 &mut self.resources.pipette_listener
487 };
488
489 tracing::info!(set_high_vtl, "listening for pipette connection");
490 let (conn, _) = listener
491 .accept()
492 .await
493 .context("failed to accept pipette connection")?;
494 tracing::info!(set_high_vtl, "handshaking with pipette");
495 let client = PipetteClient::new(
496 &self.resources.driver,
497 PolledSocket::new(&self.resources.driver, conn)?,
498 &self.resources.output_dir,
499 )
500 .await
501 .context("failed to connect to pipette");
502 tracing::info!(set_high_vtl, "completed pipette handshake");
503 client
504 }
505
506 async fn resume(&self) -> anyhow::Result<()> {
507 self.worker.resume().await?;
508 Ok(())
509 }
510
511 async fn verify_save_restore(&self) -> anyhow::Result<()> {
512 for i in 0..2 {
513 let result = self.worker.pulse_save_restore().await;
514 match result {
515 Ok(()) => {}
516 Err(RpcError::Channel(err)) => return Err(err.into()),
517 Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
518 tracing::warn!("Reset not supported, could not test save + restore.");
519 break;
520 }
521 Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
522 return Err(anyhow::Error::from(err))
523 .context(format!("Save + restore {i} failed."));
524 }
525 }
526 }
527
528 Ok(())
529 }
530
531 async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
532 self.resources
534 .linux_direct_serial_agent
535 .as_mut()
536 .unwrap()
537 .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
538 .await?;
539 Ok(())
540 }
541}
542
543pub struct OpenVmmInspector {
545 worker: Arc<Worker>,
546}
547
548#[async_trait]
549impl PetriVmInspector for OpenVmmInspector {
550 async fn inspect_all(&self) -> anyhow::Result<inspect::Node> {
551 Ok(self.worker.inspect_all().await)
552 }
553}
554
555pub struct OpenVmmFramebufferAccess {
557 view: View,
558}
559
560#[async_trait]
561impl PetriVmFramebufferAccess for OpenVmmFramebufferAccess {
562 async fn screenshot(
563 &mut self,
564 image: &mut Vec<u8>,
565 ) -> anyhow::Result<Option<VmScreenshotMeta>> {
566 const BYTES_PER_PIXEL: usize = 4;
571 let (width, height) = self.view.resolution();
572 let (widthsize, heightsize) = (width as usize, height as usize);
573 let len = widthsize * heightsize * BYTES_PER_PIXEL;
574
575 image.resize(len, 0);
576 for (i, line) in (0..height).zip(image.chunks_exact_mut(widthsize * BYTES_PER_PIXEL)) {
577 self.view.read_line(i, line);
578 for pixel in line.chunks_exact_mut(BYTES_PER_PIXEL) {
579 pixel.swap(0, 2);
580 pixel[3] = 0xFF;
581 }
582 }
583
584 Ok(Some(VmScreenshotMeta {
585 color: image::ExtendedColorType::Rgba8,
586 width,
587 height,
588 }))
589 }
590}