use super::PetriVmResourcesOpenVmm;
use crate::OpenHclServicingFlags;
use crate::PetriVm;
use crate::ShutdownKind;
use crate::openhcl_diag::OpenHclDiagHandler;
use crate::worker::Worker;
use anyhow::Context;
use async_trait::async_trait;
use futures::FutureExt;
use futures_concurrency::future::Race;
use hvlite_defs::rpc::PulseSaveRestoreError;
use hyperv_ic_resources::shutdown::ShutdownRpc;
use mesh::CancelContext;
use mesh::Receiver;
use mesh::RecvError;
use mesh::rpc::RpcError;
use mesh::rpc::RpcSend;
use mesh_process::Mesh;
use pal_async::DefaultDriver;
use pal_async::socket::PolledSocket;
use pal_async::task::Task;
use pal_async::timer::PolledTimer;
use petri_artifacts_common::tags::GuestQuirks;
use petri_artifacts_core::ResolvedArtifact;
use pipette_client::PipetteClient;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use unix_socket::UnixListener;
use vmm_core_defs::HaltReason;
/// A running OpenVMM-backed VM under test.
///
/// Most operations are forwarded to [`PetriVmInner`] while simultaneously
/// watching the halt channel, so that an unexpected halt interrupts the
/// operation instead of leaving it waiting.
pub struct PetriVmOpenVmm {
/// The VM state and resources that the individual operations act on.
inner: PetriVmInner,
/// The halt notification channel plus any halt result that was received
/// while another operation was in flight.
halt: PetriVmHaltReceiver,
}
// Trait implementation that forwards every `PetriVm` method to the inherent
// method of the same name on `PetriVmOpenVmm`.
//
// The calls are deliberately spelled `Self::method(self, ..)`: an inherent
// associated function takes precedence over a trait method of the same name
// in path resolution, so these calls reach the inherent implementations
// rather than recursing into this trait impl.
#[async_trait]
impl PetriVm for PetriVmOpenVmm {
async fn wait_for_halt(&mut self) -> anyhow::Result<HaltReason> {
Self::wait_for_halt(self).await
}
// The trait hands over a boxed VM; unbox it because the inherent method
// consumes the VM by value.
async fn wait_for_teardown(self: Box<Self>) -> anyhow::Result<HaltReason> {
Self::wait_for_teardown(*self).await
}
async fn test_inspect_openhcl(&mut self) -> anyhow::Result<()> {
Self::test_inspect_openhcl(self).await
}
async fn wait_for_agent(&mut self) -> anyhow::Result<PipetteClient> {
Self::wait_for_agent(self).await
}
async fn wait_for_vtl2_ready(&mut self) -> anyhow::Result<()> {
Self::wait_for_vtl2_ready(self).await
}
async fn wait_for_successful_boot_event(&mut self) -> anyhow::Result<()> {
Self::wait_for_successful_boot_event(self).await
}
async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
Self::send_enlightened_shutdown(self, kind).await
}
}
/// The state behind a [`PetriVmOpenVmm`], kept separate from the halt
/// receiver so that an operation on this state and a wait on the halt channel
/// can be borrowed at the same time.
pub(super) struct PetriVmInner {
/// Listeners, channels, the async driver and other per-VM resources used by
/// the operations below.
pub(super) resources: PetriVmResourcesOpenVmm,
/// The mesh instance; shut down during teardown after the worker has quit.
pub(super) mesh: Mesh,
/// Shared handle to the VM worker. Teardown expects to hold the only
/// remaining reference once the watchdog tasks have been cancelled.
pub(super) worker: Arc<Worker>,
/// Background tasks that are cancelled during teardown before the worker is
/// shut down.
pub(super) watchdog_tasks: Vec<Task<()>>,
/// Guest-specific quirks, e.g. an optional delay applied before sending an
/// enlightened shutdown request.
pub(super) quirks: GuestQuirks,
}
/// The halt notification channel together with a slot for a halt result that
/// was received while racing against another operation.
struct PetriVmHaltReceiver {
/// Channel on which the VM's halt reason is delivered.
halt_notif: Receiver<HaltReason>,
/// A result already taken off `halt_notif` but not yet handed to a caller;
/// `wait_for_halt` returns this first if it is set.
already_received: Option<Result<HaltReason, RecvError>>,
}
// Generates a method on `PetriVmOpenVmm` that forwards to the method of the
// same name on `PetriVmInner`, racing it against the halt channel via
// `wait_for_halt_or_internal`.
//
// The input is a bodiless `async fn` signature: optional attributes (which
// includes `///` doc comments), a visibility, the name, `&mut self`, zero or
// more `name: Type` arguments, and an optional return type. The attributes,
// visibility and signature are reproduced on the generated method.
macro_rules! petri_vm_fn {
($(#[$($attrss:tt)*])* $vis:vis async fn $fn_name:ident (&mut self $(,$arg:ident: $ty:ty)*) $(-> $ret:ty)?) => {
$(#[$($attrss)*])*
$vis async fn $fn_name(&mut self, $($arg:$ty,)*) $(-> $ret)? {
// `self.halt` and `self.inner` are disjoint fields, so both can be
// borrowed mutably at once.
Self::wait_for_halt_or_internal(&mut self.halt, self.inner.$fn_name($($arg,)*)).await
}
};
}
impl PetriVmOpenVmm {
/// Builds the VM wrapper from its inner state and the channel on which halt
/// notifications arrive.
///
/// No halt has been observed yet, so the stashed halt result starts empty.
pub(super) fn new(inner: PetriVmInner, halt_notif: Receiver<HaltReason>) -> Self {
    let halt = PetriVmHaltReceiver {
        already_received: None,
        halt_notif,
    };
    Self { inner, halt }
}
/// Returns the path of the VTL2 vsock socket.
///
/// # Errors
///
/// Fails with "VM is not configured with OpenHCL" when no such path is
/// recorded in the VM's resources.
pub fn vtl2_vsock_path(&self) -> anyhow::Result<&Path> {
    let vsock_path = self.inner.resources.vtl2_vsock_path.as_deref();
    vsock_path.context("VM is not configured with OpenHCL")
}
pub async fn wait_for_halt(&mut self) -> anyhow::Result<HaltReason> {
if let Some(already) = self.halt.already_received.take() {
already.map_err(anyhow::Error::from)
} else {
self.halt
.halt_notif
.recv()
.await
.context("Failed to get halt reason")
}
}
/// Waits for the VM to halt, then tears everything down in order: watchdog
/// tasks, the worker, the mesh, and finally the log streaming tasks.
///
/// Returns the halt reason once teardown completed.
///
/// # Panics
///
/// Panics if another reference to the worker is still alive after the
/// watchdog tasks have been cancelled.
pub async fn wait_for_teardown(mut self) -> anyhow::Result<HaltReason> {
    let halt_reason = self.wait_for_halt().await?;

    tracing::info!(?halt_reason, "Got halt reason, cancelling watchdogs");
    let cancellations = self
        .inner
        .watchdog_tasks
        .into_iter()
        .map(|task| task.cancel());
    futures::future::join_all(cancellations).await;

    tracing::info!(?halt_reason, "Cancelled watchdogs, waiting for worker");
    // The worker can only be shut down by value, which requires sole ownership.
    let worker = Arc::into_inner(self.inner.worker)
        .expect("Watchdog task was cancelled, we should be the only ref left");
    worker.shutdown().await?;

    tracing::info!("Worker quit, waiting for mesh");
    self.inner.mesh.shutdown().await;

    tracing::info!("Mesh shutdown, waiting for logging tasks");
    for log_task in self.inner.resources.log_stream_tasks {
        log_task.await?;
    }

    Ok(halt_reason)
}
// Every method below is generated by `petri_vm_fn!`: it runs the method of
// the same name on `PetriVmInner` while also watching for a VM halt.
petri_vm_fn!(
/// Asks the OpenHCL diagnostics handler for a core dump of `name`, passing
/// it `path`. Fails if the VM is not configured with OpenHCL.
pub async fn openhcl_core_dump(&mut self, name: &str, path: &Path) -> anyhow::Result<()>
);
petri_vm_fn!(
/// Asks the OpenHCL diagnostics handler to crash `name`. Fails if the VM is
/// not configured with OpenHCL.
pub async fn openhcl_crash(&mut self, name: &str) -> anyhow::Result<()>
);
petri_vm_fn!(
/// Waits for a firmware boot event and checks that it equals the expected
/// one. If no boot event is expected for this VM, logs a warning and
/// returns `Ok` immediately.
pub async fn wait_for_successful_boot_event(&mut self) -> anyhow::Result<()>
);
petri_vm_fn!(
/// Sends a `WaitReady` request to the shutdown IC and returns the receiver
/// it replies with.
pub async fn wait_for_enlightened_shutdown_ready(&mut self) -> anyhow::Result<mesh::OneshotReceiver<()>>
);
petri_vm_fn!(
/// Waits for the shutdown IC, applies the guest's shutdown delay quirk if
/// one is set, then sends a non-forced power-off or reboot request. Fails
/// if the reply is anything other than `Ok`.
pub async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()>
);
petri_vm_fn!(
/// Opens the given OpenHCL IGVM artifact and asks the worker to restart
/// OpenHCL with it using `flags`. Fails if OpenHCL is not configured.
pub async fn restart_openhcl(
&mut self,
new_openhcl: ResolvedArtifact<impl petri_artifacts_common::tags::IsOpenhclIgvm>,
flags: OpenHclServicingFlags
) -> anyhow::Result<()>
);
petri_vm_fn!(
/// Resets the VM through the worker. If the VM has a Linux direct serial
/// agent, the agent is reset too and pipette is launched again.
pub async fn reset(&mut self) -> anyhow::Result<()>
);
petri_vm_fn!(
/// Runs the OpenHCL diagnostics handler's inspect test. Fails if the VM is
/// not configured with OpenHCL.
pub async fn test_inspect_openhcl(&mut self) -> anyhow::Result<()>
);
petri_vm_fn!(
/// Accepts a pipette connection on the VM's pipette listener and performs
/// the client handshake.
pub async fn wait_for_agent(&mut self) -> anyhow::Result<PipetteClient>
);
petri_vm_fn!(
/// Waits for VTL2 via the OpenHCL diagnostics handler. Fails if the VM is
/// not configured with OpenHCL.
pub async fn wait_for_vtl2_ready(&mut self) -> anyhow::Result<()>
);
petri_vm_fn!(
/// Launches pipette inside VTL2, then accepts its connection on the VTL2
/// pipette listener and performs the client handshake.
pub async fn wait_for_vtl2_agent(&mut self) -> anyhow::Result<PipetteClient>
);
petri_vm_fn!(
/// Encodes `settings` and sends them as a `ModifyVtl2Settings` request.
/// Fails if OpenHCL is not configured or the request fails.
pub async fn modify_vtl2_settings(&mut self, settings: &vtl2_settings_proto::Vtl2Settings) -> anyhow::Result<()>
);
// Resumes the VM through the worker.
petri_vm_fn!(pub(crate) async fn resume(&mut self) -> anyhow::Result<()>);
// Pulses save + restore through the worker (up to two times), stopping early
// with a warning if reset is not supported.
petri_vm_fn!(pub(crate) async fn verify_save_restore(&mut self) -> anyhow::Result<()>);
// Starts pipette in the guest by running a command through the Linux direct
// serial agent.
petri_vm_fn!(pub(crate) async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()>);
/// Drives `future` to completion while also watching for a VM halt.
///
/// This is the public entry point for racing an arbitrary caller-supplied
/// operation against the halt channel; the result and error behaviour are
/// those of `wait_for_halt_or_internal`.
pub async fn wait_for_halt_or<T, F: Future<Output = anyhow::Result<T>>>(
    &mut self,
    future: F,
) -> anyhow::Result<T> {
    let halt = &mut self.halt;
    Self::wait_for_halt_or_internal(halt, future).await
}
/// Races `future` against the halt channel and returns the future's result
/// when possible.
///
/// * If the future finishes first with `Ok`, that value is returned at once.
/// * If the future finishes first with `Err`, the error is returned after the
///   5 second timeout has elapsed.
/// * If the halt channel yields first, the future gets up to 5 more seconds
///   to finish. If it does, its result is returned and the halt result is
///   stashed in `halt.already_received` for a later `wait_for_halt`. If it
///   does not, an error describing the halt (or the channel failure) is
///   returned instead.
async fn wait_for_halt_or_internal<T, F: Future<Output = anyhow::Result<T>>>(
halt: &mut PetriVmHaltReceiver,
future: F,
) -> anyhow::Result<T> {
// Pin the future and keep only a mutable reference to it, so it can be
// polled again after the race below.
let future = &mut std::pin::pin!(future);
// Tags which side of the race produced the result.
enum Either<T> {
Future(anyhow::Result<T>),
Halt(Result<HaltReason, RecvError>),
}
let res = (
future.map(Either::Future),
halt.halt_notif.recv().map(Either::Halt),
)
.race()
.await;
match res {
Either::Future(Ok(success)) => Ok(success),
Either::Future(Err(e)) => {
tracing::warn!(
?e,
"Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
);
// Wait out the 5 second timeout before reporting the error.
let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
c.cancelled().await;
Err(e)
}
Either::Halt(halt_result) => {
tracing::warn!(
?halt_result,
"Halt channel returned while waiting for other future, sleeping for 5 seconds to let outstanding work finish"
);
// Give the still-pending future a bounded amount of time to complete.
let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
let try_again = c.until_cancelled(future).await;
match try_again {
Ok(fut_result) => {
// Keep the halt result for a later `wait_for_halt` call.
// NOTE(review): this overwrites any result stashed by an earlier
// call — confirm a previously stashed halt reason cannot be lost.
halt.already_received = Some(halt_result);
if let Err(e) = &fut_result {
tracing::warn!(
?e,
"Future returned with an error, sleeping for 5 seconds to let outstanding work finish"
);
let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
c.cancelled().await;
}
fut_result
}
// The future did not finish in time: report the halt itself as the
// failure. The halt result is consumed here, not stashed.
Err(_cancel) => match halt_result {
Ok(halt_reason) => Err(anyhow::anyhow!("VM halted: {:?}", halt_reason)),
Err(e) => Err(e).context("VM disappeared"),
},
}
}
}
}
}
impl PetriVmInner {
/// Requests a core dump of `name` from the OpenHCL diagnostics handler,
/// passing it `path`.
///
/// Fails if the VM is not configured with OpenHCL.
async fn openhcl_core_dump(&self, name: &str, path: &Path) -> anyhow::Result<()> {
    let diag = self.openhcl_diag()?;
    diag.core_dump(name, path).await
}
/// Asks the OpenHCL diagnostics handler to crash `name`.
///
/// Fails if the VM is not configured with OpenHCL.
async fn openhcl_crash(&self, name: &str) -> anyhow::Result<()> {
    let diag = self.openhcl_diag()?;
    diag.crash(name).await
}
/// Waits for the firmware's boot event and verifies it is the expected one.
///
/// When no boot event is expected for this VM, a warning is logged and the
/// call succeeds without waiting.
async fn wait_for_successful_boot_event(&mut self) -> anyhow::Result<()> {
    let Some(expected_event) = self.resources.expected_boot_event else {
        tracing::warn!("Configured firmware does not emit a boot event, skipping");
        return Ok(());
    };

    let received = self.resources.firmware_event_recv.recv().await;
    let event = received.context("Failed to get firmware boot event")?;
    anyhow::ensure!(
        event == expected_event,
        "Did not receive expected successful boot event"
    );
    Ok(())
}
/// Sends a `WaitReady` request to the shutdown IC and returns the receiver
/// that the IC replies with.
async fn wait_for_enlightened_shutdown_ready(
    &mut self,
) -> anyhow::Result<mesh::OneshotReceiver<()>> {
    tracing::info!("Waiting for shutdown ic ready");
    let shutdown_ic = &self.resources.shutdown_ic_send;
    let ready = shutdown_ic.call(ShutdownRpc::WaitReady, ()).await?;
    Ok(ready)
}
/// Requests a guest shutdown or reboot through the shutdown IC.
///
/// Waits for the IC to report ready, applies the guest's shutdown delay quirk
/// if one is configured, then sends a non-forced request of the given `kind`.
///
/// # Errors
///
/// Fails if any RPC fails or if the IC answers with anything other than `Ok`.
async fn send_enlightened_shutdown(&mut self, kind: ShutdownKind) -> anyhow::Result<()> {
    use hyperv_ic_resources::shutdown::ShutdownParams;
    use hyperv_ic_resources::shutdown::ShutdownResult;
    use hyperv_ic_resources::shutdown::ShutdownType;

    self.wait_for_enlightened_shutdown_ready().await?;

    if let Some(duration) = self.quirks.hyperv_shutdown_ic_sleep {
        tracing::info!("QUIRK: Waiting for {:?}", duration);
        PolledTimer::new(&self.resources.driver)
            .sleep(duration)
            .await;
    }

    // Translate the petri-level request into the IC's own shutdown type.
    let shutdown_type = match kind {
        ShutdownKind::Shutdown => ShutdownType::PowerOff,
        ShutdownKind::Reboot => ShutdownType::Reboot,
    };
    let params = ShutdownParams {
        shutdown_type,
        force: false,
    };

    tracing::info!("Sending shutdown command");
    let shutdown_result = self
        .resources
        .shutdown_ic_send
        .call(ShutdownRpc::Shutdown, params)
        .await?;
    tracing::info!(?shutdown_result, "Shutdown sent");

    anyhow::ensure!(
        shutdown_result == ShutdownResult::Ok,
        "Got non-Ok shutdown response"
    );
    Ok(())
}
/// Restarts OpenHCL using the IGVM file referenced by `new_openhcl`.
///
/// The guest emulation channel is looked up first, then the IGVM file is
/// opened and handed to the worker together with `flags`.
///
/// # Errors
///
/// Fails if OpenHCL is not configured, the file cannot be opened, or the
/// worker reports a failure.
async fn restart_openhcl(
    &self,
    new_openhcl: ResolvedArtifact<impl petri_artifacts_common::tags::IsOpenhclIgvm>,
    flags: OpenHclServicingFlags,
) -> anyhow::Result<()> {
    let ged_send = self.resources.ged_send.as_ref();
    let ged_send = ged_send.context("openhcl not configured")?;

    let opened = fs_err::File::open(new_openhcl);
    let igvm_file = opened.context("failed to open igvm file")?;

    let worker = &self.worker;
    worker
        .restart_openhcl(ged_send, flags, igvm_file.into())
        .await
}
/// Sends `settings`, encoded as a protobuf message, as a
/// `ModifyVtl2Settings` request on the guest emulation channel.
///
/// # Errors
///
/// Fails if OpenHCL is not configured or if the request fails.
async fn modify_vtl2_settings(
    &self,
    settings: &vtl2_settings_proto::Vtl2Settings,
) -> anyhow::Result<()> {
    use get_resources::ged::GuestEmulationRequest;

    let ged_send = self.resources.ged_send.as_ref();
    let ged_send = ged_send.context("openhcl not configured")?;

    let encoded_settings = prost::Message::encode_to_vec(settings);
    ged_send
        .call_failable(GuestEmulationRequest::ModifyVtl2Settings, encoded_settings)
        .await?;
    Ok(())
}
/// Resets the VM through the worker.
///
/// If the VM has a Linux direct serial agent, the agent is reset as well and
/// pipette is launched again afterwards.
async fn reset(&mut self) -> anyhow::Result<()> {
    tracing::info!("Resetting VM");
    self.worker.reset().await?;

    let Some(agent) = self.resources.linux_direct_serial_agent.as_mut() else {
        return Ok(());
    };
    agent.reset();
    self.launch_linux_direct_pipette().await
}
/// Runs the inspect test of the OpenHCL diagnostics handler.
///
/// Fails if the VM is not configured with OpenHCL.
async fn test_inspect_openhcl(&self) -> anyhow::Result<()> {
    let diag = self.openhcl_diag()?;
    diag.test_inspect().await
}
/// Accepts a pipette connection on the VM's pipette listener and returns the
/// connected client.
async fn wait_for_agent(&mut self) -> anyhow::Result<PipetteClient> {
    let resources = &mut self.resources;
    Self::wait_for_agent_core(
        &resources.driver,
        &mut resources.pipette_listener,
        &resources.output_dir,
    )
    .await
}
/// Waits for VTL2 through the OpenHCL diagnostics handler.
///
/// Fails if the VM is not configured with OpenHCL.
async fn wait_for_vtl2_ready(&mut self) -> anyhow::Result<()> {
    let diag = self.openhcl_diag()?;
    diag.wait_for_vtl2().await
}
/// Launches pipette inside VTL2, then accepts its connection on the VTL2
/// pipette listener and returns the connected client.
///
/// # Errors
///
/// Fails if pipette cannot be launched, if there is no VTL2 pipette listener,
/// or if accepting/handshaking fails.
async fn wait_for_vtl2_agent(&mut self) -> anyhow::Result<PipetteClient> {
    self.launch_vtl2_pipette().await?;

    let resources = &mut self.resources;
    let listener = resources
        .vtl2_pipette_listener
        .as_mut()
        .context("VM is not configured with VTL 2")?;
    Self::wait_for_agent_core(&resources.driver, listener, &resources.output_dir).await
}
/// Accepts one pipette connection on `listener` and performs the client
/// handshake over it.
///
/// `driver` is used both to wrap the accepted connection and by the client;
/// `output_dir` is passed through to the client.
///
/// # Errors
///
/// Fails if accepting the connection, wrapping the socket, or the handshake
/// fails. The completion message is logged only when the handshake succeeded.
async fn wait_for_agent_core(
    driver: &DefaultDriver,
    listener: &mut PolledSocket<UnixListener>,
    output_dir: &Path,
) -> anyhow::Result<PipetteClient> {
    tracing::info!("listening for pipette connection");
    let (conn, _) = listener
        .accept()
        .await
        .context("failed to accept pipette connection")?;

    tracing::info!("handshaking with pipette");
    let socket = PolledSocket::new(driver, conn)
        .context("failed to create polled socket for pipette connection")?;
    // Propagate a handshake failure before logging, so that the log never
    // claims a completed handshake for a connection that failed.
    let client = PipetteClient::new(&driver, socket, output_dir)
        .await
        .context("failed to connect to pipette")?;

    tracing::info!("completed pipette handshake");
    Ok(client)
}
/// Resumes the VM through the worker, discarding any value the worker
/// returns on success.
async fn resume(&self) -> anyhow::Result<()> {
    let worker = &self.worker;
    worker.resume().await?;
    Ok(())
}
/// Pulses save + restore through the worker up to two times.
///
/// Stops early, with a warning and a successful result, if the worker reports
/// that reset is not supported. Any other failure is returned as an error.
async fn verify_save_restore(&self) -> anyhow::Result<()> {
    for attempt in 0..2 {
        match self.worker.pulse_save_restore().await {
            Ok(()) => continue,
            Err(RpcError::Call(PulseSaveRestoreError::ResetNotSupported)) => {
                tracing::warn!("Reset not supported, could not test save + restore.");
                break;
            }
            Err(RpcError::Call(PulseSaveRestoreError::Other(err))) => {
                return Err(anyhow::Error::from(err))
                    .context(format!("Save + restore {attempt} failed."));
            }
            Err(RpcError::Channel(err)) => return Err(err.into()),
        }
    }
    Ok(())
}
/// Starts pipette in the guest by running a shell command through the Linux
/// direct serial agent: the `cidata` volume is mounted at `/cidata` and
/// `/cidata/pipette` is started in the background.
///
/// # Errors
///
/// Fails if the VM has no Linux direct serial agent (previously this case
/// panicked) or if running the command fails.
async fn launch_linux_direct_pipette(&mut self) -> anyhow::Result<()> {
    self.resources
        .linux_direct_serial_agent
        .as_mut()
        .context("VM does not have a Linux direct serial agent")?
        .run_command("mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'")
        .await?;
    Ok(())
}
/// Starts pipette inside VTL2 by running a shell command through the OpenHCL
/// diagnostics handler: the `cidata` volume is mounted at `/cidata` and
/// `/cidata/pipette` is started in the background.
///
/// # Errors
///
/// Fails if the VM is not configured with OpenHCL, if the command cannot be
/// run, or if it exits unsuccessfully.
async fn launch_vtl2_pipette(&mut self) -> anyhow::Result<()> {
    let diag = self.openhcl_diag()?;
    let shell_args = [
        "-c",
        "mkdir /cidata && mount LABEL=cidata /cidata && sh -c '/cidata/pipette &'",
    ];
    let output = diag.run_vtl2_command("sh", &shell_args).await?;
    anyhow::ensure!(
        output.exit_status.success(),
        "Failed to start VTL 2 pipette: {:?}",
        output
    );
    Ok(())
}
/// Returns the OpenHCL diagnostics handler.
///
/// # Errors
///
/// Fails with "VM is not configured with OpenHCL" when the VM has no handler.
fn openhcl_diag(&self) -> anyhow::Result<&OpenHclDiagHandler> {
    match &self.resources.openhcl_diag_handler {
        Some(handler) => Ok(handler),
        None => anyhow::bail!("VM is not configured with OpenHCL"),
    }
}
}