vmcore/vmtime.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Support for VM time.
//!
//! This is a VM-specific timeline, which monotonically increases when and only
//! when the VM is running. This module provides types used to access this time
//! and to wait for it to reach target times. This can be used in device
//! emulators to implement VM timers.
//!
//! This is related to the idea of the hypervisor reference time, but it is not
//! guaranteed to be the same value (and is likely not, except when the
//! hypervisor reference time is emulated using VM time).
//!
//! The root of VM time keeping is the [`VmTimeKeeper`]. It manages a clock that
//! can be shared via use of [`VmTimeAccess`] objects. Internally, this clock is
//! based on an offset from the OS's monotonic clock while the VM is running, and
//! a fixed time when the VM is not running.
//!
//! The infrastructure here supports accessing VM time from multiple processes
//! on the same host (but not across machines, virtual or physical). See the
//! comments on [`VmTimeSourceBuilder`] for more information.
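//!
//! A minimal usage sketch, loosely based on the tests at the bottom of this
//! module (the driver setup and the `"my-device"` name are illustrative
//! assumptions, not requirements):
//!
//! ```ignore
//! use std::time::Duration;
//!
//! async fn example(driver: &impl pal_async::driver::SpawnDriver) {
//!     // Create the root time keeper with the VM clock at zero.
//!     let mut keeper = VmTimeKeeper::new(driver, VmTime::from_100ns(0));
//!
//!     // Spawn a backing task and get an accessor for a device emulator.
//!     let source = keeper.builder().build(driver).await.unwrap();
//!     let mut access = source.access("my-device");
//!
//!     keeper.start().await;
//!
//!     // Arm a timer 1ms of VM time from now and wait for it to expire.
//!     let deadline = access.now().wrapping_add(Duration::from_millis(1));
//!     access.set_timeout(deadline);
//!     let _fired_at = std::future::poll_fn(|cx| access.poll_timeout(cx)).await;
//!
//!     keeper.stop().await;
//! }
//! ```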

pub use saved_state::SavedState;

use futures::StreamExt;
use futures::future::join_all;
use futures_concurrency::future::Race;
use futures_concurrency::stream::Merge;
use inspect::Inspect;
use inspect::InspectMut;
use inspect::adhoc;
use mesh::MeshPayload;
use mesh::payload::Protobuf;
use mesh::rpc::Rpc;
use mesh::rpc::RpcSend;
use pal_async::driver::Driver;
use pal_async::driver::PollImpl;
use pal_async::driver::SpawnDriver;
use pal_async::task::Task;
use pal_async::timer::Instant;
use pal_async::timer::PollTimer;
use parking_lot::RwLock;
use save_restore_derive::SavedStateRoot;
use slab::Slab;
use std::future::poll_fn;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::time::Duration;
use thiserror::Error;

/// Roughly analogous to [`std::time::Instant`], but for VM time.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Protobuf, Inspect)]
#[inspect(transparent)]
#[mesh(transparent)]
pub struct VmTime(#[inspect(hex)] u64);

impl VmTime {
    /// Converts from a time in 100ns units.
    pub fn from_100ns(n: u64) -> Self {
        Self(n)
    }

    /// Gets the time from VM boot (or some other origin) in 100ns units.
    pub const fn as_100ns(&self) -> u64 {
        self.0
    }

    /// Adds `d` to the time.
    pub fn wrapping_add(self, d: Duration) -> Self {
        Self((self.0 as u128).wrapping_add(d.as_nanos() / 100) as u64)
    }

    /// Returns whether `self` is before `t`.
    ///
    /// Note that this is a relative comparison in the 64-bit space and is not
    /// transitive: if `a` is before `b`, and `b` is before `c`, `a` still may
    /// be after `c`.
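    ///
    /// A short illustrative sketch of the relative comparison:
    ///
    /// ```ignore
    /// let a = VmTime::from_100ns(100);
    /// let b = VmTime::from_100ns(200);
    /// assert!(a.is_before(b));
    /// assert!(!b.is_before(a));
    /// ```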
    pub fn is_before(self, t: Self) -> bool {
        let delta = self.0.wrapping_sub(t.0);
        (delta as i64) < 0
    }

    /// Returns whether `self` is after `t`.
    ///
    /// See the comment about transitivity in [`Self::is_before`].
    pub fn is_after(self, t: Self) -> bool {
        let delta = self.0.wrapping_sub(t.0);
        (delta as i64) > 0
    }

    /// Returns the time between `self` and `t`, returning `None` if `self` is
    /// before `t`.
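    ///
    /// For example (illustrative values):
    ///
    /// ```ignore
    /// use std::time::Duration;
    /// let a = VmTime::from_100ns(1_000);
    /// let b = VmTime::from_100ns(400);
    /// assert_eq!(a.checked_sub(b), Some(Duration::from_micros(60)));
    /// assert_eq!(b.checked_sub(a), None);
    /// ```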
    pub fn checked_sub(self, t: Self) -> Option<Duration> {
        let delta = self.0.wrapping_sub(t.0);
        if (delta as i64) >= 0 {
            Some(duration_from_100ns(delta))
        } else {
            None
        }
    }

    /// Returns `self` or `t`, whichever is earlier.
    pub fn min(self, t: Self) -> Self {
        if self.is_before(t) { self } else { t }
    }

    /// Returns `self` or `t`, whichever is later.
    pub fn max(self, t: Self) -> Self {
        if self.is_before(t) { t } else { self }
    }
}

fn duration_from_100ns(n: u64) -> Duration {
    const NUM_100NS_IN_SEC: u64 = 10 * 1000 * 1000;
    Duration::new(n / NUM_100NS_IN_SEC, (n % NUM_100NS_IN_SEC) as u32 * 100)
}

#[derive(Inspect)]
struct TimerState {
    time: TimeState,
    #[inspect(skip)]
    timer: PollImpl<dyn PollTimer>,
    #[inspect(with = "|x| inspect::iter_by_key(x.iter().map(|(_, w)| (&w.name, w)))")]
    waiters: Slab<WaiterState>,
    next: Option<VmTime>,
    last: VmTime,
}

#[derive(Debug, Inspect)]
struct WaiterState {
    #[inspect(skip)] // used as a key
    name: Arc<str>,
    next: Option<VmTime>,
    #[inspect(rename = "waiting", with = "Option::is_some")]
    waker: Option<Waker>,
}

impl WaiterState {
    fn new(name: Arc<str>) -> Self {
        Self {
            name,
            next: None,
            waker: None,
        }
    }
}

#[derive(Copy, Clone, Debug, Protobuf)]
struct Timestamp {
    vmtime: VmTime,
    os_time: u64, // Instant::as_nanos()
}

impl Timestamp {
    fn new(vmtime: VmTime, os_time: Instant) -> Self {
        Self {
            vmtime,
            os_time: os_time.as_nanos(),
        }
    }

    fn os_time(&self) -> Instant {
        Instant::from_nanos(self.os_time)
    }
}

impl TimerState {
    fn new(driver: &impl Driver, uptime: VmTime) -> Self {
        Self {
            time: TimeState::Stopped(uptime),
            timer: driver.new_dyn_timer(),
            waiters: Slab::new(),
            next: None,
            last: uptime,
        }
    }

    /// Starts the timer.
    fn start(&mut self, now: Timestamp) {
        let vmtime = self.time.stop_time().expect("should be stopped");
        assert_eq!(now.vmtime, vmtime);
        self.time = TimeState::Started(now);
        tracing::trace!(?now, "vmtime start");
        self.wake(now);
    }

    /// Stops the timer.
    fn stop(&mut self, now_os: Instant) -> VmTime {
        assert!(self.time.is_started());
        let now = self.now(now_os);
        self.time = TimeState::Stopped(now.vmtime);
        tracing::debug!(?now, "vmtime stop");
        now.vmtime
    }

    /// Resets the current time to `time`.
    fn reset(&mut self, time: VmTime) {
        assert!(!self.time.is_started());
        self.time = TimeState::Stopped(time);
        self.last = time;
        self.next = None;
        // Wake all the wakers to re-evaluate things.
        for (_, waiter) in &mut self.waiters {
            if let Some(waker) = waiter.waker.take() {
                waker.wake();
            }
        }
    }

    /// Returns the timestamp corresponding to the given VM time.
    ///
    /// If the VM time is before the last start time, then the timestamp is at
    /// the host time when the VM last started.
    fn timestamp(&self, time: VmTime) -> Option<Timestamp> {
        let start_time = self.time.start_time()?;
        let since = time
            .checked_sub(start_time.vmtime)
            .unwrap_or(Duration::ZERO);
        Some(Timestamp::new(time, start_time.os_time() + since))
    }

    /// Returns the current guest time given a host time.
    fn now(&self, now_os: Instant) -> Timestamp {
        self.time.now(now_os)
    }

    fn set_next(&mut self, next: VmTime) {
        if !self.time.is_started() {
            return;
        }
        if self
            .next
            .is_none_or(|current_next| next.is_before(current_next))
        {
            let deadline = self.timestamp(next).unwrap().os_time();
            tracing::trace!(?deadline, "updating deadline");
            self.timer.set_deadline(deadline);
            self.next = Some(next);
        }
    }

    fn wake(&mut self, now: Timestamp) {
        assert!(!now.vmtime.is_before(self.last));
        self.last = now.vmtime;
        let mut next = None;
        for (_, state) in &mut self.waiters {
            if let Some(this_next) = state.next {
                if this_next.is_after(now.vmtime) {
                    if next.is_none_or(|next| this_next.is_before(next)) {
                        next = Some(this_next);
                    }
                } else if let Some(waker) = state.waker.take() {
                    waker.wake();
                }
            }
        }
        if let Some(next) = next {
            self.set_next(next);
        }
    }

    fn cancel_timeout(&mut self, index: usize) {
        self.waiters[index].next = None;
    }

    /// Updates the next timeout for an individual waiter.
    fn update_timeout(&mut self, index: usize, time: VmTime) {
        let state = &mut self.waiters[index];
        tracing::trace!(vmtime = ?time, user = state.name.as_ref(), "timeout update");
        state.next = Some(time);
        if time.is_before(self.last) {
            // The wake time is even before the last timer wake, so just wake
            // the waiter and skip updating the timer.
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
            return;
        }

        // Update the timer if needed.
        if self.next.is_some_and(|next| next.is_before(time)) {
            return;
        }
        self.set_next(time);
    }

    /// Polls a single waiter.
    fn poll_timeout(
        &mut self,
        cx: &mut Context<'_>,
        index: usize,
        now_os: Instant,
        next: Option<VmTime>,
    ) -> Poll<Timestamp> {
        let now = self.now(now_os);
        let state = &mut self.waiters[index];
        if next.is_some_and(|next| next.is_before(now.vmtime)) {
            state.waker = None;
            state.next = None;
            return Poll::Ready(now);
        }
        state.next = next;
        state.waker = Some(cx.waker().clone());
        if let Some(next) = next {
            self.set_next(next);
        }
        Poll::Pending
    }

    /// Polls the timer, waking any waiters whose timeout has expired.
    fn poll(&mut self, cx: &mut Context<'_>) {
        while self.time.is_started() {
            let next = match self.next {
                Some(_) => {
                    // The timer's deadline is already set.
                    tracing::trace!("polling existing deadline");
                    None
                }
                None => {
                    // Set the timer far out in the future.
                    let deadline = Instant::now() + Duration::from_secs(86400);
                    tracing::trace!(?deadline, "polling with long timeout");
                    Some(deadline)
                }
            };
            if let Poll::Ready(now) = self.timer.poll_timer(cx, next) {
                self.next = None;
                self.wake(self.now(now));
            } else {
                return;
            }
        }
    }
}

/// A time keeper, which tracks the current time and all waiters.
#[derive(Debug)]
pub struct VmTimeKeeper {
    _task: Task<()>,
    req_send: mesh::Sender<KeeperRequest>,
    builder: VmTimeSourceBuilder,
    time: TimeState,
}

// UNSAFETY: Needed to derive SavedStateRoot in the same crate in which it is declared.
#[expect(unsafe_code)]
mod saved_state {
    use super::*;

    /// Saved state for [`VmTimeKeeper`].
    #[derive(Protobuf, SavedStateRoot)]
    #[mesh(package = "vmtime")]
    pub struct SavedState {
        #[mesh(1)]
        pub(super) vmtime: VmTime,
    }

    impl SavedState {
        /// Create a new instance of `SavedState` from an existing `VmTime`.
        pub fn from_vmtime(vmtime: VmTime) -> Self {
            SavedState { vmtime }
        }
    }
}

#[derive(Debug, MeshPayload, Copy, Clone)]
enum TimeState {
    Stopped(VmTime),
    Started(Timestamp),
}

impl Inspect for TimeState {
    fn inspect(&self, req: inspect::Request<'_>) {
        let mut resp = req.respond();
        let state = match *self {
            TimeState::Stopped(_time) => "stopped",
            TimeState::Started(time) => {
                resp.field("start_time", time.vmtime);
                "started"
            }
        };
        resp.field("state", state)
            .field("now", self.now(Instant::now()).vmtime);
    }
}

impl TimeState {
    fn is_started(&self) -> bool {
        self.start_time().is_some()
    }

    fn stop_time(&self) -> Option<VmTime> {
        match *self {
            TimeState::Stopped(time) => Some(time),
            TimeState::Started(_) => None,
        }
    }

    fn start_time(&self) -> Option<Timestamp> {
        match *self {
            TimeState::Stopped(_) => None,
            TimeState::Started(time) => Some(time),
        }
    }

    fn now(&self, now_os: Instant) -> Timestamp {
        match *self {
            TimeState::Stopped(time) => Timestamp::new(time, now_os),
            TimeState::Started(start_time) => {
                if now_os >= start_time.os_time() {
                    Timestamp::new(
                        start_time
                            .vmtime
                            .wrapping_add(now_os - start_time.os_time()),
                        now_os,
                    )
                } else {
                    // `now` can be before the start host time if it was captured
                    // outside the lock and raced with the call to `start()`. Treat
                    // this as `now` being the same as the start host time.
                    //
                    // But if `now` is too far before the start host time, then
                    // there is probably some serious OS timekeeping bug, or maybe
                    // the debugger broke in at just the wrong time.
                    let delta = start_time.os_time() - now_os;
                    if delta > Duration::from_secs(1) {
                        tracing::error!(
                            now = now_os.as_nanos(),
                            start_host = start_time.os_time().as_nanos(),
                            ?delta,
                            "time went backward"
                        );
                    }
                    start_time
                }
            }
        }
    }
}

impl InspectMut for VmTimeKeeper {
    fn inspect_mut(&mut self, req: inspect::Request<'_>) {
        self.req_send.send(KeeperRequest::Inspect(req.defer()));
    }
}

impl VmTimeKeeper {
    /// Creates a new time keeper with the specified current guest time.
    pub fn new(driver: &impl SpawnDriver, uptime: VmTime) -> Self {
        let (new_send, new_recv) = mesh::mpsc_channel();
        let (req_send, req_recv) = mesh::channel();
        let time = TimeState::Stopped(uptime);
        let task = driver.spawn("vm-time-keeper", async move {
            let mut primary = PrimaryKeeper {
                req_recv,
                new_recv,
                keepers: Vec::new(),
                next_id: 0,
                time,
            };
            primary.run().await;
        });
        Self {
            time,
            req_send,
            builder: VmTimeSourceBuilder { new_send },
            _task: task,
        }
    }

    /// Saves the time state.
    pub fn save(&self) -> SavedState {
        SavedState {
            vmtime: self.time.stop_time().expect("should be stopped"),
        }
    }

    /// Restores the time state.
    pub async fn restore(&mut self, state: SavedState) {
        let SavedState { vmtime } = state;
        self.reset_to(vmtime).await
    }

    async fn reset_to(&mut self, vmtime: VmTime) {
        assert!(!self.time.is_started(), "should be stopped");
        self.time = TimeState::Stopped(vmtime);
        self.req_send
            .call(KeeperRequest::Reset, vmtime)
            .await
            .unwrap();
    }

    /// Reset the VM time to 0.
    pub async fn reset(&mut self) {
        self.reset_to(VmTime::from_100ns(0)).await
    }

    /// Starts the timer, so that the current time will increase.
    pub async fn start(&mut self) {
        let vmtime = self.time.stop_time().expect("should be stopped");
        let timestamp = Timestamp::new(vmtime, Instant::now());
        self.time = TimeState::Started(timestamp);
        self.req_send
            .call(KeeperRequest::Start, timestamp)
            .await
            .unwrap();
    }

    /// Stops the timer, so that the current time will stop increasing.
    pub async fn stop(&mut self) {
        assert!(self.time.is_started(), "should be running");
        let stop_time = self.req_send.call(KeeperRequest::Stop, ()).await.unwrap();
        self.time = TimeState::Stopped(stop_time);
    }

    /// Returns a time source builder, which can be used to spawn tasks that
    /// back [`VmTimeSource`] instances, all backed by this time keeper's clock.
    pub fn builder(&self) -> &VmTimeSourceBuilder {
        &self.builder
    }
}

/// A time source builder, used to spawn tasks that back [`VmTimeSource`]
/// instances.
///
/// Note that this can be sent across processes via `mesh`.
///
/// However, the time keeping infrastructure assumes that all time keeping tasks
/// share a single global monotonic OS clock. This means that if you send this
/// across a VM/kernel/network boundary, the resulting time sources will not be
/// in sync with each other.
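///
/// For instance, a second source built from an existing one shares the same
/// clock (a sketch based on the tests in this module; `keeper` and `driver`
/// are assumed to already exist):
///
/// ```ignore
/// let src1 = keeper.builder().build(&driver).await.unwrap();
/// let src2 = src1.builder().build(&driver).await.unwrap();
/// let t1 = src1.access("a").now();
/// let t2 = src2.access("b").now();
/// assert!(!t2.is_before(t1));
/// ```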
#[derive(MeshPayload, Clone, Debug)]
pub struct VmTimeSourceBuilder {
    new_send: mesh::Sender<NewKeeperRequest>,
}

/// Error returned by [`VmTimeSourceBuilder::build`] when the time keeper has
/// been torn down.
#[derive(Debug, Error)]
#[error("the time keeper has been torn down")]
pub struct TimeKeeperIsGone;

impl VmTimeSourceBuilder {
    /// Builds and spawns a backing task for [`VmTimeSource`]s. All
    /// [`VmTimeSource`] instances cloned from the first one will share a
    /// backing task.
    pub async fn build(&self, driver: &impl SpawnDriver) -> Result<VmTimeSource, TimeKeeperIsGone> {
        let (send, recv) = mesh::channel();
        let time = self
            .new_send
            .call(NewKeeperRequest::New, send)
            .await
            .map_err(|_| TimeKeeperIsGone)?;

        let mut state = Arc::new(RwLock::new(TimerState::new(driver, VmTime::from_100ns(0))));
        // Synchronize the time.
        {
            let state = Arc::get_mut(&mut state).unwrap().get_mut();
            match time {
                TimeState::Stopped(vmtime) => state.reset(vmtime),
                TimeState::Started(timestamp) => state.start(timestamp),
            }
        }
        let mut keeper = SecondaryKeeper {
            state: state.clone(),
            recv,
        };
        driver
            .spawn("vm-time", async move { keeper.run().await })
            .detach();
        Ok(VmTimeSource {
            state,
            remote: self.clone(),
        })
    }
}

/// Task that stores the current time state and manages the list of secondary
/// keepers.
///
/// There is one of these per VM time clock (i.e. one per VM).
#[derive(Inspect)]
#[inspect(extra = "Self::inspect_extra")]
struct PrimaryKeeper {
    #[inspect(skip)]
    req_recv: mesh::Receiver<KeeperRequest>,
    #[inspect(skip)]
    new_recv: mesh::Receiver<NewKeeperRequest>,
    #[inspect(skip)]
    keepers: Vec<(u64, mesh::Sender<KeeperRequest>)>,
    #[inspect(skip)]
    next_id: u64,
    time: TimeState,
}

#[derive(MeshPayload)]
enum KeeperRequest {
    Start(Rpc<Timestamp, ()>),
    Stop(Rpc<(), VmTime>),
    Reset(Rpc<VmTime, ()>),
    Inspect(inspect::Deferred),
}

#[derive(MeshPayload)]
enum NewKeeperRequest {
    New(Rpc<mesh::Sender<KeeperRequest>, TimeState>),
}

impl PrimaryKeeper {
    fn inspect_extra(&self, resp: &mut inspect::Response<'_>) {
        resp.fields(
            "keepers",
            self.keepers
                .iter()
                .map(|&(id, ref s)| (id, adhoc(|req| s.send(KeeperRequest::Inspect(req.defer()))))),
        );
    }

    async fn run(&mut self) {
        enum Event {
            New(NewKeeperRequest),
            Request(KeeperRequest),
        }

        while let Some(event) = (
            (&mut self.new_recv).map(Event::New),
            (&mut self.req_recv).map(Event::Request),
        )
            .merge()
            .next()
            .await
        {
            // Garbage collect the existing keepers.
            self.keepers.retain(|(_, s)| !s.is_closed());
            match event {
                Event::New(req) => match req {
                    NewKeeperRequest::New(rpc) => rpc.handle_sync(|sender| {
                        self.keepers.push((self.next_id, sender));
                        self.next_id += 1;
                        self.time
                    }),
                },
                Event::Request(req) => {
                    match req {
                        KeeperRequest::Start(rpc) => {
                            rpc.handle(async |start_time| {
                                assert!(!self.time.is_started());
                                self.time = TimeState::Started(start_time);
                                join_all(self.keepers.iter().map(|(_, sender)| {
                                    sender.call(KeeperRequest::Start, start_time)
                                }))
                                .await;
                            })
                            .await
                        }
                        KeeperRequest::Stop(rpc) => {
                            rpc.handle(async |()| {
                                let results = join_all(
                                    self.keepers
                                        .iter()
                                        .map(|(_, sender)| sender.call(KeeperRequest::Stop, ())),
                                )
                                .await;

                                let start_time = self.time.start_time().expect("should be running");
                                let now = start_time
                                    .vmtime
                                    .wrapping_add(Instant::now() - start_time.os_time());

                                // Compute the stop time as the max of all stop
                                // times so that no keeper goes backward on the
                                // next start.
                                let stop_time = results
                                    .into_iter()
                                    .filter_map(|r| r.ok())
                                    .fold(now, |a, b| a.max(b));

                                self.time = TimeState::Stopped(stop_time);

                                // Update all the keepers with the stop time so that
                                // it's consistent.
                                join_all(self.keepers.iter().map(|(_, sender)| {
                                    sender.call(KeeperRequest::Reset, stop_time)
                                }))
                                .await;

                                stop_time
                            })
                            .await
                        }
                        KeeperRequest::Reset(rpc) => {
                            rpc.handle(async |time| {
                                assert!(!self.time.is_started(), "should not be running");
                                self.time = TimeState::Stopped(time);
                                join_all(
                                    self.keepers
                                        .iter()
                                        .map(|(_, sender)| sender.call(KeeperRequest::Reset, time)),
                                )
                                .await;
                            })
                            .await
                        }
                        KeeperRequest::Inspect(deferred) => deferred.inspect(&self),
                    }
                }
            }
        }
    }
}

/// Task that provides access to the current VM time.
///
/// There can be multiple of these per VM, across multiple processes. They are
/// all backed by the same clock and report the same time.
#[derive(InspectMut)]
struct SecondaryKeeper {
    #[inspect(flatten)]
    state: Arc<RwLock<TimerState>>,
    #[inspect(skip)]
    recv: mesh::Receiver<KeeperRequest>,
}

impl SecondaryKeeper {
    async fn run(&mut self) {
        loop {
            let r = {
                let state = &self.state;
                (
                    self.recv.next(),
                    poll_fn(|cx| {
                        state.write().poll(cx);
                        Poll::Pending
                    }),
                )
                    .race()
                    .await
            };
            match r {
                Some(req) => match req {
                    KeeperRequest::Start(rpc) => rpc.handle_sync(|start_time| {
                        let mut state = self.state.write();
                        state.start(start_time);
                    }),
                    KeeperRequest::Reset(rpc) => rpc.handle_sync(|vmtime| {
                        let mut state = self.state.write();
                        state.reset(vmtime);
                    }),
                    KeeperRequest::Stop(rpc) => rpc.handle_sync(|()| {
                        let mut state = self.state.write();
                        state.stop(Instant::now())
                    }),
                    KeeperRequest::Inspect(deferred) => deferred.inspect(&mut *self),
                },
                None => break,
            }
        }
    }
}

/// A time source, used to instantiate [`VmTimeAccess`].
#[derive(Clone)]
pub struct VmTimeSource {
    state: Arc<RwLock<TimerState>>,
    remote: VmTimeSourceBuilder,
}

impl VmTimeSource {
    /// Gets a time accessor.
    ///
    /// `name` is used for diagnostics via `inspect`.
    pub fn access(&self, name: impl Into<Arc<str>>) -> VmTimeAccess {
        let name = name.into();
        VmTimeAccess {
            timeout: None,
            waiting: false,
            index: self
                .state
                .write()
                .waiters
                .insert(WaiterState::new(name.clone())),
            state: self.state.clone(),
            name,
        }
    }

    /// Gets the builder for creating additional time sources (and their
    /// backing tasks) whose times are in sync with this one.
    pub fn builder(&self) -> &VmTimeSourceBuilder {
        &self.remote
    }
}

/// An individual time accessor, used to query and wait for time.
#[derive(Inspect)]
pub struct VmTimeAccess {
    timeout: Option<VmTime>,
    waiting: bool,
    #[inspect(skip)]
    index: usize,
    #[inspect(skip)]
    state: Arc<RwLock<TimerState>>,
    name: Arc<str>,
}

impl Drop for VmTimeAccess {
    fn drop(&mut self) {
        self.state.write().waiters.remove(self.index);
    }
}

impl VmTimeAccess {
    /// Gets the current time.
    pub fn now(&self) -> VmTime {
        let now = Instant::now();
        self.state.read().now(now).vmtime
    }

    /// Returns the host time corresponding to a guest time.
    ///
    /// If the guest time is before the VM last resumed, then returns the time
    /// the VM last resumed.
    ///
    /// If the VM is not running, returns `None`.
    pub fn host_time(&self, time: VmTime) -> Option<Instant> {
        Some(self.state.read().timestamp(time)?.os_time())
    }

    /// Get the currently set timeout.
    pub fn get_timeout(&self) -> Option<VmTime> {
        self.timeout
    }

    /// Sets the timeout at which [`poll_timeout`](Self::poll_timeout) will return ready.
    pub fn set_timeout(&mut self, time: VmTime) {
        self.timeout = Some(time);
        if self.waiting {
            self.state.write().update_timeout(self.index, time);
        }
    }

    /// Sets the timeout at which [`poll_timeout`](Self::poll_timeout) will return ready,
    /// but only if `time` is earlier than the current timeout.
    pub fn set_timeout_if_before(&mut self, time: VmTime) {
        if self.timeout.is_none_or(|timeout| time.is_before(timeout)) {
            self.set_timeout(time);
        }
    }

    /// Clears the current timeout for [`poll_timeout`](Self::poll_timeout).
    pub fn cancel_timeout(&mut self) {
        if self.waiting && self.timeout.is_some() {
            self.state.write().cancel_timeout(self.index);
        }
        self.timeout = None;
    }

    /// Polls the current time against the current timeout.
    ///
    /// Returns `Poll::Ready(self.now())` if the current timeout is before now.
    /// Returns `Poll::Pending` if there is no current timeout, or if the
    /// current timeout is after now.
    ///
    /// Although this takes `&mut self`, note that it only stores a single waker,
    /// meaning that if you poll this from multiple tasks concurrently, only one
    /// task will be woken when the time elapses. Create another instance of
    /// this type with [`VmTimeSource`] if you need to poll this from multiple
    /// tasks.
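    ///
    /// A typical pattern is to await the timeout via [`std::future::poll_fn`]
    /// (a sketch; `sleep_vmtime` is a hypothetical helper, not part of this
    /// API):
    ///
    /// ```ignore
    /// use std::future::poll_fn;
    /// use std::time::Duration;
    ///
    /// async fn sleep_vmtime(access: &mut VmTimeAccess, period: Duration) -> VmTime {
    ///     let deadline = access.now().wrapping_add(period);
    ///     access.set_timeout(deadline);
    ///     poll_fn(|cx| access.poll_timeout(cx)).await
    /// }
    /// ```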
    pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
        let now = Instant::now();
        match self
            .state
            .write()
            .poll_timeout(cx, self.index, now, self.timeout)
        {
            Poll::Ready(now) => {
                self.waiting = false;
                self.timeout = None;
                Poll::Ready(now.vmtime)
            }
            Poll::Pending => {
                self.waiting = true;
                Poll::Pending
            }
        }
    }
}

#[derive(Debug, Inspect)]
#[inspect(tag = "state")]
enum VmTimerPeriodicInner {
    Stopped,
    Running {
        last_timeout: VmTime,
        #[inspect(debug)]
        period: Duration,
    },
}

/// An abstraction over [`VmTimeAccess`] that streamlines the process of setting
/// up a periodic timer.
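///
/// A minimal sketch of driving such a timer from a device's poll loop
/// (`poll_device` is a hypothetical poll function; the timer is assumed to
/// have been built from a [`VmTimeAccess`]):
///
/// ```ignore
/// use std::task::{Context, Poll};
/// use std::time::Duration;
///
/// fn poll_device(cx: &mut Context<'_>, timer: &mut VmTimerPeriodic) {
///     if !timer.is_running() {
///         timer.start(Duration::from_millis(10));
///     }
///     if let Poll::Ready(now) = timer.poll_timeout(cx) {
///         // Handle the periodic tick at VM time `now`.
///         let _ = now;
///     }
/// }
/// ```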
#[derive(Inspect)]
pub struct VmTimerPeriodic {
    vmtime: VmTimeAccess,
    inner: VmTimerPeriodicInner,
}

impl VmTimerPeriodic {
    /// Create a new periodic timer, backed by the given [`VmTimeAccess`].
    pub fn new(vmtime_access: VmTimeAccess) -> Self {
        Self {
            vmtime: vmtime_access,
            inner: VmTimerPeriodicInner::Stopped,
        }
    }

    /// Cancel the timer.
    ///
    /// If the timer isn't running, this method is a no-op.
    pub fn cancel(&mut self) {
        self.vmtime.cancel_timeout();
        self.inner = VmTimerPeriodicInner::Stopped;
    }

    /// Start the timer, configuring it to fire at the specified period.
    ///
    /// If the timer is currently running, the timer will be cancelled +
    /// restarted.
    pub fn start(&mut self, period: Duration) {
        self.cancel();

        let time = self.vmtime.now().wrapping_add(period);
        self.vmtime.set_timeout(time);
        self.inner = VmTimerPeriodicInner::Running {
            last_timeout: time,
            period,
        }
    }

    /// Check if the timer is currently running.
    pub fn is_running(&self) -> bool {
        matches!(self.inner, VmTimerPeriodicInner::Running { .. })
    }

    /// Polls the timer.
    ///
    /// Returns `Poll::Ready(now)` when the timer is past-due, returning
    /// `Poll::Pending` otherwise.
    pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
        match self.inner {
            VmTimerPeriodicInner::Stopped => {
                assert_eq!(self.vmtime.get_timeout(), None);
                // Make sure the waker is still managed properly
                // This is guaranteed to return Pending according to its documentation thanks to the above assert.
                self.vmtime.poll_timeout(cx)
            }
            VmTimerPeriodicInner::Running {
                ref mut last_timeout,
                period,
            } => {
                let mut res = Poll::Pending;
                while let Poll::Ready(now) = self.vmtime.poll_timeout(cx) {
                    res = Poll::Ready(now);

                    let time = last_timeout.wrapping_add(period);
                    self.vmtime.set_timeout(time);
                    *last_timeout = time;
                }
                res
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::VmTime;
    use super::VmTimeKeeper;
    use futures::FutureExt;
    use pal_async::DefaultDriver;
    use pal_async::async_test;
    use pal_async::timer::PolledTimer;
    use std::future::poll_fn;
    use std::time::Duration;

    #[async_test]
    async fn test_vmtime(driver: DefaultDriver) {
        let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
        let mut access = keeper
            .builder()
            .build(&driver)
            .await
            .unwrap()
            .access("test");
        keeper.start().await;

        // Test long timeout.
        access.set_timeout(access.now().wrapping_add(Duration::from_secs(1000)));
        let mut timer = PolledTimer::new(&driver);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(50)).fuse() => {}
            _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
        }

        // Test short timeout.
        let deadline = access.now().wrapping_add(Duration::from_millis(10));
        access.set_timeout(deadline);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
            now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
                assert!(now.is_after(deadline));
            }
        }
        // Timeout should be cleared by the successful poll.
        assert!(
            poll_fn(|cx| access.poll_timeout(cx))
                .now_or_never()
                .is_none()
        );

        // Test changing timeout.
        let now = access.now();
        let deadline = now.wrapping_add(Duration::from_millis(2000));
        access.set_timeout(deadline);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(30)).fuse() => {
                let deadline = now.wrapping_add(Duration::from_millis(50));
                access.set_timeout(deadline);
                futures::select! {
                    _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
                    now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
                        assert!(now.is_after(deadline));
                    }
                }
            }
            _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
        }
        keeper.stop().await;
    }

    #[async_test]
    async fn test_multi_vmtime(driver: DefaultDriver) {
        let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
        let src1 = keeper.builder().build(&driver).await.unwrap();
        keeper.start().await;
        let src2 = src1.builder().build(&driver).await.unwrap();
        let acc1 = src1.access("test");
        let acc2 = src2.access("test");
        {
            let t1 = acc1.now();
            let t2 = acc2.now();
            let t3 = acc1.now();
            assert!(!t2.is_before(t1), "{t1:?} {t2:?}");
            assert!(!t3.is_before(t2), "{t2:?} {t3:?}");
        }
        let now = acc1.now();
        keeper.stop().await;
        let t1 = acc1.now();
        let t2 = acc2.now();
        assert!(!t1.is_before(now));
        assert_eq!(t1, t2);
        let zero = VmTime::from_100ns(0);
        // Even on very fast machines, at least _some_ time will have advanced.
        assert_ne!(t1, zero);
        keeper.reset().await;
        assert_eq!(acc1.now(), zero);
        assert_eq!(acc2.now(), zero);
    }
}