mod pci_shutdown;
pub mod vtl2_settings_worker;
use self::vtl2_settings_worker::DeviceInterfaces;
use crate::ControlRequest;
use crate::emuplat::EmuplatServicing;
use crate::emuplat::netvsp::RuntimeSavedState;
use crate::nvme_manager::NvmeManager;
use crate::options::TestScenarioConfig;
use crate::reference_time::ReferenceTime;
use crate::servicing;
use crate::servicing::NvmeSavedState;
use crate::servicing::ServicingState;
use crate::vmbus_relay_unit::VmbusRelayHandle;
use crate::worker::FirmwareType;
use crate::worker::NetworkSettingsError;
use anyhow::Context;
use async_trait::async_trait;
use futures::FutureExt;
use futures::StreamExt;
use futures_concurrency::future::Join;
use get_protocol::SaveGuestVtl2StateFlags;
use guest_emulation_transport::api::GuestSaveRequest;
use guid::Guid;
use hyperv_ic_resources::shutdown::ShutdownParams;
use hyperv_ic_resources::shutdown::ShutdownResult;
use hyperv_ic_resources::shutdown::ShutdownRpc;
use hyperv_ic_resources::shutdown::ShutdownType;
use igvm_defs::MemoryMapEntryType;
use inspect::Inspect;
use mesh::CancelContext;
use mesh::MeshPayload;
use mesh::error::RemoteError;
use mesh::rpc::FailableRpc;
use mesh::rpc::Rpc;
use mesh::rpc::RpcSend;
use mesh_worker::WorkerRpc;
use net_packet_capture::PacketCaptureParams;
use openhcl_dma_manager::DmaClientSpawner;
use openhcl_dma_manager::OpenhclDmaManager;
use pal_async::task::Spawn;
use pal_async::task::Task;
use parking_lot::Mutex;
use socket2::Socket;
use state_unit::SavedStateUnit;
use state_unit::SpawnedUnit;
use state_unit::StateUnits;
use std::sync::Arc;
use std::time::Duration;
use tracing::Instrument;
use tracing::instrument;
use uevent::UeventListener;
use underhill_threadpool::AffinitizedThreadpool;
use virt::IsolationType;
use virt_mshv_vtl::UhPartition;
use virt_mshv_vtl::VtlCrash;
use vm_resource::ResourceResolver;
use vm_topology::memory::MemoryRangeWithNode;
use vmbus_channel::channel::VmbusDevice;
use vmcore::vmtime::VmTimeKeeper;
use vmm_core::input_distributor::InputDistributor;
use vmm_core::partition_unit::PartitionUnit;
use vmm_core::vmbus_unit::ChannelUnit;
use vmm_core::vmbus_unit::VmbusServerHandle;
use vmotherboard::ChipsetDevices;
use vtl2_settings_worker::Vtl2ConfigNicRpc;
use vtl2_settings_worker::Vtl2SettingsWorker;
use vtl2_settings_worker::handle_vtl2_config_rpc;
/// Control requests that can be sent to a running [`LoadedVm`] event loop.
#[derive(MeshPayload)]
pub enum UhVmRpc {
    /// Stops the VM's state units. Responds `true` if the VM was running.
    Pause(Rpc<(), bool>),
    /// Starts the VM's state units. Responds `true` if the VM was not
    /// already running (and was therefore started by this request).
    Resume(Rpc<(), bool>),
    /// Saves the VM state and responds with the mesh-encoded saved state.
    /// The VM is restarted afterwards if it was running.
    Save(FailableRpc<(), Vec<u8>>),
    /// Responds with the result of clearing the partition unit's halt state.
    ClearHalt(Rpc<(), bool>),
    /// Forwards packet-capture parameters to the VM's network settings.
    /// Fails if no network settings have been set up.
    PacketCapture(FailableRpc<PacketCaptureParams<Socket>, PacketCaptureParams<Socket>>),
}
/// Network operations that the loaded VM delegates to its network settings
/// implementation (installed as `LoadedVm::network_settings`).
#[async_trait]
pub trait LoadedVmNetworkSettings: Inspect {
    /// Prepares the networks for a guest hibernate request when `rollback` is
    /// false. When `rollback` is true, undoes that preparation; this is called
    /// after the guest did not report a successful shutdown.
    async fn prepare_for_hibernate(&self, rollback: bool);

    /// Modifies the settings of the network identified by `instance_id`,
    /// optionally associating `subordinate_instance_id`.
    async fn modify_network_settings(
        &mut self,
        instance_id: Guid,
        subordinate_instance_id: Option<Guid>,
    ) -> anyhow::Result<()>;

    /// Adds the network identified by `instance_id`.
    ///
    /// Returns the runtime saved state for the new network. The caller keeps
    /// this state so that it can later be saved for servicing.
    async fn add_network(
        &mut self,
        instance_id: Guid,
        subordinate_instance_id: Option<Guid>,
        max_sub_channels: Option<u16>,
        threadpool: &AffinitizedThreadpool,
        uevent_listener: &UeventListener,
        servicing_netvsp_state: &Option<Vec<crate::emuplat::netvsp::SavedState>>,
        partition: Arc<UhPartition>,
        state_units: &StateUnits,
        vmbus_server: &Option<VmbusServerHandle>,
        dma_client_spawner: DmaClientSpawner,
        is_isolated: bool,
    ) -> anyhow::Result<RuntimeSavedState>;

    /// Removes the network identified by `instance_id`.
    async fn remove_network(&mut self, instance_id: Guid) -> anyhow::Result<()>;

    /// Unloads the networks as part of servicing. This is called after the VM
    /// state has been saved.
    async fn unload_for_servicing(&mut self);

    /// Applies packet-capture parameters and returns the resulting parameters.
    ///
    /// Implementors may bind `params` mutably in their own signature. A `mut`
    /// pattern has no meaning on this declaration-only method.
    async fn packet_capture(
        &self,
        params: PacketCaptureParams<Socket>,
    ) -> anyhow::Result<PacketCaptureParams<Socket>>;
}
/// A fully loaded VTL2-hosted VM, along with the devices, clients, and
/// background tasks it owns. `LoadedVm::run` drives it.
///
/// Fields prefixed with `_` are not read in this file. They are held here
/// (presumably so they stay alive for the VM's lifetime; confirm at the
/// construction site).
pub(crate) struct LoadedVm {
/// Partition state unit; used to clear halts and torn down when `run` exits.
pub partition_unit: PartitionUnit,
/// Memory mappings; reported under "memory" in inspect.
pub memory: underhill_mem::MemoryMappings,
/// Firmware type, recorded in the servicing saved state.
pub firmware_type: FirmwareType,
/// Isolation type. Servicing is refused for isolated VMs.
pub isolation: IsolationType,
pub _chipset_devices: ChipsetDevices,
pub _vmtime: SpawnedUnit<VmTimeKeeper>,
pub _halt_task: Task<()>,
/// Shared with the VTL2 settings worker and with network addition.
pub uevent_listener: Arc<UeventListener>,
/// Resource resolver; reported under "resolver" in inspect.
pub resolver: ResourceResolver,
/// NVMe manager. It is saved during `save` and is taken and shut down during servicing.
pub nvme_manager: Option<NvmeManager>,
/// Emulated-platform servicing state. It also tracks per-network netvsp runtime state.
pub emuplat_servicing: EmuplatServicing,
/// Taken (and required to be `Some`) when `run` spawns the VTL2 settings worker.
pub device_interfaces: Option<DeviceInterfaces>,
/// VMBus client. It is stopped and saved by `save` and restarted by `resume_drivers`.
pub vmbus_client: Option<vmbus_client::VmbusClient>,
/// VTL0 memory map; reported under "vtl0_memory_map" in inspect.
pub vtl0_memory_map: Vec<(MemoryRangeWithNode, MemoryMapEntryType)>,
/// Partition handle. It is the source of reference time for start/stop logging.
pub partition: Arc<UhPartition>,
/// The VM's state units (started, stopped, saved, and restored as a group).
pub state_units: StateUnits,
/// Reference time at which `stop` last stopped the state units, or `None` if it never has.
pub last_state_unit_stop: Option<ReferenceTime>,
/// VMBus server handle; removed and shut down when `run` exits.
pub vmbus_server: Option<VmbusServerHandle>,
/// Host VMBus relay; torn down when `run` exits.
pub host_vmbus_relay: Option<VmbusRelayHandle>,
pub _vmbus_devices: Vec<SpawnedUnit<ChannelUnit<dyn VmbusDevice>>>,
pub _vmbus_intercept_devices: Vec<mesh::OneshotSender<()>>,
pub _ide_accel_devices: Vec<SpawnedUnit<ChannelUnit<storvsp::StorageDevice>>>,
/// Network settings, if any. Network RPCs fail when this is `None`.
pub network_settings: Option<Box<dyn LoadedVmNetworkSettings>>,
/// When present: a receiver of shutdown requests that `run` polls, and a
/// sender used to relay those requests to the guest. Its presence is
/// recorded as `overlay_shutdown_device` in the saved state.
pub shutdown_relay: Option<(
mesh::Receiver<Rpc<ShutdownParams, ShutdownResult>>,
mesh::Sender<ShutdownRpc>,
)>,
/// VMGS client. It is saved during `save` and reported under "vmgs" in inspect.
pub vmgs_thin_client: vmgs_broker::VmgsThinClient,
/// VMGS disk metadata, copied into the saved state alongside the VMGS state.
pub vmgs_disk_metadata: disk_get_vmgs::save_restore::SavedBlockStorageMetadata,
pub _vmgs_handle: Task<()>,
/// Guest emulation transport client. It is used for VTL2 settings, servicing
/// requests and responses, and VTL crash notifications.
pub get_client: guest_emulation_transport::GuestEmulationTransportClient,
/// Device platform settings; the source of the initial dynamic VTL2 settings.
pub device_platform_settings:
guest_emulation_transport::api::platform_settings::DevicePlatformSettings,
/// Runtime parameters; reported under "runtime_params" in inspect.
pub runtime_params: crate::loader::vtl2_config::RuntimeParameters,
pub _input_distributor: SpawnedUnit<InputDistributor>,
/// VTL crash notifications, which are forwarded to the host.
pub crash_notification_recv: mesh::Receiver<VtlCrash>,
/// Control channel. It is used to flush logs during servicing and is taken on a successful restart.
pub control_send: Arc<Mutex<Option<mesh::Sender<ControlRequest>>>>,
pub _periodic_telemetry_task: Task<()>,
/// Whether NVMe keepalive is allowed. It takes effect only if the host's servicing request also enables it.
pub nvme_keep_alive: bool,
/// Optional test hooks that make servicing save hang or fail.
pub test_configuration: Option<TestScenarioConfig>,
/// DMA manager. It is saved when keepalive is requested and supplies DMA clients for networks.
pub dma_manager: OpenhclDmaManager,
}
/// State handed back by `LoadedVm::run` after a restart request has saved
/// the VM state successfully.
pub struct LoadedVmState<T> {
/// The pending restart request. The receiver of this state is responsible for completing it.
pub restart_rpc: FailableRpc<(), T>,
/// The VM state saved for the restart.
pub servicing_state: ServicingState,
/// The VM control RPC receiver, returned so that it can be reused.
pub vm_rpc: mesh::Receiver<UhVmRpc>,
/// The control channel, taken out of `LoadedVm::control_send`.
pub control_send: mesh::Sender<ControlRequest>,
}
impl LoadedVm {
pub async fn run<T: 'static + MeshPayload + Send>(
mut self,
threadpool: &AffinitizedThreadpool,
autostart_vps: bool,
correlation_id: Option<Guid>,
mut vm_rpc: mesh::Receiver<UhVmRpc>,
mut worker_rpc: mesh::Receiver<WorkerRpc<T>>,
) -> Option<LoadedVmState<T>> {
if autostart_vps {
self.start(correlation_id).await;
}
let (device_config_send, mut device_config_recv) = mesh::channel();
let _vtl2_settings_service_handle = {
let initial_settings = self
.device_platform_settings
.general
.vtl2_settings
.as_ref()
.map_or_else(Default::default, |settings| settings.dynamic.clone());
let mut vtl2_settings_worker = Vtl2SettingsWorker::new(
initial_settings,
device_config_send,
self.get_client.clone(),
self.device_interfaces.take().unwrap(),
);
threadpool.spawn("VTL2 settings services", {
let uevent_listener = self.uevent_listener.clone();
async move { vtl2_settings_worker.run(&uevent_listener).await }
})
};
let mut save_request_recv = self
.get_client
.take_save_request_recv()
.await
.expect("no failure");
let state = loop {
enum Event<T> {
WorkerRpc(WorkerRpc<T>),
WorkerRpcGone,
Vtl2ConfigNicRpc(Vtl2ConfigNicRpc),
UhVmRpc(UhVmRpc),
VtlCrash(VtlCrash),
ServicingRequest(GuestSaveRequest),
ShutdownRequest(Rpc<ShutdownParams, ShutdownResult>),
}
let event: Event<T> = futures::select! { message = worker_rpc.next() => message.map_or(Event::WorkerRpcGone, Event::WorkerRpc),
message = device_config_recv.select_next_some() => Event::Vtl2ConfigNicRpc(message),
message = vm_rpc.select_next_some() => Event::UhVmRpc(message),
message = self.crash_notification_recv.select_next_some() => Event::VtlCrash(message),
message = save_request_recv.select_next_some() => Event::ServicingRequest(message),
message = async {
if self.shutdown_relay.is_none() {
std::future::pending::<()>().await;
}
let (recv, _) = self.shutdown_relay.as_mut().unwrap();
recv.select_next_some().await
}.fuse() => Event::ShutdownRequest(message),
};
match event {
Event::WorkerRpcGone => break None,
Event::WorkerRpc(message) => match message {
WorkerRpc::Stop => break None,
WorkerRpc::Restart(rpc) => {
let state = async {
let running = self.stop().await;
match self.save(None, false).await {
Ok(servicing_state) => Some((rpc, servicing_state)),
Err(err) => {
if running {
self.start(None).await;
}
rpc.complete(Err(RemoteError::new(err)));
None
}
}
}
.instrument(tracing::info_span!("restart"))
.await;
if let Some((rpc, servicing_state)) = state {
break Some(LoadedVmState {
restart_rpc: rpc,
servicing_state,
vm_rpc,
control_send: self.control_send.lock().take().unwrap(),
});
}
}
WorkerRpc::Inspect(deferred) => deferred.respond(|resp| {
resp.field("threadpool", threadpool)
.merge(&self.state_units);
resp.child("init_data", |req| {
req.respond().field("dps", &self.device_platform_settings);
});
resp.field("runtime_params", &self.runtime_params);
resp.field("get", &self.get_client);
resp.field("vmgs", &self.vmgs_thin_client);
resp.field("network", &self.network_settings);
resp.field("nvme", &self.nvme_manager);
resp.field("resolver", &self.resolver);
resp.field(
"vtl0_memory_map",
inspect_helpers::vtl0_memory_map(&self.vtl0_memory_map),
);
resp.field("memory", &self.memory);
resp.field("dma_manager", &self.dma_manager);
}),
},
Event::Vtl2ConfigNicRpc(message) => {
handle_vtl2_config_rpc(message, &mut self, threadpool).await
}
Event::UhVmRpc(msg) => match msg {
UhVmRpc::Resume(rpc) => {
rpc.handle(async |()| {
if !self.state_units.is_running() {
self.start(None).await;
true
} else {
false
}
})
.await
}
UhVmRpc::Pause(rpc) => rpc.handle(async |()| self.stop().await).await,
UhVmRpc::Save(rpc) => {
rpc.handle_failable(async |()| {
let running = self.stop().await;
let r = self.save(None, false).await;
if running {
self.start(None).await;
}
r.map(mesh::payload::encode)
})
.await
}
UhVmRpc::ClearHalt(rpc) => {
rpc.handle(async |()| self.partition_unit.clear_halt().await)
.await
}
UhVmRpc::PacketCapture(rpc) => {
rpc.handle_failable(async |params| {
let network_settings = self
.network_settings
.as_ref()
.context("No network settings have been set up")?;
network_settings.packet_capture(params).await
})
.await
}
},
Event::ServicingRequest(message) => {
let GuestSaveRequest {
correlation_id,
deadline,
capabilities_flags,
} = message;
match self
.handle_servicing_request(correlation_id, deadline, capabilities_flags)
.await
{
Ok(true) => {
}
Ok(false) => {
continue;
}
Err(err) => {
tracing::error!(
error = err.as_ref() as &dyn std::error::Error,
"failed to notify host of servicing result"
);
break None;
}
}
}
Event::ShutdownRequest(rpc) => {
rpc.handle(async |msg| {
if matches!(msg.shutdown_type, ShutdownType::Hibernate) {
self.handle_hibernate_request(false).await;
}
let (_, send_guest) =
self.shutdown_relay.as_mut().expect("active shutdown_relay");
tracing::info!(params = ?msg, "Relaying shutdown message");
let result = match send_guest.call(ShutdownRpc::Shutdown, msg).await {
Ok(result) => result,
Err(err) => {
tracing::error!(
error = &err as &dyn std::error::Error,
"Failed to relay shutdown notification to guest"
);
ShutdownResult::Failed(0x80000001)
}
};
if !matches!(result, ShutdownResult::Ok) {
tracing::warn!(?result, "Shutdown request failed");
self.handle_hibernate_request(true).await;
}
result
})
.await
}
Event::VtlCrash(vtl_crash) => self.notify_of_vtl_crash(vtl_crash),
}
};
let _client_notify_send = self.partition_unit.teardown().await;
if let Some(vmbus_relay) = self.host_vmbus_relay {
vmbus_relay.teardown().await;
}
if let Some(vmbus) = self.vmbus_server {
vmbus.remove().await.shutdown().await;
}
state
}
async fn handle_servicing_request(
&mut self,
correlation_id: Guid,
deadline: std::time::Instant,
capabilities_flags: SaveGuestVtl2StateFlags,
) -> anyhow::Result<bool> {
if let Some(TestScenarioConfig::SaveStuck) = self.test_configuration {
tracing::info!("Test configuration SERVICING_SAVE_STUCK is set. Waiting indefinitely.");
std::future::pending::<()>().await;
}
let running = self.state_units.is_running();
let success = match self
.handle_servicing_inner(correlation_id, deadline, capabilities_flags)
.await
.and_then(|state| {
if let Some(TestScenarioConfig::SaveFail) = self.test_configuration {
tracing::info!(
"Test configuration SERVICING_SAVE_FAIL is set. Failing the save."
);
return Err(anyhow::anyhow!("Simulated servicing save failure"));
}
Ok(state)
}) {
Ok(state) => {
self.get_client
.send_servicing_state(mesh::payload::encode(state))
.await?;
true
}
Err(err) => {
tracing::error!(
error = err.as_ref() as &dyn std::error::Error,
"error while handling servicing"
);
self.get_client
.send_servicing_failure(format_args!("{:#}", err))
.await
.context("failed to notify host of servicing-while-paused failure")?;
if running {
self.start(Some(correlation_id)).await;
}
false
}
};
Ok(success)
}
async fn handle_servicing_inner(
&mut self,
correlation_id: Guid,
deadline: std::time::Instant,
capabilities_flags: SaveGuestVtl2StateFlags,
) -> anyhow::Result<ServicingState> {
if self.isolation.is_isolated() {
anyhow::bail!("Servicing is not yet supported for isolated VMs");
}
let nvme_keepalive = self.nvme_keep_alive && capabilities_flags.enable_nvme_keepalive();
let r = async {
if !self.stop().await {
anyhow::bail!("cannot service underhill while paused");
}
let mut state = self.save(Some(deadline), nvme_keepalive).await?;
state.init_state.correlation_id = Some(correlation_id);
let shutdown_mana = async {
if let Some(network_settings) = self.network_settings.as_mut() {
network_settings
.unload_for_servicing()
.instrument(tracing::info_span!("shutdown_mana"))
.await;
}
};
let shutdown_nvme = async {
if let Some(nvme_manager) = self.nvme_manager.take() {
nvme_manager
.shutdown(nvme_keepalive)
.instrument(tracing::info_span!("shutdown_nvme_vfio", %correlation_id, %nvme_keepalive))
.await;
}
};
let shutdown_pci = async {
pci_shutdown::shutdown_pci_devices()
.instrument(tracing::info_span!("shutdown_pci_devices"))
.await
};
let (r, (), ()) = (shutdown_pci, shutdown_mana, shutdown_nvme).join().await;
r?;
Ok(state)
}
.instrument(tracing::info_span!("servicing_save_vtl2", %correlation_id))
.await;
let mut state = match r {
Ok(state) => state,
Err(err) => {
self.resume_drivers();
return Err(err);
}
};
state.init_state.flush_logs_result = Some({
let ctx = CancelContext::new().with_timeout(Duration::from_secs(1));
let now = std::time::Instant::now();
let call = self
.control_send
.lock()
.as_ref()
.unwrap()
.call(ControlRequest::FlushLogs, ctx);
let error = call
.await
.map_err(anyhow::Error::from)
.and_then(|x| x.map_err(anyhow::Error::from))
.err()
.map(|err| format!("{err:#}"));
servicing::FlushLogsResult {
duration_us: now.elapsed().as_micros() as u64,
error,
}
});
Ok(state)
}
async fn handle_hibernate_request(&self, rollback: bool) {
if let Some(network_settings) = &self.network_settings {
if !rollback {
network_settings
.prepare_for_hibernate(rollback)
.instrument(tracing::info_span!("prepare_for_guest_hibernate"))
.await;
} else {
network_settings
.prepare_for_hibernate(rollback)
.instrument(tracing::info_span!("rollback_prepare_for_guest_hibernate"))
.await;
};
}
}
async fn start(&mut self, correlation_id: Option<Guid>) {
self.state_units.start().await;
let reference_time = ReferenceTime::new(self.partition.reference_time());
if let Some(stopped) = self.last_state_unit_stop {
let blackout_time = reference_time.since(stopped);
tracing::info!(
correlation_id = %correlation_id.unwrap_or(Guid::ZERO),
blackout_time_ms = blackout_time.map(|t| t.as_millis() as u64),
blackout_time = blackout_time
.map_or_else(|| "unknown".to_string(), |t| format!("{:?}", t))
.as_str(),
"resuming VM"
);
} else {
let boot_time = reference_time.since(ReferenceTime::new(0));
tracing::info!(
boot_time_ms = boot_time.map(|t| t.as_millis() as u64),
boot_time = boot_time
.map_or_else(|| "unknown".to_string(), |t| format!("{:?}", t))
.as_str(),
"starting VM"
)
}
}
async fn stop(&mut self) -> bool {
if self.state_units.is_running() {
self.last_state_unit_stop = Some(ReferenceTime::new(self.partition.reference_time()));
tracing::info!("stopping VM");
self.state_units.stop().await;
true
} else {
false
}
}
fn resume_drivers(&mut self) {
if let Some(client) = &mut self.vmbus_client {
client.start();
}
}
async fn save(
&mut self,
_deadline: Option<std::time::Instant>,
vf_keepalive_flag: bool,
) -> anyhow::Result<ServicingState> {
assert!(!self.state_units.is_running());
let emuplat = (self.emuplat_servicing.save()).context("emuplat save failed")?;
let nvme_state = if let Some(n) = &self.nvme_manager {
n.save(vf_keepalive_flag)
.instrument(tracing::info_span!("nvme_manager_save"))
.await
.map(|s| NvmeSavedState { nvme_state: s })
} else {
None
};
let units = self.save_units().await.context("state unit save failed")?;
let vmgs = self
.vmgs_thin_client
.save()
.await
.context("vmgs save failed")?;
let dma_manager_state = if vf_keepalive_flag {
use vmcore::save_restore::SaveRestore;
Some(self.dma_manager.save().context("dma_manager save failed")?)
} else {
None
};
let vmbus_client = if let Some(vmbus_client) = &mut self.vmbus_client {
vmbus_client.stop().await;
Some(vmbus_client.save().await)
} else {
None
};
let mut state = ServicingState {
init_state: servicing::ServicingInitState {
firmware_type: self.firmware_type.into(),
vm_stop_reference_time: self.last_state_unit_stop.unwrap().as_100ns(),
correlation_id: None,
emuplat,
flush_logs_result: None,
vmgs: (vmgs, self.vmgs_disk_metadata.clone()),
overlay_shutdown_device: self.shutdown_relay.is_some(),
nvme_state,
dma_manager_state,
vmbus_client,
},
units,
};
state
.fix_pre_save()
.context("failed to fix up servicing state before save")?;
Ok(state)
}
#[instrument(skip(self))]
async fn save_units(&mut self) -> anyhow::Result<Vec<SavedStateUnit>> {
Ok(self.state_units.save().await?)
}
#[instrument(skip(self, saved_state))]
pub async fn restore_units(&mut self, saved_state: Vec<SavedStateUnit>) -> anyhow::Result<()> {
self.state_units.restore(saved_state).await?;
Ok(())
}
fn notify_of_vtl_crash(&self, vtl_crash: VtlCrash) {
tracing::info!("Notifying the host of the guest system crash {vtl_crash:x?}");
let VtlCrash {
vp_index,
last_vtl,
control,
parameters,
} = vtl_crash;
self.get_client.notify_of_vtl_crash(
vp_index.index(),
last_vtl.into(),
control.into(),
parameters,
);
}
async fn add_vf_manager(
&mut self,
threadpool: &AffinitizedThreadpool,
instance_id: Guid,
subordinate_instance_id: Option<Guid>,
max_sub_channels: Option<u16>,
) -> anyhow::Result<()> {
if self.network_settings.is_none() {
return Err(NetworkSettingsError::NetworkSettingsMissing.into());
}
let save_state = self
.network_settings
.as_mut()
.unwrap()
.add_network(
instance_id,
subordinate_instance_id,
max_sub_channels,
threadpool,
&self.uevent_listener,
&None, self.partition.clone(),
&self.state_units,
&self.vmbus_server,
self.dma_manager.client_spawner(),
self.isolation.is_isolated(),
)
.await?;
self.state_units.start_stopped_units().await;
self.emuplat_servicing.netvsp_state.push(save_state);
Ok(())
}
async fn remove_vf_manager(&mut self, instance_id: Guid) -> anyhow::Result<()> {
if self.network_settings.is_none() {
return Err(NetworkSettingsError::NetworkSettingsMissing.into());
}
self.network_settings
.as_mut()
.unwrap()
.remove_network(instance_id)
.await?;
if let Some(index) = self
.emuplat_servicing
.netvsp_state
.iter()
.position(|value| value.instance_id == instance_id)
{
self.emuplat_servicing.netvsp_state.swap_remove(index);
} else {
return Err(NetworkSettingsError::RuntimeSavedStateMissing(instance_id).into());
}
Ok(())
}
}
/// Helpers that render VM state for inspect output.
mod inspect_helpers {
    use super::*;

    /// Renders a memory map entry type by name if it is one of the known
    /// types, or as its raw numeric value otherwise.
    fn inspect_memory_map_entry_type(typ: &MemoryMapEntryType) -> impl Inspect + '_ {
        inspect::adhoc(move |req| {
            let known_name = match *typ {
                MemoryMapEntryType::MEMORY => Some("MEMORY"),
                MemoryMapEntryType::PERSISTENT => Some("PERSISTENT"),
                MemoryMapEntryType::PLATFORM_RESERVED => Some("PLATFORM_RESERVED"),
                MemoryMapEntryType::VTL2_PROTECTABLE => Some("VTL2_PROTECTABLE"),
                _ => None,
            };
            match known_name {
                Some(name) => req.value(name.into()),
                None => req.value(typ.0.into()),
            }
        })
    }

    /// Renders the VTL0 memory map as entries keyed by address range. Each
    /// entry reports its length (in hex), its entry type, and its vnode.
    pub(super) fn vtl0_memory_map(
        memory: &[(MemoryRangeWithNode, MemoryMapEntryType)],
    ) -> impl Inspect + '_ {
        let entries = memory.iter().map(|(range_with_node, entry_type)| {
            let details = inspect::adhoc(move |req| {
                req.respond()
                    .field("length", inspect::AsHex(range_with_node.range.len()))
                    .field("type", inspect_memory_map_entry_type(entry_type))
                    .field("vnode", range_with_node.vnode);
            });
            (range_with_node.range, details)
        });
        inspect::iter_by_key(entries)
    }
}