Skip to main content

underhill_core/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! This module implements the interactive control process and the entry point
5//! for the underhill environment.
6
7#![cfg(target_os = "linux")]
8#![expect(missing_docs)]
9#![forbid(unsafe_code)]
10
11mod diag;
12mod dispatch;
13mod emuplat;
14mod get_tracing;
15mod inspect_internal;
16mod inspect_proc;
17mod livedump;
18mod loader;
19mod nvme_manager;
20mod options;
21mod reference_time;
22mod servicing;
23mod threadpool_vm_task_backend;
24mod vmbus_relay_unit;
25mod vmgs_logger;
26mod vp;
27mod vpci;
28mod worker;
29mod wrapped_partition;
30
31// `pub` so that the missing_docs warning fires for options without
32// documentation.
33pub use options::Options;
34
35use crate::diag::DiagWorker;
36use crate::dispatch::UhVmRpc;
37use crate::worker::UnderhillEnvCfg;
38use crate::worker::UnderhillRemoteConsoleCfg;
39use crate::worker::UnderhillVmWorker;
40use crate::worker::UnderhillWorkerParameters;
41use anyhow::Context;
42use bootloader_fdt_parser::BootTimes;
43use cvm_tracing::CVM_ALLOWED;
44use framebuffer::FRAMEBUFFER_SIZE;
45use framebuffer::FramebufferAccess;
46use futures::StreamExt;
47use futures_concurrency::stream::Merge;
48use get_tracing::init_tracing;
49use get_tracing::init_tracing_backend;
50use inspect::Inspect;
51use inspect::SensitivityLevel;
52use mesh::CancelContext;
53use mesh::CancelReason;
54use mesh::MeshPayload;
55use mesh::error::RemoteError;
56use mesh::rpc::Rpc;
57use mesh::rpc::RpcSend;
58use mesh_process::Mesh;
59use mesh_process::ProcessConfig;
60use mesh_process::try_run_mesh_host;
61use mesh_tracing::RemoteTracer;
62use mesh_tracing::TracingBackend;
63use mesh_worker::RegisteredWorkers;
64use mesh_worker::WorkerEvent;
65use mesh_worker::WorkerHandle;
66use mesh_worker::WorkerHost;
67use mesh_worker::WorkerHostRunner;
68use mesh_worker::launch_local_worker;
69use mesh_worker::register_workers;
70use pal_async::DefaultDriver;
71use pal_async::DefaultPool;
72use pal_async::task::Spawn;
73#[cfg(feature = "profiler")]
74use profiler_worker::ProfilerWorker;
75#[cfg(feature = "profiler")]
76use profiler_worker::ProfilerWorkerParameters;
77use std::time::Duration;
78use vmsocket::VmAddress;
79use vmsocket::VmListener;
80use vnc_worker_defs::VncParameters;
81
82fn new_underhill_remote_console_cfg(
83    framebuffer_gpa_base: Option<u64>,
84) -> anyhow::Result<(
85    UnderhillRemoteConsoleCfg,
86    Option<FramebufferAccess>,
87    Option<mesh::Receiver<Vec<video_core::DirtyRect>>>,
88)> {
89    if let Some(framebuffer_gpa_base) = framebuffer_gpa_base {
90        // Underhill accesses the framebuffer by using /dev/mshv_vtl_low to read
91        // from a second mapping placed after the end of RAM at a static
92        // location specified by the host.
93        //
94        // Open the file directly rather than use the `hcl` crate to avoid
95        // leaking `hcl` stuff into this crate.
96        //
97        // FUTURE: use an approach that doesn't require double mapping the
98        // framebuffer from the host.
99        let gpa_fd = fs_err::OpenOptions::new()
100            .read(true)
101            .write(true)
102            .open("/dev/mshv_vtl_low")
103            .context("failed to open gpa device")?;
104
105        let vram = sparse_mmap::new_mappable_from_file(gpa_fd.file(), true, false)?;
106        let (fb, fba) = framebuffer::framebuffer(vram, FRAMEBUFFER_SIZE, framebuffer_gpa_base)
107            .context("allocating framebuffer")?;
108        tracing::debug!("framebuffer_gpa_base: {:#x}", framebuffer_gpa_base);
109
110        let (dirt_send, dirt_recv) = mesh::channel();
111
112        Ok((
113            UnderhillRemoteConsoleCfg {
114                synth_keyboard: true,
115                synth_mouse: true,
116                synth_video: true,
117                input: mesh::Receiver::new(),
118                framebuffer: Some(fb),
119                dirt_send: Some(dirt_send),
120            },
121            Some(fba),
122            Some(dirt_recv),
123        ))
124    } else {
125        Ok((
126            UnderhillRemoteConsoleCfg {
127                synth_keyboard: false,
128                synth_mouse: false,
129                synth_video: false,
130                input: mesh::Receiver::new(),
131                framebuffer: None,
132                dirt_send: None,
133            },
134            None,
135            None,
136        ))
137    }
138}
139
140pub fn main() -> anyhow::Result<()> {
141    // Install a panic hook to prefix the current async task name before the
142    // standard panic output.
143    install_task_name_panic_hook();
144
145    if let Some(path) = std::env::var_os("OPENVMM_WRITE_SAVED_STATE_PROTO") {
146        if cfg!(debug_assertions) {
147            mesh::payload::protofile::DescriptorWriter::new(
148                vmcore::save_restore::saved_state_roots(),
149            )
150            .write_to_path(path)
151            .context("failed to write protobuf descriptors")?;
152            return Ok(());
153        } else {
154            // The generated code for this is too large for release builds.
155            anyhow::bail!(".proto output only supported in debug builds");
156        }
157    }
158
159    // FUTURE: create and use the affinitized threadpool here.
160    let (_, tracing_driver) = DefaultPool::spawn_on_thread("tracing");
161
162    // Try to run as a worker host, sending a remote tracer that will forward
163    // tracing events back to the initial process for logging to the host. See
164    // [`get_tracing`] doc comments for more details.
165    //
166    // On success the worker runs to completion and then exits the process (does
167    // not return). Any worker host setup errors are return and bubbled up.
168    try_run_mesh_host("underhill", {
169        let tracing_driver = tracing_driver.clone();
170        async |params: MeshHostParams| {
171            if let Some(remote_tracer) = params.tracer {
172                init_tracing(tracing_driver, remote_tracer).context("failed to init tracing")?;
173            }
174            params.runner.run(RegisteredWorkers).await;
175            Ok(())
176        }
177    })?;
178
179    // Initialize the tracing backend used by this and all subprocesses.
180    let mut tracing = init_tracing_backend(tracing_driver.clone())?;
181    // Initialize tracing from the backend.
182    init_tracing(tracing_driver, tracing.tracer()).context("failed to init tracing")?;
183    DefaultPool::run_with(|driver| do_main(driver, tracing))
184}
185
186fn install_task_name_panic_hook() {
187    use std::io::Write;
188
189    let panic_hook = std::panic::take_hook();
190    std::panic::set_hook(Box::new(move |info| {
191        pal_async::task::with_current_task_metadata(|metadata| {
192            if let Some(metadata) = metadata {
193                let _ = write!(std::io::stderr(), "task '{}', ", metadata.name());
194            }
195        });
196        // This will proceed with writing "thread ... panicked at ..."
197        panic_hook(info);
198    }));
199}
200
201async fn do_main(driver: DefaultDriver, mut tracing: TracingBackend) -> anyhow::Result<()> {
202    let opt = Options::parse(Vec::new(), Vec::new())?;
203
204    let crate_name = build_info::get().crate_name();
205    let crate_revision = build_info::get().scm_revision();
206    let openhcl_version = build_info::get().openhcl_version();
207    tracing::info!(
208        CVM_ALLOWED,
209        ?crate_name,
210        ?crate_revision,
211        ?openhcl_version,
212        "VMM process"
213    );
214    log_boot_times().context("failure logging boot times")?;
215
216    // Write the current pid to a file.
217    if let Some(pid_path) = &opt.pid {
218        std::fs::write(pid_path, std::process::id().to_string())
219            .with_context(|| format!("failed to write pid to {}", pid_path.display()))?;
220    }
221
222    let mesh = Mesh::new("underhill".to_string()).context("failed to create mesh")?;
223
224    let r = run_control(driver, &mesh, opt, &mut tracing).await;
225    if let Err(err) = &r {
226        tracing::error!(
227            CVM_ALLOWED,
228            error = err.as_ref() as &dyn std::error::Error,
229            "VM failure"
230        );
231    }
232
233    // Wait a few seconds for child processes to terminate and tracing to finish.
234    CancelContext::new()
235        .with_timeout(Duration::from_secs(10))
236        .until_cancelled(async {
237            mesh.shutdown().await;
238            tracing.shutdown().await;
239        })
240        .await
241        .ok();
242
243    r
244}
245
246fn log_boot_times() -> anyhow::Result<()> {
247    fn diff(start: Option<u64>, end: Option<u64>) -> Option<tracing::field::DebugValue<Duration>> {
248        use reference_time::ReferenceTime;
249        Some(tracing::field::debug(
250            ReferenceTime::new(end?).since(ReferenceTime::new(start?))?,
251        ))
252    }
253
254    // Read boot times provided by the bootloader.
255    let BootTimes {
256        start,
257        end,
258        sidecar_start,
259        sidecar_end,
260    } = BootTimes::new().context("failed to parse boot times")?;
261    tracing::info!(
262        CVM_ALLOWED,
263        start,
264        end,
265        sidecar_start,
266        sidecar_end,
267        elapsed = diff(start, end),
268        sidecar_elapsed = diff(sidecar_start, sidecar_end),
269        "boot loader times"
270    );
271    Ok(())
272}
273
274struct DiagState {
275    _worker: WorkerHandle,
276    request_recv: mesh::Receiver<diag_server::DiagRequest>,
277}
278
279impl DiagState {
280    async fn new() -> anyhow::Result<Self> {
281        // Start the diagnostics worker immediately.
282        let (request_send, request_recv) = mesh::channel();
283        let worker = launch_local_worker::<DiagWorker>(diag::DiagWorkerParameters { request_send })
284            .await
285            .context("failed to launch diagnostics worker")?;
286        Ok(Self {
287            _worker: worker,
288            request_recv,
289        })
290    }
291}
292
/// Handles to the workers started by `launch_workers`.
///
/// Exposed through inspect by the control loop, which merges this value
/// into its inspect response.
#[derive(Inspect)]
struct Workers {
    /// The VM worker, running in its own "vm" worker-host process.
    #[inspect(safe)]
    vm: WorkerHandle,
    /// Sender for `UhVmRpc` requests (pause, resume, save, ...) to the VM
    /// worker. Not inspectable.
    #[inspect(skip)]
    vm_rpc: mesh::Sender<UhVmRpc>,
    /// The VNC worker, present only when a framebuffer was configured.
    vnc: Option<WorkerHandle>,
    /// The debugger stub worker, present only when gdbstub was requested.
    #[cfg(feature = "gdb")]
    gdb: Option<WorkerHandle>,
}
303
/// Parameters passed to each worker-host process.
///
/// Sent by `launch_mesh_host` and received by `main` through
/// `try_run_mesh_host`.
#[derive(MeshPayload)]
struct MeshHostParams {
    /// If present, the host process initializes its tracing with this remote
    /// tracer so that its events are forwarded to the control process.
    tracer: Option<RemoteTracer>,
    /// Runner that the host process drives to service its workers.
    runner: WorkerHostRunner,
}
309
310async fn launch_mesh_host(
311    mesh: &Mesh,
312    name: &str,
313    tracer: Option<RemoteTracer>,
314) -> anyhow::Result<WorkerHost> {
315    let (host, runner) = mesh_worker::worker_host();
316    mesh.launch_host(ProcessConfig::new(name), MeshHostParams { tracer, runner })
317        .await?;
318    Ok(host)
319}
320
321async fn launch_workers(
322    mesh: &Mesh,
323    tracing: &mut TracingBackend,
324    control_send: mesh::Sender<ControlRequest>,
325    opt: Options,
326) -> anyhow::Result<Workers> {
327    let env_cfg = UnderhillEnvCfg {
328        vmbus_max_version: opt.vmbus_max_version,
329        vmbus_enable_mnf: opt.vmbus_enable_mnf,
330        vmbus_force_confidential_external_memory: opt.vmbus_force_confidential_external_memory,
331        vmbus_channel_unstick_delay: (opt.vmbus_channel_unstick_delay_ms != 0)
332            .then(|| Duration::from_millis(opt.vmbus_channel_unstick_delay_ms)),
333        cmdline_append: opt.cmdline_append.clone(),
334        reformat_vmgs: opt.reformat_vmgs,
335        vtl0_starts_paused: opt.vtl0_starts_paused,
336        emulated_serial_wait_for_rts: opt.serial_wait_for_rts,
337        force_load_vtl0_image: opt.force_load_vtl0_image,
338        nvme_vfio: opt.nvme_vfio,
339        halt_on_guest_halt: opt.halt_on_guest_halt,
340        no_sidecar_hotplug: opt.no_sidecar_hotplug,
341        gdbstub: opt.gdbstub,
342        hide_isolation: opt.hide_isolation,
343        nvme_keep_alive: opt.nvme_keep_alive,
344        mana_keep_alive: opt.mana_keep_alive,
345        nvme_always_flr: opt.nvme_always_flr,
346        test_configuration: opt.test_configuration,
347        disable_uefi_frontpage: opt.disable_uefi_frontpage,
348        default_boot_always_attempt: opt.default_boot_always_attempt,
349        guest_state_lifetime: opt.guest_state_lifetime,
350        guest_state_encryption_policy: opt.guest_state_encryption_policy,
351        hardware_sealing_policy: opt.hardware_sealing_policy,
352        efi_diagnostics_log_level: opt.efi_diagnostics_log_level,
353        efi_diagnostics_rate_limit: opt.efi_diagnostics_rate_limit,
354        strict_encryption_policy: opt.strict_encryption_policy,
355        attempt_ak_cert_callback: opt.attempt_ak_cert_callback,
356        enable_vpci_relay: opt.enable_vpci_relay,
357        disable_proxy_redirect: opt.disable_proxy_redirect,
358        disable_lower_vtl_timer_virt: opt.disable_lower_vtl_timer_virt,
359        config_timeout_in_seconds: opt.config_timeout_in_seconds,
360        servicing_timeout_dump_collection_in_ms: opt.servicing_timeout_dump_collection_in_ms,
361    };
362
363    let (mut remote_console_cfg, framebuffer_access, dirty_rect_recv) =
364        new_underhill_remote_console_cfg(opt.framebuffer_gpa_base)?;
365
366    let mut vnc_worker = None;
367    if let Some(framebuffer) = framebuffer_access {
368        let listener = VmListener::bind(VmAddress::vsock_any(opt.vnc_port))
369            .context("failed to bind socket")?;
370
371        let input_send = remote_console_cfg.input.sender();
372
373        let vnc_host = launch_mesh_host(mesh, "vnc", Some(tracing.tracer()))
374            .await
375            .context("spawning vnc process failed")?;
376
377        vnc_worker = Some(
378            vnc_host
379                .launch_worker(
380                    vnc_worker_defs::VNC_WORKER_VMSOCKET,
381                    VncParameters {
382                        listener,
383                        framebuffer,
384                        input_send,
385                        dirty_recv: dirty_rect_recv,
386                        max_clients: 16,
387                        evict_oldest: false,
388                    },
389                )
390                .await?,
391        )
392    }
393
394    #[cfg(feature = "gdb")]
395    let mut gdbstub_worker = None;
396    #[cfg_attr(not(feature = "gdb"), expect(unused_mut))]
397    let mut debugger_rpc = None;
398    #[cfg(feature = "gdb")]
399    if opt.gdbstub {
400        let listener = VmListener::bind(VmAddress::vsock_any(opt.gdbstub_port))
401            .context("failed to bind socket")?;
402
403        let gdb_host = launch_mesh_host(mesh, "gdb", Some(tracing.tracer()))
404            .await
405            .context("failed to spawn gdb host process")?;
406
407        // Get the VP count of this machine. It's too early to read it directly
408        // from IGVM parameters, but the kernel already has the IGVM parsed VP
409        // count via the boot loader anyways.
410        let vp_count =
411            pal::unix::affinity::max_present_cpu().context("failed to get max present cpu")? + 1;
412
413        let (send, recv) = mesh::channel();
414        debugger_rpc = Some(recv);
415        gdbstub_worker = Some(
416            gdb_host
417                .launch_worker(
418                    debug_worker_defs::DEBUGGER_VSOCK_WORKER,
419                    debug_worker_defs::DebuggerParameters {
420                        listener,
421                        req_chan: send,
422                        vp_count,
423                        target_arch: if cfg!(guest_arch = "x86_64") {
424                            debug_worker_defs::TargetArch::X86_64
425                        } else {
426                            debug_worker_defs::TargetArch::Aarch64
427                        },
428                    },
429                )
430                .await?,
431        );
432    }
433    let (vm_rpc, vm_rpc_rx) = mesh::channel();
434
435    // Spawn the worker in a separate process in case the diagnostics server (in
436    // this process) is used to run gdbserver against it, or in case it needs to
437    // be restarted.
438    let host = launch_mesh_host(mesh, "vm", Some(tracing.tracer()))
439        .await
440        .context("failed to launch worker process")?;
441
442    let vm_worker = host
443        .start_worker(
444            worker::UNDERHILL_WORKER,
445            UnderhillWorkerParameters {
446                env_cfg,
447                remote_console_cfg,
448                debugger_rpc,
449                vm_rpc: vm_rpc_rx,
450                control_send,
451            },
452        )
453        .context("failed to launch worker")?;
454
455    Ok(Workers {
456        vm: vm_worker,
457        vm_rpc,
458        vnc: vnc_worker,
459        #[cfg(feature = "gdb")]
460        gdb: gdbstub_worker,
461    })
462}
463
/// Lifecycle state of the VM workers as seen by the control loop.
///
/// State for inspect only: it is reported under `control_state` and does not
/// drive any control decisions.
#[derive(Inspect)]
enum ControlState {
    /// Waiting for a diagnostics `Start` request before launching workers.
    WaitingForStart,
    /// Workers have been launched, but the VM worker has not yet reported
    /// `WorkerEvent::Started`.
    Starting,
    /// The VM worker has reported that it started (or a restart failed and
    /// the previous worker is still in use).
    Started,
    /// A diagnostics-requested restart of the VM worker is in progress.
    Restarting,
}
472
// Requests that the VM worker sends back to the control process over the
// channel handed to it in `UnderhillWorkerParameters::control_send`.
#[derive(MeshPayload)]
pub enum ControlRequest {
    // Flush the tracing backend, bounded by the supplied cancellation context.
    FlushLogs(Rpc<CancelContext, Result<(), CancelReason>>),
    // Spawn a new worker-host process with the given name and return its host.
    MakeWorker(Rpc<String, Result<WorkerHost, RemoteError>>),
}
478
479async fn run_control(
480    driver: DefaultDriver,
481    mesh: &Mesh,
482    opt: Options,
483    mut tracing: &mut TracingBackend,
484) -> anyhow::Result<()> {
485    let (control_send, mut control_recv) = mesh::channel();
486    let mut control_send = Some(control_send);
487
488    if opt.signal_vtl0_started {
489        signal_vtl0_started(&driver)
490            .await
491            .context("failed to signal vtl0 started")?;
492    }
493
494    let mut diag = DiagState::new().await?;
495
496    let (diag_reinspect_send, mut diag_reinspect_recv) = mesh::channel();
497    #[cfg(feature = "profiler")]
498    let mut profiler_host = None;
499    let mut state;
500    let mut workers = if opt.wait_for_start {
501        state = ControlState::WaitingForStart;
502        None
503    } else {
504        state = ControlState::Starting;
505        let workers = launch_workers(mesh, tracing, control_send.take().unwrap(), opt)
506            .await
507            .context("failed to launch workers")?;
508        Some(workers)
509    };
510
511    enum Event {
512        Diag(diag_server::DiagRequest),
513        Worker(WorkerEvent),
514        Control(ControlRequest),
515    }
516
517    let mut restart_rpc = None;
518    #[cfg(feature = "mem-profile-tracing")]
519    let mut profiler = mem_profile_tracing::HeapProfiler::new();
520    loop {
521        let event = {
522            let mut stream = (
523                (&mut diag.request_recv).map(Event::Diag),
524                (&mut diag_reinspect_recv)
525                    .map(|req| Event::Diag(diag_server::DiagRequest::Inspect(req))),
526                (&mut control_recv).map(Event::Control),
527                futures::stream::select_all(workers.as_mut().map(|w| &mut w.vm)).map(Event::Worker),
528            )
529                .merge();
530
531            let Some(event) = stream.next().await else {
532                break;
533            };
534            event
535        };
536
537        match event {
538            Event::Diag(request) => {
539                match request {
540                    diag_server::DiagRequest::Start(rpc) => {
541                        rpc.handle_failable(async |params| {
542                            if workers.is_some() {
543                                Err(anyhow::anyhow!("workers have already been started"))?;
544                            }
545                            let new_opt = Options::parse(params.args, params.env)
546                                .context("failed to parse new options")?;
547
548                            workers = Some(
549                                launch_workers(
550                                    mesh,
551                                    tracing,
552                                    control_send.take().unwrap(),
553                                    new_opt,
554                                )
555                                .await?,
556                            );
557                            state = ControlState::Starting;
558                            anyhow::Ok(())
559                        })
560                        .await
561                    }
562                    diag_server::DiagRequest::Inspect(deferred) => deferred.respond(|resp| {
563                        resp.sensitivity_field("mesh", SensitivityLevel::Safe, mesh)
564                            .sensitivity_field_mut("trace", SensitivityLevel::Safe, &mut tracing)
565                            .sensitivity_field(
566                                "build_info",
567                                SensitivityLevel::Safe,
568                                build_info::get(),
569                            )
570                            .sensitivity_child(
571                                "proc",
572                                SensitivityLevel::Safe,
573                                inspect_proc::inspect_proc,
574                            )
575                            .sensitivity_field("control_state", SensitivityLevel::Safe, &state)
576                            // This node can not be renamed due to stability guarantees.
577                            // See the comment at the top of inspect_internal for more details.
578                            .sensitivity_child("uhdiag", SensitivityLevel::Safe, |req| {
579                                inspect_internal::inspect_internal_diagnostics(
580                                    req,
581                                    &diag_reinspect_send,
582                                    &driver,
583                                )
584                            });
585
586                        resp.merge(&workers);
587                    }),
588                    diag_server::DiagRequest::Crash(pid) => {
589                        mesh.crash(pid);
590                    }
591                    diag_server::DiagRequest::Restart(rpc) => {
592                        let Some(workers) = &mut workers else {
593                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!(
594                                "worker has not been started yet"
595                            ))));
596                            continue;
597                        };
598
599                        let r = async {
600                            if restart_rpc.is_some() {
601                                anyhow::bail!("previous restart still in progress");
602                            }
603
604                            let host = launch_mesh_host(mesh, "vm", Some(tracing.tracer()))
605                                .await
606                                .context("failed to launch worker process")?;
607
608                            workers.vm.restart(&host);
609                            Ok(())
610                        }
611                        .await;
612
613                        if r.is_err() {
614                            rpc.complete(r.map_err(RemoteError::new));
615                        } else {
616                            state = ControlState::Restarting;
617                            restart_rpc = Some(rpc);
618                        }
619                    }
620                    diag_server::DiagRequest::Pause(rpc) => {
621                        let Some(workers) = &mut workers else {
622                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!(
623                                "worker has not been started yet"
624                            ))));
625                            continue;
626                        };
627
628                        // create the req future output the spawn, so that
629                        // we don't need to clone + move vm_rpc.
630                        let req = workers.vm_rpc.call(UhVmRpc::Pause, ());
631
632                        // FUTURE: consider supporting cancellation
633                        driver
634                            .spawn("diag-pause", async move {
635                                let was_paused = req.await.expect("failed to pause VM");
636                                rpc.handle_failable_sync(|_| {
637                                    if !was_paused {
638                                        Err(anyhow::anyhow!("VM is already paused"))
639                                    } else {
640                                        Ok(())
641                                    }
642                                });
643                            })
644                            .detach();
645                    }
646                    diag_server::DiagRequest::PacketCapture(rpc) => {
647                        let Some(workers) = &mut workers else {
648                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!(
649                                "worker has not been started yet"
650                            ))));
651                            continue;
652                        };
653
654                        workers.vm_rpc.send(UhVmRpc::PacketCapture(rpc));
655                    }
656                    #[cfg(feature = "mem-profile-tracing")]
657                    diag_server::DiagRequest::MemoryProfileTrace(rpc) => {
658                        rpc.handle_failable(async |pid| {
659                            if pid == std::process::id() as i32 {
660                                anyhow::Ok(profiler.capture_and_restart())
661                            } else {
662                                let Some(workers) = &mut workers else {
663                                    anyhow::bail!("workers have not been started yet");
664                                };
665
666                                let result = workers
667                                    .vm_rpc
668                                    .call(UhVmRpc::MemoryProfileTrace, pid)
669                                    .await
670                                    .context("failed to get memory profile from worker process")?;
671                                Ok(result?)
672                            }
673                        })
674                        .await
675                    }
676                    diag_server::DiagRequest::Resume(rpc) => {
677                        let Some(workers) = &mut workers else {
678                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!(
679                                "worker has not been started yet"
680                            ))));
681                            continue;
682                        };
683
684                        let was_resumed = workers
685                            .vm_rpc
686                            .call(UhVmRpc::Resume, ())
687                            .await
688                            .context("failed to resumed VM")?;
689
690                        let was_halted = workers
691                            .vm_rpc
692                            .call(UhVmRpc::ClearHalt, ())
693                            .await
694                            .context("failed to clear halt from VPs")?;
695
696                        rpc.handle_sync(|_| {
697                            if was_resumed || was_halted {
698                                Ok(())
699                            } else {
700                                Err(RemoteError::new(anyhow::anyhow!("VM is currently running")))
701                            }
702                        });
703                    }
704                    diag_server::DiagRequest::Save(rpc) => {
705                        let Some(workers) = &mut workers else {
706                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!(
707                                "worker has not been started yet"
708                            ))));
709                            continue;
710                        };
711
712                        workers.vm_rpc.send(UhVmRpc::Save(rpc));
713                    }
714                    #[cfg(feature = "profiler")]
715                    diag_server::DiagRequest::Profile(rpc) => {
716                        let (rpc_params, rpc_sender) = rpc.split();
717                        // Create profiler host if there is none created before
718                        if profiler_host.is_none() {
719                            match launch_mesh_host(mesh, "profiler", Some(tracing.tracer()))
720                                .await
721                                .context("failed to launch profiler host")
722                            {
723                                Ok(host) => {
724                                    profiler_host = Some(host);
725                                }
726                                Err(e) => {
727                                    rpc_sender.complete(Err(RemoteError::new(e)));
728                                    continue;
729                                }
730                            }
731                        }
732
733                        let profiling_duration = rpc_params.duration;
734                        let host = profiler_host.as_ref().unwrap();
735                        let mut profiler_worker;
736                        match host
737                            .launch_worker(
738                                profiler_worker::PROFILER_WORKER,
739                                ProfilerWorkerParameters {
740                                    profiler_request: rpc_params,
741                                },
742                            )
743                            .await
744                        {
745                            Ok(worker) => {
746                                profiler_worker = worker;
747                            }
748                            Err(e) => {
749                                rpc_sender.complete(Err(RemoteError::new(e)));
750                                continue;
751                            }
752                        }
753
754                        driver
755                            .spawn("profiler_worker", async move {
756                                let result = CancelContext::new()
757                                    .with_timeout(Duration::from_secs(profiling_duration + 30))
758                                    .until_cancelled(profiler_worker.join())
759                                    .await
760                                    .context("profiler worker cancelled")
761                                    .and_then(|result| result.context("profiler worker failed"))
762                                    .map_err(RemoteError::new);
763
764                                rpc_sender.complete(result);
765                            })
766                            .detach();
767                    }
768                }
769            }
770            Event::Worker(event) => match event {
771                WorkerEvent::Started => {
772                    if let Some(response) = restart_rpc.take() {
773                        tracing::info!(CVM_ALLOWED, "restart complete");
774                        response.complete(Ok(()));
775                    } else {
776                        tracing::info!(CVM_ALLOWED, "vm worker started");
777                    }
778                    state = ControlState::Started;
779                }
780                WorkerEvent::Stopped => {
781                    anyhow::bail!("worker unexpectedly stopped");
782                }
783                WorkerEvent::Failed(err) => {
784                    return Err(anyhow::Error::from(err)).context("vm worker failed");
785                }
786                WorkerEvent::RestartFailed(err) => {
787                    tracing::error!(
788                        CVM_ALLOWED,
789                        error = &err as &dyn std::error::Error,
790                        "restart failed"
791                    );
792                    restart_rpc.take().unwrap().complete(Err(err));
793                    state = ControlState::Started;
794                }
795            },
796            Event::Control(req) => match req {
797                ControlRequest::FlushLogs(rpc) => {
798                    rpc.handle(async |mut ctx| {
799                        tracing::info!(CVM_ALLOWED, "flushing logs");
800                        ctx.until_cancelled(tracing.flush()).await?;
801                        Ok(())
802                    })
803                    .await
804                }
805                ControlRequest::MakeWorker(rpc) => {
806                    rpc.handle_failable(async |name| {
807                        launch_mesh_host(mesh, &name, Some(tracing.tracer())).await
808                    })
809                    .await
810                }
811            },
812        }
813    }
814
815    Ok(())
816}
817
818async fn signal_vtl0_started(driver: &DefaultDriver) -> anyhow::Result<()> {
819    tracing::info!(CVM_ALLOWED, "signaling vtl0 started early");
820    let (client, task) = guest_emulation_transport::spawn_get_worker(driver.clone())
821        .await
822        .context("failed to spawn GET")?;
823    client.complete_start_vtl0(None).await;
824    // Disconnect the GET so that it can be reused.
825    drop(client);
826    task.await.unwrap();
827    tracing::info!(CVM_ALLOWED, "signaled vtl0 start");
828    Ok(())
829}
830
// The "base" workers for Underhill. Other workers are defined in the
// `underhill_resources` crate.
//
// `UnderhillVmWorker` is started in the "vm" worker-host process, and
// `DiagWorker` is launched locally in the control process. `ProfilerWorker`
// is only registered with the `profiler` feature.
//
// NOTE(review): presumably this registration is what lets
// `RegisteredWorkers` (run by `main` in worker-host processes) construct
// these workers by name. Confirm against `mesh_worker::register_workers!`.
//
// FUTURE: split these workers into separate crates and move them to
// `underhill_resources`, too.
register_workers! {
    UnderhillVmWorker,
    DiagWorker,
    #[cfg(feature = "profiler")]
    ProfilerWorker,
}