Skip to main content

underhill_core/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! This module implements the interactive control process and the entry point
5//! for the underhill environment.
6
7#![cfg(target_os = "linux")]
8#![expect(missing_docs)]
9#![forbid(unsafe_code)]
10
11mod diag;
12mod dispatch;
13mod emuplat;
14mod get_tracing;
15mod inspect_internal;
16mod inspect_proc;
17mod livedump;
18mod loader;
19mod nvme_manager;
20mod options;
21mod reference_time;
22mod servicing;
23mod threadpool_vm_task_backend;
24mod vmbus_relay_unit;
25mod vmgs_logger;
26mod vp;
27mod vpci;
28mod worker;
29mod wrapped_partition;
30
31// `pub` so that the missing_docs warning fires for options without
32// documentation.
33pub use options::Options;
34
35use crate::diag::DiagWorker;
36use crate::dispatch::UhVmRpc;
37use crate::worker::UnderhillEnvCfg;
38use crate::worker::UnderhillRemoteConsoleCfg;
39use crate::worker::UnderhillVmWorker;
40use crate::worker::UnderhillWorkerParameters;
41use anyhow::Context;
42use bootloader_fdt_parser::BootTimes;
43use cvm_tracing::CVM_ALLOWED;
44use framebuffer::FRAMEBUFFER_SIZE;
45use framebuffer::FramebufferAccess;
46use futures::StreamExt;
47use futures_concurrency::stream::Merge;
48use get_tracing::init_tracing;
49use get_tracing::init_tracing_backend;
50use inspect::Inspect;
51use inspect::SensitivityLevel;
52use mesh::CancelContext;
53use mesh::CancelReason;
54use mesh::MeshPayload;
55use mesh::error::RemoteError;
56use mesh::rpc::Rpc;
57use mesh::rpc::RpcSend;
58use mesh_process::Mesh;
59use mesh_process::ProcessConfig;
60use mesh_process::try_run_mesh_host;
61use mesh_tracing::RemoteTracer;
62use mesh_tracing::TracingBackend;
63use mesh_worker::RegisteredWorkers;
64use mesh_worker::WorkerEvent;
65use mesh_worker::WorkerHandle;
66use mesh_worker::WorkerHost;
67use mesh_worker::WorkerHostRunner;
68use mesh_worker::launch_local_worker;
69use mesh_worker::register_workers;
70use pal_async::DefaultDriver;
71use pal_async::DefaultPool;
72use pal_async::task::Spawn;
73#[cfg(feature = "profiler")]
74use profiler_worker::ProfilerWorker;
75#[cfg(feature = "profiler")]
76use profiler_worker::ProfilerWorkerParameters;
77use std::time::Duration;
78use vmsocket::VmAddress;
79use vmsocket::VmListener;
80use vnc_worker_defs::VncParameters;
81
82fn new_underhill_remote_console_cfg(
83    framebuffer_gpa_base: Option<u64>,
84) -> anyhow::Result<(UnderhillRemoteConsoleCfg, Option<FramebufferAccess>)> {
85    if let Some(framebuffer_gpa_base) = framebuffer_gpa_base {
86        // Underhill accesses the framebuffer by using /dev/mshv_vtl_low to read
87        // from a second mapping placed after the end of RAM at a static
88        // location specified by the host.
89        //
90        // Open the file directly rather than use the `hcl` crate to avoid
91        // leaking `hcl` stuff into this crate.
92        //
93        // FUTURE: use an approach that doesn't require double mapping the
94        // framebuffer from the host.
95        let gpa_fd = fs_err::OpenOptions::new()
96            .read(true)
97            .write(true)
98            .open("/dev/mshv_vtl_low")
99            .context("failed to open gpa device")?;
100
101        let vram = sparse_mmap::new_mappable_from_file(gpa_fd.file(), true, false)?;
102        let (fb, fba) = framebuffer::framebuffer(vram, FRAMEBUFFER_SIZE, framebuffer_gpa_base)
103            .context("allocating framebuffer")?;
104        tracing::debug!("framebuffer_gpa_base: {:#x}", framebuffer_gpa_base);
105
106        Ok((
107            UnderhillRemoteConsoleCfg {
108                synth_keyboard: true,
109                synth_mouse: true,
110                synth_video: true,
111                input: mesh::Receiver::new(),
112                framebuffer: Some(fb),
113            },
114            Some(fba),
115        ))
116    } else {
117        Ok((
118            UnderhillRemoteConsoleCfg {
119                synth_keyboard: false,
120                synth_mouse: false,
121                synth_video: false,
122                input: mesh::Receiver::new(),
123                framebuffer: None,
124            },
125            None,
126        ))
127    }
128}
129
/// Entry point for the underhill process.
///
/// The same binary runs both as the initial (control) process and as each
/// mesh host process launched from it:
///
/// - If `OPENVMM_WRITE_SAVED_STATE_PROTO` is set, writes the saved-state
///   protobuf descriptors to that path and returns (debug builds only;
///   release builds return an error).
/// - If this process was launched as a mesh host, runs the registered
///   workers; on success this does not return.
/// - Otherwise, initializes the tracing backend and runs `do_main` on a
///   default pool.
pub fn main() -> anyhow::Result<()> {
    // Install a panic hook to prefix the current async task name before the
    // standard panic output.
    install_task_name_panic_hook();

    // Tooling path: dump the saved-state protobuf descriptors instead of
    // running the VMM.
    if let Some(path) = std::env::var_os("OPENVMM_WRITE_SAVED_STATE_PROTO") {
        if cfg!(debug_assertions) {
            mesh::payload::protofile::DescriptorWriter::new(
                vmcore::save_restore::saved_state_roots(),
            )
            .write_to_path(path)
            .context("failed to write protobuf descriptors")?;
            return Ok(());
        } else {
            // The generated code for this is too large for release builds.
            anyhow::bail!(".proto output only supported in debug builds");
        }
    }

    // FUTURE: create and use the affinitized threadpool here.
    let (_, tracing_driver) = DefaultPool::spawn_on_thread("tracing");

    // Try to run as a worker host, sending a remote tracer that will forward
    // tracing events back to the initial process for logging to the host. See
    // [`get_tracing`] doc comments for more details.
    //
    // On success the worker runs to completion and then exits the process (does
    // not return). Any worker host setup errors are returned and bubbled up.
    try_run_mesh_host("underhill", {
        let tracing_driver = tracing_driver.clone();
        async |params: MeshHostParams| {
            if let Some(remote_tracer) = params.tracer {
                init_tracing(tracing_driver, remote_tracer).context("failed to init tracing")?;
            }
            params.runner.run(RegisteredWorkers).await;
            Ok(())
        }
    })?;

    // Reaching this point means this process was not launched as a mesh
    // host, i.e. it is the initial (control) process.
    // Initialize the tracing backend used by this and all subprocesses.
    let mut tracing = init_tracing_backend(tracing_driver.clone())?;
    // Initialize tracing from the backend.
    init_tracing(tracing_driver, tracing.tracer()).context("failed to init tracing")?;
    DefaultPool::run_with(|driver| do_main(driver, tracing))
}
175
176fn install_task_name_panic_hook() {
177    use std::io::Write;
178
179    let panic_hook = std::panic::take_hook();
180    std::panic::set_hook(Box::new(move |info| {
181        pal_async::task::with_current_task_metadata(|metadata| {
182            if let Some(metadata) = metadata {
183                let _ = write!(std::io::stderr(), "task '{}', ", metadata.name());
184            }
185        });
186        // This will proceed with writing "thread ... panicked at ..."
187        panic_hook(info);
188    }));
189}
190
191async fn do_main(driver: DefaultDriver, mut tracing: TracingBackend) -> anyhow::Result<()> {
192    let opt = Options::parse(Vec::new(), Vec::new())?;
193
194    let crate_name = build_info::get().crate_name();
195    let crate_revision = build_info::get().scm_revision();
196    tracing::info!(CVM_ALLOWED, ?crate_name, ?crate_revision, "VMM process");
197    log_boot_times().context("failure logging boot times")?;
198
199    // Write the current pid to a file.
200    if let Some(pid_path) = &opt.pid {
201        std::fs::write(pid_path, std::process::id().to_string())
202            .with_context(|| format!("failed to write pid to {}", pid_path.display()))?;
203    }
204
205    let mesh = Mesh::new("underhill".to_string()).context("failed to create mesh")?;
206
207    let r = run_control(driver, &mesh, opt, &mut tracing).await;
208    if let Err(err) = &r {
209        tracing::error!(
210            CVM_ALLOWED,
211            error = err.as_ref() as &dyn std::error::Error,
212            "VM failure"
213        );
214    }
215
216    // Wait a few seconds for child processes to terminate and tracing to finish.
217    CancelContext::new()
218        .with_timeout(Duration::from_secs(10))
219        .until_cancelled(async {
220            mesh.shutdown().await;
221            tracing.shutdown().await;
222        })
223        .await
224        .ok();
225
226    r
227}
228
229fn log_boot_times() -> anyhow::Result<()> {
230    fn diff(start: Option<u64>, end: Option<u64>) -> Option<tracing::field::DebugValue<Duration>> {
231        use reference_time::ReferenceTime;
232        Some(tracing::field::debug(
233            ReferenceTime::new(end?).since(ReferenceTime::new(start?))?,
234        ))
235    }
236
237    // Read boot times provided by the bootloader.
238    let BootTimes {
239        start,
240        end,
241        sidecar_start,
242        sidecar_end,
243    } = BootTimes::new().context("failed to parse boot times")?;
244    tracing::info!(
245        CVM_ALLOWED,
246        start,
247        end,
248        sidecar_start,
249        sidecar_end,
250        elapsed = diff(start, end),
251        sidecar_elapsed = diff(sidecar_start, sidecar_end),
252        "boot loader times"
253    );
254    Ok(())
255}
256
257struct DiagState {
258    _worker: WorkerHandle,
259    request_recv: mesh::Receiver<diag_server::DiagRequest>,
260}
261
262impl DiagState {
263    async fn new() -> anyhow::Result<Self> {
264        // Start the diagnostics worker immediately.
265        let (request_send, request_recv) = mesh::channel();
266        let worker = launch_local_worker::<DiagWorker>(diag::DiagWorkerParameters { request_send })
267            .await
268            .context("failed to launch diagnostics worker")?;
269        Ok(Self {
270            _worker: worker,
271            request_recv,
272        })
273    }
274}
275
/// Handles to the workers launched by `launch_workers`.
///
/// The control loop merges this into its inspect response.
#[derive(Inspect)]
struct Workers {
    /// The VM worker, running in its own "vm" mesh host process.
    #[inspect(safe)]
    vm: WorkerHandle,
    /// Sender for RPCs to the VM worker (e.g. pause, resume, save).
    #[inspect(skip)]
    vm_rpc: mesh::Sender<UhVmRpc>,
    /// The VNC worker; only present when a framebuffer is configured.
    vnc: Option<WorkerHandle>,
    /// The debugger worker; only present when the gdbstub option is set.
    #[cfg(feature = "gdb")]
    gdb: Option<WorkerHandle>,
}
286
/// Initialization parameters sent to each mesh host process.
///
/// Built by `launch_mesh_host` and received by the `try_run_mesh_host`
/// callback in `main`.
#[derive(MeshPayload)]
struct MeshHostParams {
    /// If set, the host process initializes tracing with this remote tracer
    /// so that its events are forwarded back to the initial process.
    tracer: Option<RemoteTracer>,
    /// Worker host runner; `main` runs it with `RegisteredWorkers`.
    runner: WorkerHostRunner,
}
292
293async fn launch_mesh_host(
294    mesh: &Mesh,
295    name: &str,
296    tracer: Option<RemoteTracer>,
297) -> anyhow::Result<WorkerHost> {
298    let (host, runner) = mesh_worker::worker_host();
299    mesh.launch_host(ProcessConfig::new(name), MeshHostParams { tracer, runner })
300        .await?;
301    Ok(host)
302}
303
304async fn launch_workers(
305    mesh: &Mesh,
306    tracing: &mut TracingBackend,
307    control_send: mesh::Sender<ControlRequest>,
308    opt: Options,
309) -> anyhow::Result<Workers> {
310    let env_cfg = UnderhillEnvCfg {
311        vmbus_max_version: opt.vmbus_max_version,
312        vmbus_enable_mnf: opt.vmbus_enable_mnf,
313        vmbus_force_confidential_external_memory: opt.vmbus_force_confidential_external_memory,
314        vmbus_channel_unstick_delay: (opt.vmbus_channel_unstick_delay_ms != 0)
315            .then(|| Duration::from_millis(opt.vmbus_channel_unstick_delay_ms)),
316        cmdline_append: opt.cmdline_append.clone(),
317        reformat_vmgs: opt.reformat_vmgs,
318        vtl0_starts_paused: opt.vtl0_starts_paused,
319        emulated_serial_wait_for_rts: opt.serial_wait_for_rts,
320        force_load_vtl0_image: opt.force_load_vtl0_image,
321        nvme_vfio: opt.nvme_vfio,
322        halt_on_guest_halt: opt.halt_on_guest_halt,
323        no_sidecar_hotplug: opt.no_sidecar_hotplug,
324        gdbstub: opt.gdbstub,
325        hide_isolation: opt.hide_isolation,
326        nvme_keep_alive: opt.nvme_keep_alive,
327        mana_keep_alive: opt.mana_keep_alive,
328        nvme_always_flr: opt.nvme_always_flr,
329        test_configuration: opt.test_configuration,
330        disable_uefi_frontpage: opt.disable_uefi_frontpage,
331        default_boot_always_attempt: opt.default_boot_always_attempt,
332        guest_state_lifetime: opt.guest_state_lifetime,
333        guest_state_encryption_policy: opt.guest_state_encryption_policy,
334        efi_diagnostics_log_level: opt.efi_diagnostics_log_level,
335        strict_encryption_policy: opt.strict_encryption_policy,
336        attempt_ak_cert_callback: opt.attempt_ak_cert_callback,
337        enable_vpci_relay: opt.enable_vpci_relay,
338        disable_proxy_redirect: opt.disable_proxy_redirect,
339        disable_lower_vtl_timer_virt: opt.disable_lower_vtl_timer_virt,
340        config_timeout_in_seconds: opt.config_timeout_in_seconds,
341        servicing_timeout_dump_collection_in_ms: opt.servicing_timeout_dump_collection_in_ms,
342    };
343
344    let (mut remote_console_cfg, framebuffer_access) =
345        new_underhill_remote_console_cfg(opt.framebuffer_gpa_base)?;
346
347    let mut vnc_worker = None;
348    if let Some(framebuffer) = framebuffer_access {
349        let listener = VmListener::bind(VmAddress::vsock_any(opt.vnc_port))
350            .context("failed to bind socket")?;
351
352        let input_send = remote_console_cfg.input.sender();
353
354        let vnc_host = launch_mesh_host(mesh, "vnc", Some(tracing.tracer()))
355            .await
356            .context("spawning vnc process failed")?;
357
358        vnc_worker = Some(
359            vnc_host
360                .launch_worker(
361                    vnc_worker_defs::VNC_WORKER_VMSOCKET,
362                    VncParameters {
363                        listener,
364                        framebuffer,
365                        input_send,
366                    },
367                )
368                .await?,
369        )
370    }
371
372    #[cfg(feature = "gdb")]
373    let mut gdbstub_worker = None;
374    #[cfg_attr(not(feature = "gdb"), expect(unused_mut))]
375    let mut debugger_rpc = None;
376    #[cfg(feature = "gdb")]
377    if opt.gdbstub {
378        let listener = VmListener::bind(VmAddress::vsock_any(opt.gdbstub_port))
379            .context("failed to bind socket")?;
380
381        let gdb_host = launch_mesh_host(mesh, "gdb", Some(tracing.tracer()))
382            .await
383            .context("failed to spawn gdb host process")?;
384
385        // Get the VP count of this machine. It's too early to read it directly
386        // from IGVM parameters, but the kernel already has the IGVM parsed VP
387        // count via the boot loader anyways.
388        let vp_count =
389            pal::unix::affinity::max_present_cpu().context("failed to get max present cpu")? + 1;
390
391        let (send, recv) = mesh::channel();
392        debugger_rpc = Some(recv);
393        gdbstub_worker = Some(
394            gdb_host
395                .launch_worker(
396                    debug_worker_defs::DEBUGGER_VSOCK_WORKER,
397                    debug_worker_defs::DebuggerParameters {
398                        listener,
399                        req_chan: send,
400                        vp_count,
401                        target_arch: if cfg!(guest_arch = "x86_64") {
402                            debug_worker_defs::TargetArch::X86_64
403                        } else {
404                            debug_worker_defs::TargetArch::Aarch64
405                        },
406                    },
407                )
408                .await?,
409        );
410    }
411    let (vm_rpc, vm_rpc_rx) = mesh::channel();
412
413    // Spawn the worker in a separate process in case the diagnostics server (in
414    // this process) is used to run gdbserver against it, or in case it needs to
415    // be restarted.
416    let host = launch_mesh_host(mesh, "vm", Some(tracing.tracer()))
417        .await
418        .context("failed to launch worker process")?;
419
420    let vm_worker = host
421        .start_worker(
422            worker::UNDERHILL_WORKER,
423            UnderhillWorkerParameters {
424                env_cfg,
425                remote_console_cfg,
426                debugger_rpc,
427                vm_rpc: vm_rpc_rx,
428                control_send,
429            },
430        )
431        .context("failed to launch worker")?;
432
433    Ok(Workers {
434        vm: vm_worker,
435        vm_rpc,
436        vnc: vnc_worker,
437        #[cfg(feature = "gdb")]
438        gdb: gdbstub_worker,
439    })
440}
441
/// State for inspect only.
///
/// The control loop in `run_control` updates this and reports it as the
/// `control_state` inspect field; nothing branches on it.
#[derive(Inspect)]
enum ControlState {
    /// `wait_for_start` was set; waiting for a diagnostics `Start` request.
    WaitingForStart,
    /// Workers have been launched; waiting for the VM worker to report that
    /// it has started.
    Starting,
    /// The VM worker reported that it started. This state is also restored
    /// after a failed restart.
    Started,
    /// A diagnostics-requested restart of the VM worker is in progress.
    Restarting,
}
450
/// Requests handled by the control loop in `run_control`.
///
/// The sending half of the channel is passed to the VM worker through its
/// worker parameters, so these are requests from the VM worker to the
/// control process.
#[derive(MeshPayload)]
pub enum ControlRequest {
    // Flush the control process's tracing backend. The provided
    // `CancelContext` bounds the wait; a cancellation is reported as the
    // `CancelReason`.
    FlushLogs(Rpc<CancelContext, Result<(), CancelReason>>),
    // Launch a new mesh host process with the given name, with tracing
    // forwarded to the control process, and return its `WorkerHost`.
    MakeWorker(Rpc<String, Result<WorkerHost, RemoteError>>),
}
456
457async fn run_control(
458    driver: DefaultDriver,
459    mesh: &Mesh,
460    opt: Options,
461    mut tracing: &mut TracingBackend,
462) -> anyhow::Result<()> {
463    let (control_send, mut control_recv) = mesh::channel();
464    let mut control_send = Some(control_send);
465
466    if opt.signal_vtl0_started {
467        signal_vtl0_started(&driver)
468            .await
469            .context("failed to signal vtl0 started")?;
470    }
471
472    let mut diag = DiagState::new().await?;
473
474    let (diag_reinspect_send, mut diag_reinspect_recv) = mesh::channel();
475    #[cfg(feature = "profiler")]
476    let mut profiler_host = None;
477    let mut state;
478    let mut workers = if opt.wait_for_start {
479        state = ControlState::WaitingForStart;
480        None
481    } else {
482        state = ControlState::Starting;
483        let workers = launch_workers(mesh, tracing, control_send.take().unwrap(), opt)
484            .await
485            .context("failed to launch workers")?;
486        Some(workers)
487    };
488
489    enum Event {
490        Diag(diag_server::DiagRequest),
491        Worker(WorkerEvent),
492        Control(ControlRequest),
493    }
494
495    let mut restart_rpc = None;
496    #[cfg(feature = "mem-profile-tracing")]
497    let mut profiler = mem_profile_tracing::HeapProfiler::new();
498    loop {
499        let event = {
500            let mut stream = (
501                (&mut diag.request_recv).map(Event::Diag),
502                (&mut diag_reinspect_recv)
503                    .map(|req| Event::Diag(diag_server::DiagRequest::Inspect(req))),
504                (&mut control_recv).map(Event::Control),
505                futures::stream::select_all(workers.as_mut().map(|w| &mut w.vm)).map(Event::Worker),
506            )
507                .merge();
508
509            let Some(event) = stream.next().await else {
510                break;
511            };
512            event
513        };
514
515        match event {
516            Event::Diag(request) => {
517                match request {
518                    diag_server::DiagRequest::Start(rpc) => {
519                        rpc.handle_failable(async |params| {
520                            if workers.is_some() {
521                                Err(anyhow::anyhow!("workers have already been started"))?;
522                            }
523                            let new_opt = Options::parse(params.args, params.env)
524                                .context("failed to parse new options")?;
525
526                            workers = Some(
527                                launch_workers(
528                                    mesh,
529                                    tracing,
530                                    control_send.take().unwrap(),
531                                    new_opt,
532                                )
533                                .await?,
534                            );
535                            state = ControlState::Starting;
536                            anyhow::Ok(())
537                        })
538                        .await
539                    }
540                    diag_server::DiagRequest::Inspect(deferred) => deferred.respond(|resp| {
541                        resp.sensitivity_field("mesh", SensitivityLevel::Safe, mesh)
542                            .sensitivity_field_mut("trace", SensitivityLevel::Safe, &mut tracing)
543                            .sensitivity_field(
544                                "build_info",
545                                SensitivityLevel::Safe,
546                                build_info::get(),
547                            )
548                            .sensitivity_child(
549                                "proc",
550                                SensitivityLevel::Safe,
551                                inspect_proc::inspect_proc,
552                            )
553                            .sensitivity_field("control_state", SensitivityLevel::Safe, &state)
554                            // This node can not be renamed due to stability guarantees.
555                            // See the comment at the top of inspect_internal for more details.
556                            .sensitivity_child("uhdiag", SensitivityLevel::Safe, |req| {
557                                inspect_internal::inspect_internal_diagnostics(
558                                    req,
559                                    &diag_reinspect_send,
560                                    &driver,
561                                )
562                            });
563
564                        resp.merge(&workers);
565                    }),
566                    diag_server::DiagRequest::Crash(pid) => {
567                        mesh.crash(pid);
568                    }
569                    diag_server::DiagRequest::Restart(rpc) => {
570                        let Some(workers) = &mut workers else {
571                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!(
572                                "worker has not been started yet"
573                            ))));
574                            continue;
575                        };
576
577                        let r = async {
578                            if restart_rpc.is_some() {
579                                anyhow::bail!("previous restart still in progress");
580                            }
581
582                            let host = launch_mesh_host(mesh, "vm", Some(tracing.tracer()))
583                                .await
584                                .context("failed to launch worker process")?;
585
586                            workers.vm.restart(&host);
587                            Ok(())
588                        }
589                        .await;
590
591                        if r.is_err() {
592                            rpc.complete(r.map_err(RemoteError::new));
593                        } else {
594                            state = ControlState::Restarting;
595                            restart_rpc = Some(rpc);
596                        }
597                    }
598                    diag_server::DiagRequest::Pause(rpc) => {
599                        let Some(workers) = &mut workers else {
600                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!(
601                                "worker has not been started yet"
602                            ))));
603                            continue;
604                        };
605
606                        // create the req future output the spawn, so that
607                        // we don't need to clone + move vm_rpc.
608                        let req = workers.vm_rpc.call(UhVmRpc::Pause, ());
609
610                        // FUTURE: consider supporting cancellation
611                        driver
612                            .spawn("diag-pause", async move {
613                                let was_paused = req.await.expect("failed to pause VM");
614                                rpc.handle_failable_sync(|_| {
615                                    if !was_paused {
616                                        Err(anyhow::anyhow!("VM is already paused"))
617                                    } else {
618                                        Ok(())
619                                    }
620                                });
621                            })
622                            .detach();
623                    }
624                    diag_server::DiagRequest::PacketCapture(rpc) => {
625                        let Some(workers) = &mut workers else {
626                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!(
627                                "worker has not been started yet"
628                            ))));
629                            continue;
630                        };
631
632                        workers.vm_rpc.send(UhVmRpc::PacketCapture(rpc));
633                    }
634                    #[cfg(feature = "mem-profile-tracing")]
635                    diag_server::DiagRequest::MemoryProfileTrace(rpc) => {
636                        rpc.handle_failable(async |pid| {
637                            if pid == std::process::id() as i32 {
638                                anyhow::Ok(profiler.capture_and_restart())
639                            } else {
640                                let Some(workers) = &mut workers else {
641                                    anyhow::bail!("workers have not been started yet");
642                                };
643
644                                let result = workers
645                                    .vm_rpc
646                                    .call(UhVmRpc::MemoryProfileTrace, pid)
647                                    .await
648                                    .context("failed to get memory profile from worker process")?;
649                                Ok(result?)
650                            }
651                        })
652                        .await
653                    }
654                    diag_server::DiagRequest::Resume(rpc) => {
655                        let Some(workers) = &mut workers else {
656                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!(
657                                "worker has not been started yet"
658                            ))));
659                            continue;
660                        };
661
662                        let was_resumed = workers
663                            .vm_rpc
664                            .call(UhVmRpc::Resume, ())
665                            .await
666                            .context("failed to resumed VM")?;
667
668                        let was_halted = workers
669                            .vm_rpc
670                            .call(UhVmRpc::ClearHalt, ())
671                            .await
672                            .context("failed to clear halt from VPs")?;
673
674                        rpc.handle_sync(|_| {
675                            if was_resumed || was_halted {
676                                Ok(())
677                            } else {
678                                Err(RemoteError::new(anyhow::anyhow!("VM is currently running")))
679                            }
680                        });
681                    }
682                    diag_server::DiagRequest::Save(rpc) => {
683                        let Some(workers) = &mut workers else {
684                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!(
685                                "worker has not been started yet"
686                            ))));
687                            continue;
688                        };
689
690                        workers.vm_rpc.send(UhVmRpc::Save(rpc));
691                    }
692                    #[cfg(feature = "profiler")]
693                    diag_server::DiagRequest::Profile(rpc) => {
694                        let (rpc_params, rpc_sender) = rpc.split();
695                        // Create profiler host if there is none created before
696                        if profiler_host.is_none() {
697                            match launch_mesh_host(mesh, "profiler", Some(tracing.tracer()))
698                                .await
699                                .context("failed to launch profiler host")
700                            {
701                                Ok(host) => {
702                                    profiler_host = Some(host);
703                                }
704                                Err(e) => {
705                                    rpc_sender.complete(Err(RemoteError::new(e)));
706                                    continue;
707                                }
708                            }
709                        }
710
711                        let profiling_duration = rpc_params.duration;
712                        let host = profiler_host.as_ref().unwrap();
713                        let mut profiler_worker;
714                        match host
715                            .launch_worker(
716                                profiler_worker::PROFILER_WORKER,
717                                ProfilerWorkerParameters {
718                                    profiler_request: rpc_params,
719                                },
720                            )
721                            .await
722                        {
723                            Ok(worker) => {
724                                profiler_worker = worker;
725                            }
726                            Err(e) => {
727                                rpc_sender.complete(Err(RemoteError::new(e)));
728                                continue;
729                            }
730                        }
731
732                        driver
733                            .spawn("profiler_worker", async move {
734                                let result = CancelContext::new()
735                                    .with_timeout(Duration::from_secs(profiling_duration + 30))
736                                    .until_cancelled(profiler_worker.join())
737                                    .await
738                                    .context("profiler worker cancelled")
739                                    .and_then(|result| result.context("profiler worker failed"))
740                                    .map_err(RemoteError::new);
741
742                                rpc_sender.complete(result);
743                            })
744                            .detach();
745                    }
746                }
747            }
748            Event::Worker(event) => match event {
749                WorkerEvent::Started => {
750                    if let Some(response) = restart_rpc.take() {
751                        tracing::info!(CVM_ALLOWED, "restart complete");
752                        response.complete(Ok(()));
753                    } else {
754                        tracing::info!(CVM_ALLOWED, "vm worker started");
755                    }
756                    state = ControlState::Started;
757                }
758                WorkerEvent::Stopped => {
759                    anyhow::bail!("worker unexpectedly stopped");
760                }
761                WorkerEvent::Failed(err) => {
762                    return Err(anyhow::Error::from(err)).context("vm worker failed");
763                }
764                WorkerEvent::RestartFailed(err) => {
765                    tracing::error!(
766                        CVM_ALLOWED,
767                        error = &err as &dyn std::error::Error,
768                        "restart failed"
769                    );
770                    restart_rpc.take().unwrap().complete(Err(err));
771                    state = ControlState::Started;
772                }
773            },
774            Event::Control(req) => match req {
775                ControlRequest::FlushLogs(rpc) => {
776                    rpc.handle(async |mut ctx| {
777                        tracing::info!(CVM_ALLOWED, "flushing logs");
778                        ctx.until_cancelled(tracing.flush()).await?;
779                        Ok(())
780                    })
781                    .await
782                }
783                ControlRequest::MakeWorker(rpc) => {
784                    rpc.handle_failable(async |name| {
785                        launch_mesh_host(mesh, &name, Some(tracing.tracer())).await
786                    })
787                    .await
788                }
789            },
790        }
791    }
792
793    Ok(())
794}
795
796async fn signal_vtl0_started(driver: &DefaultDriver) -> anyhow::Result<()> {
797    tracing::info!(CVM_ALLOWED, "signaling vtl0 started early");
798    let (client, task) = guest_emulation_transport::spawn_get_worker(driver.clone())
799        .await
800        .context("failed to spawn GET")?;
801    client.complete_start_vtl0(None).await;
802    // Disconnect the GET so that it can be reused.
803    drop(client);
804    task.await.unwrap();
805    tracing::info!(CVM_ALLOWED, "signaled vtl0 start");
806    Ok(())
807}
808
// The "base" workers for Underhill. Other workers are defined in the
// `underhill_resources` crate.
//
// Mesh host processes run their worker host with `RegisteredWorkers` (see
// `main`); presumably these registrations are what make the workers below
// launchable by name in those hosts.
//
// FUTURE: split these workers into separate crates and move them to
// `underhill_resources`, too.
register_workers! {
    UnderhillVmWorker,
    DiagWorker,
    #[cfg(feature = "profiler")]
    ProfilerWorker,
}