vmcore/vmtime.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Support for VM time.
//!
//! This is a VM-specific timeline, which monotonically increases when and only
//! when the VM is running. This module provides types used to access this time
//! and to wait for it to reach target times. This can be used in device
//! emulators to implement VM timers.
//!
//! This is related to the idea of the hypervisor reference time, but it is not
//! guaranteed to be the same value (and is likely not, except when the
//! hypervisor reference time is emulated using VM time).
//!
//! The root of VM time keeping is the [`VmTimeKeeper`]. It manages a clock that
//! can be shared via use of [`VmTimeAccess`] objects. Internally, this clock is
//! based on an offset from the OS's monotonic clock while the VM is running, and
//! a fixed time when the VM is not running.
//!
//! The infrastructure here supports accessing VM time across multiple processes
//! on the same OS (but not across machines, virtual or physical). See the
//! comments on [`VmTimeSourceBuilder`] for more information.
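//!
//! # Example
//!
//! A minimal usage sketch (the driver setup and the `"my-timer"` name are
//! illustrative assumptions, not part of this API):
//!
//! ```ignore
//! let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
//! let source = keeper.builder().build(&driver).await?;
//! let mut access = source.access("my-timer");
//! keeper.start().await;
//! // Arm a timeout 1ms of VM time from now and wait for it to elapse.
//! access.set_timeout(access.now().wrapping_add(Duration::from_millis(1)));
//! let _fired_at = std::future::poll_fn(|cx| access.poll_timeout(cx)).await;
//! ```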

pub use saved_state::SavedState;

use futures::StreamExt;
use futures::future::join_all;
use futures_concurrency::future::Race;
use futures_concurrency::stream::Merge;
use inspect::Inspect;
use inspect::InspectMut;
use mesh::MeshPayload;
use mesh::payload::Protobuf;
use mesh::rpc::Rpc;
use mesh::rpc::RpcSend;
use pal_async::driver::Driver;
use pal_async::driver::PollImpl;
use pal_async::driver::SpawnDriver;
use pal_async::task::Task;
use pal_async::timer::Instant;
use pal_async::timer::PollTimer;
use parking_lot::RwLock;
use save_restore_derive::SavedStateRoot;
use slab::Slab;
use std::future::poll_fn;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::time::Duration;
use thiserror::Error;

/// Roughly analogous to [`std::time::Instant`], but for VM time.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Protobuf, Inspect)]
#[inspect(transparent)]
#[mesh(transparent)]
pub struct VmTime(#[inspect(hex)] u64);

impl VmTime {
    /// Converts from a time in 100ns units.
    pub fn from_100ns(n: u64) -> Self {
        Self(n)
    }

    /// Gets the time from VM boot (or some other origin) in 100ns units.
    pub const fn as_100ns(&self) -> u64 {
        self.0
    }

    /// Adds `d` to the time.
    pub fn wrapping_add(self, d: Duration) -> Self {
        Self((self.0 as u128).wrapping_add(d.as_nanos() / 100) as u64)
    }

    /// Returns whether `self` is before `t`.
    ///
    /// Note that this is a relative comparison in the 64-bit space and is not
    /// transitive: if `a` is before `b`, and `b` is before `c`, `a` still may
    /// be after `c`.
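    ///
    /// For example (a sketch; the constants are arbitrary and chosen to cross
    /// the 64-bit wraparound point):
    ///
    /// ```ignore
    /// let t0 = VmTime::from_100ns(u64::MAX - 10);
    /// let t1 = t0.wrapping_add(Duration::from_micros(2)); // wraps past u64::MAX
    /// assert!(t0.is_before(t1));
    /// assert!(t1.is_after(t0));
    /// ```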
    pub fn is_before(self, t: Self) -> bool {
        let delta = self.0.wrapping_sub(t.0);
        (delta as i64) < 0
    }

    /// Returns whether `self` is after `t`.
    ///
    /// See the comment about transitivity in [`Self::is_before`].
    pub fn is_after(self, t: Self) -> bool {
        let delta = self.0.wrapping_sub(t.0);
        (delta as i64) > 0
    }

    /// Returns the time between `self` and `t`, returning `None` if `self` is
    /// before `t`.
    pub fn checked_sub(self, t: Self) -> Option<Duration> {
        let delta = self.0.wrapping_sub(t.0);
        if (delta as i64) >= 0 {
            Some(duration_from_100ns(delta))
        } else {
            None
        }
    }

    /// Returns `self` or `t`, whichever is earlier.
    pub fn min(self, t: Self) -> Self {
        if self.is_before(t) { self } else { t }
    }

    /// Returns `self` or `t`, whichever is later.
    pub fn max(self, t: Self) -> Self {
        if self.is_before(t) { t } else { self }
    }
}

fn duration_from_100ns(n: u64) -> Duration {
    const NUM_100NS_IN_SEC: u64 = 10 * 1000 * 1000;
    Duration::new(n / NUM_100NS_IN_SEC, (n % NUM_100NS_IN_SEC) as u32 * 100)
}

#[derive(Inspect)]
struct TimerState {
    time: TimeState,
    #[inspect(skip)]
    timer: PollImpl<dyn PollTimer>,
    #[inspect(with = "|x| inspect::iter_by_key(x.iter().map(|(_, w)| (&w.name, w)))")]
    waiters: Slab<WaiterState>,
    next: Option<VmTime>,
    last: VmTime,
}

#[derive(Debug, Inspect)]
struct WaiterState {
    #[inspect(skip)] // used as a key
    name: Arc<str>,
    next: Option<VmTime>,
    #[inspect(rename = "waiting", with = "Option::is_some")]
    waker: Option<Waker>,
}

impl WaiterState {
    fn new(name: Arc<str>) -> Self {
        Self {
            name,
            next: None,
            waker: None,
        }
    }
}

#[derive(Copy, Clone, Debug, Protobuf)]
struct Timestamp {
    vmtime: VmTime,
    os_time: u64, // Instant::as_nanos()
}

impl Timestamp {
    fn new(vmtime: VmTime, os_time: Instant) -> Self {
        Self {
            vmtime,
            os_time: os_time.as_nanos(),
        }
    }

    fn os_time(&self) -> Instant {
        Instant::from_nanos(self.os_time)
    }
}

impl TimerState {
    fn new(driver: &impl Driver, uptime: VmTime) -> Self {
        Self {
            time: TimeState::Stopped(uptime),
            timer: driver.new_dyn_timer(),
            waiters: Slab::new(),
            next: None,
            last: uptime,
        }
    }

    /// Starts the timer.
    fn start(&mut self, now: Timestamp) {
        let vmtime = self.time.stop_time().expect("should be stopped");
        assert_eq!(now.vmtime, vmtime);
        self.time = TimeState::Started(now);
        tracing::trace!(?now, "vmtime start");
        self.wake(now);
    }

    /// Stops the timer.
    fn stop(&mut self, now_os: Instant) -> VmTime {
        assert!(self.time.is_started());
        let now = self.now(now_os);
        self.time = TimeState::Stopped(now.vmtime);
        tracing::debug!(?now, "vmtime stop");
        now.vmtime
    }

    /// Resets the current time to `time`.
    fn reset(&mut self, time: VmTime) {
        assert!(!self.time.is_started());
        self.time = TimeState::Stopped(time);
        self.last = time;
        self.next = None;
        // Wake all the wakers to re-evaluate things.
        for (_, waiter) in &mut self.waiters {
            if let Some(waker) = waiter.waker.take() {
                waker.wake();
            }
        }
    }

    /// Returns the timestamp corresponding to the given VM time, or `None` if
    /// the VM is not currently running.
    ///
    /// If the VM time is before the last start time, then the timestamp is the
    /// host time when the VM last started.
    fn timestamp(&self, time: VmTime) -> Option<Timestamp> {
        let start_time = self.time.start_time()?;
        let since = time
            .checked_sub(start_time.vmtime)
            .unwrap_or(Duration::ZERO);
        Some(Timestamp::new(time, start_time.os_time() + since))
    }

    /// Returns the current guest time given a host time.
    fn now(&self, now_os: Instant) -> Timestamp {
        self.time.now(now_os)
    }

    fn set_next(&mut self, next: VmTime) {
        if !self.time.is_started() {
            return;
        }
        if self
            .next
            .is_none_or(|current_next| next.is_before(current_next))
        {
            let deadline = self.timestamp(next).unwrap().os_time();
            tracing::trace!(?deadline, "updating deadline");
            self.timer.set_deadline(deadline);
            self.next = Some(next);
        }
    }

    fn wake(&mut self, now: Timestamp) {
        assert!(!now.vmtime.is_before(self.last));
        self.last = now.vmtime;
        let mut next = None;
        for (_, state) in &mut self.waiters {
            if let Some(this_next) = state.next {
                if this_next.is_after(now.vmtime) {
                    if next.is_none_or(|next| this_next.is_before(next)) {
                        next = Some(this_next);
                    }
                } else if let Some(waker) = state.waker.take() {
                    waker.wake();
                }
            }
        }
        if let Some(next) = next {
            self.set_next(next);
        }
    }

    fn cancel_timeout(&mut self, index: usize) {
        self.waiters[index].next = None;
    }

    /// Updates the next timeout for an individual waiter.
    fn update_timeout(&mut self, index: usize, time: VmTime) {
        let state = &mut self.waiters[index];
        tracing::trace!(vmtime = ?time, user = state.name.as_ref(), "timeout update");
        state.next = Some(time);
        if time.is_before(self.last) {
            // The wake time is even before the last timer wake, so just wake
            // the waiter and skip updating the timer.
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
            return;
        }

        // Update the timer if needed.
        if self.next.is_some_and(|next| next.is_before(time)) {
            return;
        }
        self.set_next(time);
    }

    /// Polls a single waiter.
    fn poll_timeout(
        &mut self,
        cx: &mut Context<'_>,
        index: usize,
        now_os: Instant,
        next: Option<VmTime>,
    ) -> Poll<Timestamp> {
        let now = self.now(now_os);
        let state = &mut self.waiters[index];
        if next.is_some_and(|next| next.is_before(now.vmtime)) {
            state.waker = None;
            state.next = None;
            return Poll::Ready(now);
        }
        state.next = next;
        state.waker = Some(cx.waker().clone());
        if let Some(next) = next {
            self.set_next(next);
        }
        Poll::Pending
    }

    /// Polls the timer, waking any waiters whose timeout has expired.
    fn poll(&mut self, cx: &mut Context<'_>) {
        while self.time.is_started() {
            let next = match self.next {
                Some(_) => {
                    // The timer's deadline is already set.
                    tracing::trace!("polling existing deadline");
                    None
                }
                None => {
                    // Set the timer far out in the future.
                    let deadline = Instant::now() + Duration::from_secs(86400);
                    tracing::trace!(?deadline, "polling with long timeout");
                    Some(deadline)
                }
            };
            if let Poll::Ready(now) = self.timer.poll_timer(cx, next) {
                self.next = None;
                self.wake(self.now(now));
            } else {
                return;
            }
        }
    }
}

/// A time keeper, which tracks the current time and all waiters.
#[derive(Debug, InspectMut)]
pub struct VmTimeKeeper {
    #[inspect(skip)]
    _task: Task<()>,
    #[inspect(flatten, send = "KeeperRequest::Inspect")]
    req_send: mesh::Sender<KeeperRequest>,
    #[inspect(skip)]
    builder: VmTimeSourceBuilder,
    #[inspect(skip)]
    time: TimeState,
}

// UNSAFETY: Needed to derive SavedStateRoot in the same crate it is declared
#[expect(unsafe_code)]
mod saved_state {
    use super::*;

    /// Saved state for [`VmTimeKeeper`].
    #[derive(Protobuf, SavedStateRoot)]
    #[mesh(package = "vmtime")]
    pub struct SavedState {
        #[mesh(1)]
        pub(super) vmtime: VmTime,
    }

    impl SavedState {
        /// Create a new instance of `SavedState` from an existing `VmTime`.
        pub fn from_vmtime(vmtime: VmTime) -> Self {
            SavedState { vmtime }
        }
    }
}

#[derive(Debug, MeshPayload, Copy, Clone)]
enum TimeState {
    Stopped(VmTime),
    Started(Timestamp),
}

impl Inspect for TimeState {
    fn inspect(&self, req: inspect::Request<'_>) {
        let mut resp = req.respond();
        let state = match *self {
            TimeState::Stopped(_time) => "stopped",
            TimeState::Started(time) => {
                resp.field("start_time", time.vmtime);
                "started"
            }
        };
        resp.field("state", state)
            .field("now", self.now(Instant::now()).vmtime);
    }
}

impl TimeState {
    fn is_started(&self) -> bool {
        self.start_time().is_some()
    }

    fn stop_time(&self) -> Option<VmTime> {
        match *self {
            TimeState::Stopped(time) => Some(time),
            TimeState::Started(_) => None,
        }
    }

    fn start_time(&self) -> Option<Timestamp> {
        match *self {
            TimeState::Stopped(_) => None,
            TimeState::Started(time) => Some(time),
        }
    }

    fn now(&self, now_os: Instant) -> Timestamp {
        match *self {
            TimeState::Stopped(time) => Timestamp::new(time, now_os),
            TimeState::Started(start_time) => {
                if now_os >= start_time.os_time() {
                    Timestamp::new(
                        start_time
                            .vmtime
                            .wrapping_add(now_os - start_time.os_time()),
                        now_os,
                    )
                } else {
                    // `now_os` can be before the recorded start time if it was
                    // captured outside the lock and raced with the call to
                    // `start()`. Treat this as `now_os` being the same as the
                    // start time.
                    //
                    // But if `now_os` is too far before the start time, then
                    // there is probably some serious OS timekeeping bug, or maybe
                    // the debugger broke in at just the wrong time.
                    let delta = start_time.os_time() - now_os;
                    if delta > Duration::from_secs(1) {
                        tracing::error!(
                            now = now_os.as_nanos(),
                            start_host = start_time.os_time().as_nanos(),
                            ?delta,
                            "time went backward"
                        );
                    }
                    start_time
                }
            }
        }
    }
}

impl VmTimeKeeper {
    /// Creates a new time keeper with the specified current guest time.
    pub fn new(driver: &impl SpawnDriver, uptime: VmTime) -> Self {
        let (new_send, new_recv) = mesh::mpsc_channel();
        let (req_send, req_recv) = mesh::channel();
        let time = TimeState::Stopped(uptime);
        let task = driver.spawn("vm-time-keeper", async move {
            let mut primary = PrimaryKeeper {
                req_recv,
                new_recv,
                keepers: Vec::new(),
                next_id: 0,
                time,
            };
            primary.run().await;
        });
        Self {
            time,
            req_send,
            builder: VmTimeSourceBuilder { new_send },
            _task: task,
        }
    }

    /// Saves the time state.
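    ///
    /// As a sketch of the expected flow (the keeper must be stopped before
    /// saving, and restoring also requires a stopped keeper):
    ///
    /// ```ignore
    /// keeper.stop().await;
    /// let state = keeper.save();
    /// // ...later, while the keeper is still stopped...
    /// keeper.restore(state).await;
    /// keeper.start().await;
    /// ```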
    pub fn save(&self) -> SavedState {
        SavedState {
            vmtime: self.time.stop_time().expect("should be stopped"),
        }
    }

    /// Restores the time state.
    pub async fn restore(&mut self, state: SavedState) {
        let SavedState { vmtime } = state;
        self.reset_to(vmtime).await
    }

    async fn reset_to(&mut self, vmtime: VmTime) {
        assert!(!self.time.is_started(), "should be stopped");
        self.time = TimeState::Stopped(vmtime);
        self.req_send
            .call(KeeperRequest::Reset, vmtime)
            .await
            .unwrap();
    }

    /// Reset the VM time to 0.
    pub async fn reset(&mut self) {
        self.reset_to(VmTime::from_100ns(0)).await
    }

    /// Starts the timer, so that the current time will increase.
    pub async fn start(&mut self) {
        let vmtime = self.time.stop_time().expect("should be stopped");
        let timestamp = Timestamp::new(vmtime, Instant::now());
        self.time = TimeState::Started(timestamp);
        self.req_send
            .call(KeeperRequest::Start, timestamp)
            .await
            .unwrap();
    }

    /// Stops the timer, so that the current time will stop increasing.
    pub async fn stop(&mut self) {
        assert!(self.time.is_started(), "should be running");
        let stop_time = self.req_send.call(KeeperRequest::Stop, ()).await.unwrap();
        self.time = TimeState::Stopped(stop_time);
    }

    /// Returns a time source builder, which can be used to spawn tasks that
    /// back [`VmTimeSource`] instances, all backed by this time keeper's clock.
    pub fn builder(&self) -> &VmTimeSourceBuilder {
        &self.builder
    }
}

/// A time source builder, used to spawn tasks that back [`VmTimeSource`]
/// instances.
///
/// Note that this can be sent across processes via `mesh`.
///
/// However, the time keeping infrastructure assumes that all time keeping tasks
/// share a single global monotonic OS clock. This means that if you send this
/// across a VM/kernel/network boundary, the resulting time sources will not be
/// in sync with each other.
#[derive(MeshPayload, Clone, Debug)]
pub struct VmTimeSourceBuilder {
    new_send: mesh::Sender<NewKeeperRequest>,
}

/// Error returned by [`VmTimeSourceBuilder::build`] when the time keeper has
/// been torn down.
#[derive(Debug, Error)]
#[error("the time keeper has been torn down")]
pub struct TimeKeeperIsGone;

impl VmTimeSourceBuilder {
    /// Builds and spawns a backing task for [`VmTimeSource`]s. All
    /// [`VmTimeSource`] instances cloned from the first one will share a
    /// backing task.
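    ///
    /// A brief usage sketch (error handling elided; `keeper` and `driver` are
    /// assumed to come from the surrounding code):
    ///
    /// ```ignore
    /// let source = keeper.builder().build(&driver).await?;
    /// let access = source.access("my-device");
    /// ```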
    pub async fn build(&self, driver: &impl SpawnDriver) -> Result<VmTimeSource, TimeKeeperIsGone> {
        let (send, recv) = mesh::channel();
        let time = self
            .new_send
            .call(NewKeeperRequest::New, send)
            .await
            .map_err(|_| TimeKeeperIsGone)?;

        let mut state = Arc::new(RwLock::new(TimerState::new(driver, VmTime::from_100ns(0))));
        // Synchronize the time.
        {
            let state = Arc::get_mut(&mut state).unwrap().get_mut();
            match time {
                TimeState::Stopped(vmtime) => state.reset(vmtime),
                TimeState::Started(timestamp) => state.start(timestamp),
            }
        }
        let mut keeper = SecondaryKeeper {
            state: state.clone(),
            recv,
        };
        driver
            .spawn("vm-time", async move { keeper.run().await })
            .detach();
        Ok(VmTimeSource {
            state,
            remote: self.clone(),
        })
    }
}

/// Task that stores the current time state and manages the list of secondary
/// keepers.
///
/// There is one of these per VM time clock (i.e. one per VM).
#[derive(Inspect)]
struct PrimaryKeeper {
    #[inspect(skip)]
    req_recv: mesh::Receiver<KeeperRequest>,
    #[inspect(skip)]
    new_recv: mesh::Receiver<NewKeeperRequest>,
    #[inspect(
        with = "|x| inspect::iter_by_key(x.iter().map(|(id, sender)| (id, inspect::send(sender, KeeperRequest::Inspect))))"
    )]
    keepers: Vec<(u64, mesh::Sender<KeeperRequest>)>,
    #[inspect(skip)]
    next_id: u64,
    time: TimeState,
}

#[derive(MeshPayload)]
enum KeeperRequest {
    Start(Rpc<Timestamp, ()>),
    Stop(Rpc<(), VmTime>),
    Reset(Rpc<VmTime, ()>),
    Inspect(inspect::Deferred),
}

#[derive(MeshPayload)]
enum NewKeeperRequest {
    New(Rpc<mesh::Sender<KeeperRequest>, TimeState>),
}

impl PrimaryKeeper {
    async fn run(&mut self) {
        enum Event {
            New(NewKeeperRequest),
            Request(KeeperRequest),
        }

        while let Some(event) = (
            (&mut self.new_recv).map(Event::New),
            (&mut self.req_recv).map(Event::Request),
        )
            .merge()
            .next()
            .await
        {
            // Garbage collect the existing keepers.
            self.keepers.retain(|(_, s)| !s.is_closed());
            match event {
                Event::New(req) => match req {
                    NewKeeperRequest::New(rpc) => rpc.handle_sync(|sender| {
                        self.keepers.push((self.next_id, sender));
                        self.next_id += 1;
                        self.time
                    }),
                },
                Event::Request(req) => {
                    match req {
                        KeeperRequest::Start(rpc) => {
                            rpc.handle(async |start_time| {
                                assert!(!self.time.is_started());
                                self.time = TimeState::Started(start_time);
                                join_all(self.keepers.iter().map(|(_, sender)| {
                                    sender.call(KeeperRequest::Start, start_time)
                                }))
                                .await;
                            })
                            .await
                        }
                        KeeperRequest::Stop(rpc) => {
                            rpc.handle(async |()| {
                                let results = join_all(
                                    self.keepers
                                        .iter()
                                        .map(|(_, sender)| sender.call(KeeperRequest::Stop, ())),
                                )
                                .await;

                                let start_time = self.time.start_time().expect("should be running");
                                let now = start_time
                                    .vmtime
                                    .wrapping_add(Instant::now() - start_time.os_time());

                                // Compute the stop time as the max of all stop
                                // times so that no keeper's time goes backward
                                // on the next start.
                                let stop_time = results
                                    .into_iter()
                                    .filter_map(|r| r.ok())
                                    .fold(now, |a, b| a.max(b));

                                self.time = TimeState::Stopped(stop_time);

                                // Update all the keepers with the stop time so that
                                // it's consistent.
                                join_all(self.keepers.iter().map(|(_, sender)| {
                                    sender.call(KeeperRequest::Reset, stop_time)
                                }))
                                .await;

                                stop_time
                            })
                            .await
                        }
                        KeeperRequest::Reset(rpc) => {
                            rpc.handle(async |time| {
                                assert!(!self.time.is_started(), "should not be running");
                                self.time = TimeState::Stopped(time);
                                join_all(
                                    self.keepers
                                        .iter()
                                        .map(|(_, sender)| sender.call(KeeperRequest::Reset, time)),
                                )
                                .await;
                            })
                            .await
                        }
                        KeeperRequest::Inspect(deferred) => deferred.inspect(&self),
                    }
                }
            }
        }
    }
}

/// Task that provides access to the current VM time.
///
/// There can be multiple of these per VM, across multiple processes. They are
/// all backed by the same clock and report the same time.
#[derive(InspectMut)]
struct SecondaryKeeper {
    #[inspect(flatten)]
    state: Arc<RwLock<TimerState>>,
    #[inspect(skip)]
    recv: mesh::Receiver<KeeperRequest>,
}

impl SecondaryKeeper {
    async fn run(&mut self) {
        loop {
            let r = {
                let state = &self.state;
                (
                    self.recv.next(),
                    poll_fn(|cx| {
                        state.write().poll(cx);
                        Poll::Pending
                    }),
                )
                    .race()
                    .await
            };
            match r {
                Some(req) => match req {
                    KeeperRequest::Start(rpc) => rpc.handle_sync(|start_time| {
                        let mut state = self.state.write();
                        state.start(start_time);
                    }),
                    KeeperRequest::Reset(rpc) => rpc.handle_sync(|vmtime| {
                        let mut state = self.state.write();
                        state.reset(vmtime);
                    }),
                    KeeperRequest::Stop(rpc) => rpc.handle_sync(|()| {
                        let mut state = self.state.write();
                        state.stop(Instant::now())
                    }),
                    KeeperRequest::Inspect(deferred) => deferred.inspect(&mut *self),
                },
                None => break,
            }
        }
    }
}

/// A time source, used to instantiate [`VmTimeAccess`].
#[derive(Clone)]
pub struct VmTimeSource {
    state: Arc<RwLock<TimerState>>,
    remote: VmTimeSourceBuilder,
}

impl VmTimeSource {
    /// Gets a time accessor.
    ///
    /// `name` is used for diagnostics via `inspect`.
    pub fn access(&self, name: impl Into<Arc<str>>) -> VmTimeAccess {
        let name = name.into();
        VmTimeAccess {
            timeout: None,
            waiting: false,
            index: self
                .state
                .write()
                .waiters
                .insert(WaiterState::new(name.clone())),
            state: self.state.clone(),
            name,
        }
    }

    /// Gets the builder for creating additional time source backing tasks
    /// whose times are in sync with this one.
    pub fn builder(&self) -> &VmTimeSourceBuilder {
        &self.remote
    }
}

/// An individual time accessor, used to query and wait for time.
#[derive(Inspect)]
pub struct VmTimeAccess {
    timeout: Option<VmTime>,
    waiting: bool,
    #[inspect(skip)]
    index: usize,
    #[inspect(skip)]
    state: Arc<RwLock<TimerState>>,
    name: Arc<str>,
}

impl Drop for VmTimeAccess {
    fn drop(&mut self) {
        self.state.write().waiters.remove(self.index);
    }
}

impl VmTimeAccess {
    /// Gets the current time.
    pub fn now(&self) -> VmTime {
        let now = Instant::now();
        self.state.read().now(now).vmtime
    }

    /// Returns the host time corresponding to a guest time.
    ///
    /// If the guest time is before the VM last resumed, then returns the time
    /// the VM last resumed.
    ///
    /// If the VM is not running, returns `None`.
    pub fn host_time(&self, time: VmTime) -> Option<Instant> {
        Some(self.state.read().timestamp(time)?.os_time())
    }

    /// Get the currently set timeout.
    pub fn get_timeout(&self) -> Option<VmTime> {
        self.timeout
    }

    /// Sets the timeout at which [`poll_timeout`](Self::poll_timeout) will
    /// return ready.
    pub fn set_timeout(&mut self, time: VmTime) {
        self.timeout = Some(time);
        if self.waiting {
            self.state.write().update_timeout(self.index, time);
        }
    }

    /// Sets the timeout at which [`poll_timeout`](Self::poll_timeout) will
    /// return ready, but only if `time` is earlier than the current timeout.
    pub fn set_timeout_if_before(&mut self, time: VmTime) {
        if self.timeout.is_none_or(|timeout| time.is_before(timeout)) {
            self.set_timeout(time);
        }
    }

    /// Clears the current timeout for [`poll_timeout`](Self::poll_timeout).
    pub fn cancel_timeout(&mut self) {
        if self.waiting && self.timeout.is_some() {
            self.state.write().cancel_timeout(self.index);
        }
        self.timeout = None;
    }

    /// Polls the current time against the current timeout.
    ///
    /// Returns `Poll::Ready(self.now())` if the current timeout is before now.
    /// Returns `Poll::Pending` if there is no current timeout, or if the
    /// current timeout is after now.
    ///
    /// Note that this only stores a single waker, meaning that if you poll this
    /// from multiple tasks, only the most recently registered task will be woken
    /// when the time elapses. Create another instance of this type with
    /// [`VmTimeSource`] if you need to poll this from multiple tasks.
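    ///
    /// For example, a waiter can await the timeout with `std::future::poll_fn`
    /// (a sketch mirroring this module's tests):
    ///
    /// ```ignore
    /// access.set_timeout(access.now().wrapping_add(Duration::from_millis(10)));
    /// let fired_at: VmTime = std::future::poll_fn(|cx| access.poll_timeout(cx)).await;
    /// ```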
    pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
        let now = Instant::now();
        match self
            .state
            .write()
            .poll_timeout(cx, self.index, now, self.timeout)
        {
            Poll::Ready(now) => {
                self.waiting = false;
                self.timeout = None;
                Poll::Ready(now.vmtime)
            }
            Poll::Pending => {
                self.waiting = true;
                Poll::Pending
            }
        }
    }
}

#[derive(Debug, Inspect)]
#[inspect(tag = "state")]
enum VmTimerPeriodicInner {
    Stopped,
    Running {
        last_timeout: VmTime,
        period: Duration,
    },
}

/// An abstraction over [`VmTimeAccess`] that streamlines the process of setting
/// up a periodic timer.
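///
/// For example, a minimal polling loop might look like the following sketch
/// (the 10ms period, the `vmtime_access` binding, and driving it via `poll_fn`
/// are illustrative assumptions):
///
/// ```ignore
/// let mut timer = VmTimerPeriodic::new(vmtime_access);
/// timer.start(Duration::from_millis(10));
/// std::future::poll_fn(|cx| {
///     if let Poll::Ready(_now) = timer.poll_timeout(cx) {
///         // Handle the tick; the next timeout is re-armed automatically.
///     }
///     // Keep polling forever in this sketch.
///     Poll::<()>::Pending
/// })
/// .await;
/// ```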
#[derive(Inspect)]
pub struct VmTimerPeriodic {
    vmtime: VmTimeAccess,
    inner: VmTimerPeriodicInner,
}

impl VmTimerPeriodic {
    /// Create a new periodic timer, backed by the given [`VmTimeAccess`].
    pub fn new(vmtime_access: VmTimeAccess) -> Self {
        Self {
            vmtime: vmtime_access,
            inner: VmTimerPeriodicInner::Stopped,
        }
    }

    /// Cancel the timer.
    ///
    /// If the timer isn't running, this method is a no-op.
    pub fn cancel(&mut self) {
        self.vmtime.cancel_timeout();
        self.inner = VmTimerPeriodicInner::Stopped;
    }

    /// Start the timer, configuring it to fire at the specified period.
    ///
    /// If the timer is currently running, the timer will be cancelled +
    /// restarted.
    pub fn start(&mut self, period: Duration) {
        self.cancel();

        let time = self.vmtime.now().wrapping_add(period);
        self.vmtime.set_timeout(time);
        self.inner = VmTimerPeriodicInner::Running {
            last_timeout: time,
            period,
        }
    }

    /// Check if the timer is currently running.
    pub fn is_running(&self) -> bool {
        matches!(self.inner, VmTimerPeriodicInner::Running { .. })
    }

    /// Polls the timer.
    ///
    /// Returns `Poll::Ready(now)` when the timer is past-due, returning
    /// `Poll::Pending` otherwise.
    pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
        match self.inner {
            VmTimerPeriodicInner::Stopped => {
                assert_eq!(self.vmtime.get_timeout(), None);
                // Poll anyway so the waker stays registered. Since no timeout
                // is set (per the assert above), the poll_timeout documentation
                // guarantees this returns `Poll::Pending`.
                self.vmtime.poll_timeout(cx)
            }
            VmTimerPeriodicInner::Running {
                ref mut last_timeout,
                period,
            } => {
                let mut res = Poll::Pending;
                while let Poll::Ready(now) = self.vmtime.poll_timeout(cx) {
                    res = Poll::Ready(now);

                    let time = last_timeout.wrapping_add(period);
                    self.vmtime.set_timeout(time);
                    *last_timeout = time;
                }
                res
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::VmTime;
    use super::VmTimeKeeper;
    use futures::FutureExt;
    use pal_async::DefaultDriver;
    use pal_async::async_test;
    use pal_async::timer::PolledTimer;
    use std::future::poll_fn;
    use std::time::Duration;

    #[async_test]
    async fn test_vmtime(driver: DefaultDriver) {
        let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
        let mut access = keeper
            .builder()
            .build(&driver)
            .await
            .unwrap()
            .access("test");
        keeper.start().await;

        // Test long timeout.
        access.set_timeout(access.now().wrapping_add(Duration::from_secs(1000)));
        let mut timer = PolledTimer::new(&driver);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(50)).fuse() => {}
            _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
        }

        // Test short timeout.
        let deadline = access.now().wrapping_add(Duration::from_millis(10));
        access.set_timeout(deadline);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
            now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
                assert!(now.is_after(deadline));
            }
        }
        // Timeout should be cleared by the successful poll.
        assert!(
            poll_fn(|cx| access.poll_timeout(cx))
                .now_or_never()
                .is_none()
        );

        // Test changing timeout.
        let now = access.now();
        let deadline = now.wrapping_add(Duration::from_millis(2000));
        access.set_timeout(deadline);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(30)).fuse() => {
                let deadline = now.wrapping_add(Duration::from_millis(50));
                access.set_timeout(deadline);
                futures::select! {
                    _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
                    now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
                        assert!(now.is_after(deadline));
                    }
                }
            }
            _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
        }
        keeper.stop().await;
    }

    #[async_test]
    async fn test_multi_vmtime(driver: DefaultDriver) {
        let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
        let src1 = keeper.builder().build(&driver).await.unwrap();
        keeper.start().await;
        let src2 = src1.builder().build(&driver).await.unwrap();
        let acc1 = src1.access("test");
        let acc2 = src2.access("test");
        {
            let t1 = acc1.now();
            let t2 = acc2.now();
            let t3 = acc1.now();
            assert!(!t2.is_before(t1), "{t1:?} {t2:?}");
            assert!(!t3.is_before(t2), "{t2:?} {t3:?}");
        }
        let now = acc1.now();
        keeper.stop().await;
        let t1 = acc1.now();
        let t2 = acc2.now();
        assert!(!t1.is_before(now));
        assert_eq!(t1, t2);
        let zero = VmTime::from_100ns(0);
        // Even on very fast machines, at least _some_ time will have advanced.
        assert_ne!(t1, zero);
        keeper.reset().await;
        assert_eq!(acc1.now(), zero);
        assert_eq!(acc2.now(), zero);
    }
}