vmcore/vmtime.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Support for VM time.
//!
//! This is a VM-specific timeline, which monotonically increases when and only
//! when the VM is running. This module provides types used to access this time
//! and to wait for it to reach target times. This can be used in device
//! emulators to implement VM timers.
//!
//! This is related to the idea of the hypervisor reference time, but it is not
//! guaranteed to be the same value (and is likely not, except when the
//! hypervisor reference time is emulated using VM time).
//!
//! The root of VM time keeping is the [`VmTimeKeeper`]. It manages a clock that
//! can be shared via use of [`VmTimeAccess`] objects. Internally, this clock is
//! based on an offset from the OS's monotonic clock while the VM is running, and
//! a fixed time when the VM is not running.
//!
//! The infrastructure here supports accessing VM time from multiple processes
//! on the same OS (but not across machines, virtual or physical). See the
//! comments on [`VmTimeSourceBuilder`] for more information.
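//!
//! As a rough usage sketch (the driver and the `"example-device"` name below
//! are illustrative assumptions, not prescribed by this module), a device
//! emulator might arm a VM timer like this:
//!
//! ```ignore
//! # async fn example(driver: impl pal_async::driver::SpawnDriver) {
//! let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
//! let source = keeper.builder().build(&driver).await.unwrap();
//! let mut access = source.access("example-device");
//!
//! // VM time only advances while the keeper is started.
//! keeper.start().await;
//! let now = access.now();
//! access.set_timeout(now.wrapping_add(std::time::Duration::from_millis(10)));
//! let fired = std::future::poll_fn(|cx| access.poll_timeout(cx)).await;
//! assert!(fired.is_after(now));
//! keeper.stop().await;
//! # }
//! ```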

#![warn(missing_docs)]

use futures::StreamExt;
use futures::future::join_all;
use futures_concurrency::future::Race;
use futures_concurrency::stream::Merge;
use inspect::Inspect;
use inspect::InspectMut;
use inspect::adhoc;
use mesh::MeshPayload;
use mesh::payload::Protobuf;
use mesh::rpc::Rpc;
use mesh::rpc::RpcSend;
use pal_async::driver::Driver;
use pal_async::driver::PollImpl;
use pal_async::driver::SpawnDriver;
use pal_async::task::Task;
use pal_async::timer::Instant;
use pal_async::timer::PollTimer;
use parking_lot::RwLock;
use save_restore_derive::SavedStateRoot;
use slab::Slab;
use std::future::poll_fn;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::time::Duration;
use thiserror::Error;

/// Roughly analogous to [`std::time::Instant`], but for VM time.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Protobuf, Inspect)]
#[inspect(transparent)]
#[mesh(transparent)]
pub struct VmTime(#[inspect(hex)] u64);

impl VmTime {
    /// Converts from a time in 100ns units.
    pub fn from_100ns(n: u64) -> Self {
        Self(n)
    }

    /// Gets the time from VM boot (or some other origin) in 100ns units.
    pub const fn as_100ns(&self) -> u64 {
        self.0
    }

    /// Adds `d` to the time, wrapping on overflow.
    pub fn wrapping_add(self, d: Duration) -> Self {
        Self((self.0 as u128).wrapping_add(d.as_nanos() / 100) as u64)
    }

    /// Returns whether `self` is before `t`.
    ///
    /// Note that this is a relative comparison in the 64-bit space and is not
    /// transitive: if `a` is before `b`, and `b` is before `c`, `a` still may
    /// be after `c`.
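    ///
    /// A small sketch of the wrapping comparison (values chosen purely for
    /// illustration):
    ///
    /// ```ignore
    /// let early = VmTime::from_100ns(10);
    /// let wrapped = VmTime::from_100ns(u64::MAX); // one tick "before" zero
    /// // The numerically larger value compares as earlier because the
    /// // comparison is modular in the 64-bit space.
    /// assert!(wrapped.is_before(early));
    /// assert!(!early.is_before(wrapped));
    /// ```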
    pub fn is_before(self, t: Self) -> bool {
        let delta = self.0.wrapping_sub(t.0);
        (delta as i64) < 0
    }

    /// Returns whether `self` is after `t`.
    ///
    /// See the comment about transitivity in [`Self::is_before`].
    pub fn is_after(self, t: Self) -> bool {
        let delta = self.0.wrapping_sub(t.0);
        (delta as i64) > 0
    }

    /// Returns the time between `self` and `t`, returning `None` if `self` is
    /// before `t`.
    pub fn checked_sub(self, t: Self) -> Option<Duration> {
        let delta = self.0.wrapping_sub(t.0);
        if (delta as i64) >= 0 {
            Some(duration_from_100ns(delta))
        } else {
            None
        }
    }

    /// Returns `self` or `t`, whichever is earlier.
    pub fn min(self, t: Self) -> Self {
        if self.is_before(t) { self } else { t }
    }

    /// Returns `self` or `t`, whichever is later.
    pub fn max(self, t: Self) -> Self {
        if self.is_before(t) { t } else { self }
    }
}

fn duration_from_100ns(n: u64) -> Duration {
    const NUM_100NS_IN_SEC: u64 = 10 * 1000 * 1000;
    Duration::new(n / NUM_100NS_IN_SEC, (n % NUM_100NS_IN_SEC) as u32 * 100)
}

#[derive(Inspect)]
struct TimerState {
    time: TimeState,
    #[inspect(skip)]
    timer: PollImpl<dyn PollTimer>,
    #[inspect(with = "|x| inspect::iter_by_key(x.iter().map(|(_, w)| (&w.name, w)))")]
    waiters: Slab<WaiterState>,
    next: Option<VmTime>,
    last: VmTime,
}

#[derive(Debug, Inspect)]
struct WaiterState {
    #[inspect(skip)] // used as a key
    name: Arc<str>,
    next: Option<VmTime>,
    #[inspect(rename = "waiting", with = "Option::is_some")]
    waker: Option<Waker>,
}

impl WaiterState {
    fn new(name: Arc<str>) -> Self {
        Self {
            name,
            next: None,
            waker: None,
        }
    }
}

#[derive(Copy, Clone, Debug, Protobuf)]
struct Timestamp {
    vmtime: VmTime,
    os_time: u64, // Instant::as_nanos()
}

impl Timestamp {
    fn new(vmtime: VmTime, os_time: Instant) -> Self {
        Self {
            vmtime,
            os_time: os_time.as_nanos(),
        }
    }

    fn os_time(&self) -> Instant {
        Instant::from_nanos(self.os_time)
    }
}

impl TimerState {
    fn new(driver: &impl Driver, uptime: VmTime) -> Self {
        Self {
            time: TimeState::Stopped(uptime),
            timer: driver.new_dyn_timer(),
            waiters: Slab::new(),
            next: None,
            last: uptime,
        }
    }

    /// Starts the timer.
    fn start(&mut self, now: Timestamp) {
        let vmtime = self.time.stop_time().expect("should be stopped");
        assert_eq!(now.vmtime, vmtime);
        self.time = TimeState::Started(now);
        tracing::trace!(?now, "vmtime start");
        self.wake(now);
    }

    /// Stops the timer.
    fn stop(&mut self, now_os: Instant) -> VmTime {
        assert!(self.time.is_started());
        let now = self.now(now_os);
        self.time = TimeState::Stopped(now.vmtime);
        tracing::debug!(?now, "vmtime stop");
        now.vmtime
    }

    /// Resets the current time to `time`.
    fn reset(&mut self, time: VmTime) {
        assert!(!self.time.is_started());
        self.time = TimeState::Stopped(time);
        self.last = time;
        self.next = None;
        // Wake all the wakers to re-evaluate things.
        for (_, waiter) in &mut self.waiters {
            if let Some(waker) = waiter.waker.take() {
                waker.wake();
            }
        }
    }

    /// Returns the timestamp corresponding to the given VM time, or `None` if
    /// the VM is not running.
    ///
    /// If the VM time is before the last start time, then the timestamp is at
    /// the host time when the VM last started.
    fn timestamp(&self, time: VmTime) -> Option<Timestamp> {
        let start_time = self.time.start_time()?;
        let since = time
            .checked_sub(start_time.vmtime)
            .unwrap_or(Duration::ZERO);
        Some(Timestamp::new(time, start_time.os_time() + since))
    }

    /// Returns the current guest time given a host time.
    fn now(&self, now_os: Instant) -> Timestamp {
        self.time.now(now_os)
    }

    fn set_next(&mut self, next: VmTime) {
        if !self.time.is_started() {
            return;
        }
        if self
            .next
            .is_none_or(|current_next| next.is_before(current_next))
        {
            let deadline = self.timestamp(next).unwrap().os_time();
            tracing::trace!(?deadline, "updating deadline");
            self.timer.set_deadline(deadline);
            self.next = Some(next);
        }
    }

    fn wake(&mut self, now: Timestamp) {
        assert!(!now.vmtime.is_before(self.last));
        self.last = now.vmtime;
        let mut next = None;
        for (_, state) in &mut self.waiters {
            if let Some(this_next) = state.next {
                if this_next.is_after(now.vmtime) {
                    if next.is_none_or(|next| this_next.is_before(next)) {
                        next = Some(this_next);
                    }
                } else if let Some(waker) = state.waker.take() {
                    waker.wake();
                }
            }
        }
        if let Some(next) = next {
            self.set_next(next);
        }
    }

    fn cancel_timeout(&mut self, index: usize) {
        self.waiters[index].next = None;
    }

    /// Updates the next timeout for an individual waiter.
    fn update_timeout(&mut self, index: usize, time: VmTime) {
        let state = &mut self.waiters[index];
        tracing::trace!(vmtime = ?time, user = state.name.as_ref(), "timeout update");
        state.next = Some(time);
        if time.is_before(self.last) {
            // The wake time is even before the last timer wake, so just wake
            // the waiter and skip updating the timer.
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
            return;
        }

        // Update the timer if needed.
        if self.next.is_some_and(|next| next.is_before(time)) {
            return;
        }
        self.set_next(time);
    }

    /// Polls a single waiter.
    fn poll_timeout(
        &mut self,
        cx: &mut Context<'_>,
        index: usize,
        now_os: Instant,
        next: Option<VmTime>,
    ) -> Poll<Timestamp> {
        let now = self.now(now_os);
        let state = &mut self.waiters[index];
        if next.is_some_and(|next| next.is_before(now.vmtime)) {
            state.waker = None;
            state.next = None;
            return Poll::Ready(now);
        }
        state.next = next;
        state.waker = Some(cx.waker().clone());
        if let Some(next) = next {
            self.set_next(next);
        }
        Poll::Pending
    }

    /// Polls the timer, waking any waiters whose timeout has expired.
    fn poll(&mut self, cx: &mut Context<'_>) {
        while self.time.is_started() {
            let next = match self.next {
                Some(_) => {
                    // The timer's deadline is already set.
                    tracing::trace!("polling existing deadline");
                    None
                }
                None => {
                    // Set the timer far out in the future.
                    let deadline = Instant::now() + Duration::from_secs(86400);
                    tracing::trace!(?deadline, "polling with long timeout");
                    Some(deadline)
                }
            };
            if let Poll::Ready(now) = self.timer.poll_timer(cx, next) {
                self.next = None;
                self.wake(self.now(now));
            } else {
                return;
            }
        }
    }
}

/// A time keeper, which tracks the current time and all waiters.
#[derive(Debug)]
pub struct VmTimeKeeper {
    _task: Task<()>,
    req_send: mesh::Sender<KeeperRequest>,
    builder: VmTimeSourceBuilder,
    time: TimeState,
}

/// Saved state for [`VmTimeKeeper`].
#[derive(Protobuf, SavedStateRoot)]
#[mesh(package = "vmtime")]
pub struct SavedState {
    #[mesh(1)]
    vmtime: VmTime,
}

impl SavedState {
    /// Create a new instance of `SavedState` from an existing `VmTime`.
    pub fn from_vmtime(vmtime: VmTime) -> Self {
        SavedState { vmtime }
    }
}

#[derive(Debug, MeshPayload, Copy, Clone)]
enum TimeState {
    Stopped(VmTime),
    Started(Timestamp),
}

impl Inspect for TimeState {
    fn inspect(&self, req: inspect::Request<'_>) {
        let mut resp = req.respond();
        let state = match *self {
            TimeState::Stopped(_time) => "stopped",
            TimeState::Started(time) => {
                resp.field("start_time", time.vmtime);
                "started"
            }
        };
        resp.field("state", state)
            .field("now", self.now(Instant::now()).vmtime);
    }
}

impl TimeState {
    fn is_started(&self) -> bool {
        self.start_time().is_some()
    }

    fn stop_time(&self) -> Option<VmTime> {
        match *self {
            TimeState::Stopped(time) => Some(time),
            TimeState::Started(_) => None,
        }
    }

    fn start_time(&self) -> Option<Timestamp> {
        match *self {
            TimeState::Stopped(_) => None,
            TimeState::Started(time) => Some(time),
        }
    }

    fn now(&self, now_os: Instant) -> Timestamp {
        match *self {
            TimeState::Stopped(time) => Timestamp::new(time, now_os),
            TimeState::Started(start_time) => {
                if now_os >= start_time.os_time() {
                    Timestamp::new(
                        start_time
                            .vmtime
                            .wrapping_add(now_os - start_time.os_time()),
                        now_os,
                    )
                } else {
                    // `now_os` can be before the start timestamp's OS time if it
                    // was captured outside the lock and raced with the call to
                    // `start()`. Treat this as `now_os` being the same as the
                    // start time.
                    //
                    // But if `now_os` is too far before the start time, then
                    // there is probably some serious OS timekeeping bug, or maybe
                    // the debugger broke in at just the wrong time.
                    let delta = start_time.os_time() - now_os;
                    if delta > Duration::from_secs(1) {
                        tracing::error!(
                            now = now_os.as_nanos(),
                            start_host = start_time.os_time().as_nanos(),
                            ?delta,
                            "time went backward"
                        );
                    }
                    start_time
                }
            }
        }
    }
}

impl InspectMut for VmTimeKeeper {
    fn inspect_mut(&mut self, req: inspect::Request<'_>) {
        self.req_send.send(KeeperRequest::Inspect(req.defer()));
    }
}

impl VmTimeKeeper {
    /// Creates a new time keeper with the specified current guest time.
    pub fn new(driver: &impl SpawnDriver, uptime: VmTime) -> Self {
        let (new_send, new_recv) = mesh::mpsc_channel();
        let (req_send, req_recv) = mesh::channel();
        let time = TimeState::Stopped(uptime);
        let task = driver.spawn("vm-time-keeper", async move {
            let mut primary = PrimaryKeeper {
                req_recv,
                new_recv,
                keepers: Vec::new(),
                next_id: 0,
                time,
            };
            primary.run().await;
        });
        Self {
            time,
            req_send,
            builder: VmTimeSourceBuilder { new_send },
            _task: task,
        }
    }

    /// Saves the time state.
    pub fn save(&self) -> SavedState {
        SavedState {
            vmtime: self.time.stop_time().expect("should be stopped"),
        }
    }

    /// Restores the time state.
    pub async fn restore(&mut self, state: SavedState) {
        let SavedState { vmtime } = state;
        self.reset_to(vmtime).await
    }

    async fn reset_to(&mut self, vmtime: VmTime) {
        assert!(!self.time.is_started(), "should be stopped");
        self.time = TimeState::Stopped(vmtime);
        self.req_send
            .call(KeeperRequest::Reset, vmtime)
            .await
            .unwrap();
    }

    /// Reset the VM time to 0.
    pub async fn reset(&mut self) {
        self.reset_to(VmTime::from_100ns(0)).await
    }

    /// Starts the timer, so that the current time will increase.
    pub async fn start(&mut self) {
        let vmtime = self.time.stop_time().expect("should be stopped");
        let timestamp = Timestamp::new(vmtime, Instant::now());
        self.time = TimeState::Started(timestamp);
        self.req_send
            .call(KeeperRequest::Start, timestamp)
            .await
            .unwrap();
    }

    /// Stops the timer, so that the current time will stop increasing.
    pub async fn stop(&mut self) {
        assert!(self.time.is_started(), "should be running");
        let stop_time = self.req_send.call(KeeperRequest::Stop, ()).await.unwrap();
        self.time = TimeState::Stopped(stop_time);
    }

    /// Returns a time source builder, which can be used to spawn tasks that
    /// back [`VmTimeSource`] instances, all backed by this time keeper's clock.
    pub fn builder(&self) -> &VmTimeSourceBuilder {
        &self.builder
    }
}

/// A time source builder, used to spawn tasks that back [`VmTimeSource`]
/// instances.
///
/// Note that this can be sent across processes via `mesh`.
///
/// However, the time keeping infrastructure assumes that all time keeping tasks
/// share a single global monotonic OS clock. This means that if you send this
/// across a VM/kernel/network boundary, the resulting time sources will not be
/// in sync with each other.
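///
/// As a hedged sketch (how the builder reaches the other process and the
/// `"remote-device"` name are assumptions for illustration), a builder received
/// in another process on the same OS can be used like this:
///
/// ```ignore
/// # async fn example(
/// #     builder: VmTimeSourceBuilder,
/// #     driver: impl pal_async::driver::SpawnDriver,
/// # ) {
/// // `builder` came from `VmTimeKeeper::builder()`, possibly sent here over mesh.
/// let source = builder.build(&driver).await.unwrap();
/// let access = source.access("remote-device");
/// let _now = access.now(); // in sync with the keeper's clock
/// # }
/// ```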
#[derive(MeshPayload, Clone, Debug)]
pub struct VmTimeSourceBuilder {
    new_send: mesh::Sender<NewKeeperRequest>,
}

/// Error returned by [`VmTimeSourceBuilder::build`] when the time keeper has
/// been torn down.
#[derive(Debug, Error)]
#[error("the time keeper has been torn down")]
pub struct TimeKeeperIsGone;

impl VmTimeSourceBuilder {
    /// Builds and spawns a backing task for [`VmTimeSource`]s. All
    /// [`VmTimeSource`] instances cloned from the first one will share a
    /// backing task.
    pub async fn build(&self, driver: &impl SpawnDriver) -> Result<VmTimeSource, TimeKeeperIsGone> {
        let (send, recv) = mesh::channel();
        let time = self
            .new_send
            .call(NewKeeperRequest::New, send)
            .await
            .map_err(|_| TimeKeeperIsGone)?;

        let mut state = Arc::new(RwLock::new(TimerState::new(driver, VmTime::from_100ns(0))));
        // Synchronize the time.
        {
            let state = Arc::get_mut(&mut state).unwrap().get_mut();
            match time {
                TimeState::Stopped(vmtime) => state.reset(vmtime),
                TimeState::Started(timestamp) => {
                    // Align the freshly created (stopped) state with the keeper's
                    // start time before starting it, since `start` asserts that
                    // the stopped time matches the start timestamp.
                    state.reset(timestamp.vmtime);
                    state.start(timestamp);
                }
            }
        }
        let mut keeper = SecondaryKeeper {
            state: state.clone(),
            recv,
        };
        driver
            .spawn("vm-time", async move { keeper.run().await })
            .detach();
        Ok(VmTimeSource {
            state,
            remote: self.clone(),
        })
    }
}

/// Task that stores the current time state and manages the list of secondary
/// keepers.
///
/// There is one of these per VM time clock (i.e. one per VM).
#[derive(Inspect)]
#[inspect(extra = "Self::inspect_extra")]
struct PrimaryKeeper {
    #[inspect(skip)]
    req_recv: mesh::Receiver<KeeperRequest>,
    #[inspect(skip)]
    new_recv: mesh::Receiver<NewKeeperRequest>,
    #[inspect(skip)]
    keepers: Vec<(u64, mesh::Sender<KeeperRequest>)>,
    #[inspect(skip)]
    next_id: u64,
    time: TimeState,
}

#[derive(MeshPayload)]
enum KeeperRequest {
    Start(Rpc<Timestamp, ()>),
    Stop(Rpc<(), VmTime>),
    Reset(Rpc<VmTime, ()>),
    Inspect(inspect::Deferred),
}

#[derive(MeshPayload)]
enum NewKeeperRequest {
    New(Rpc<mesh::Sender<KeeperRequest>, TimeState>),
}

impl PrimaryKeeper {
    fn inspect_extra(&self, resp: &mut inspect::Response<'_>) {
        resp.fields(
            "keepers",
            self.keepers
                .iter()
                .map(|&(id, ref s)| (id, adhoc(|req| s.send(KeeperRequest::Inspect(req.defer()))))),
        );
    }

    async fn run(&mut self) {
        enum Event {
            New(NewKeeperRequest),
            Request(KeeperRequest),
        }

        while let Some(event) = (
            (&mut self.new_recv).map(Event::New),
            (&mut self.req_recv).map(Event::Request),
        )
            .merge()
            .next()
            .await
        {
            // Garbage collect the existing keepers.
            self.keepers.retain(|(_, s)| !s.is_closed());
            match event {
                Event::New(req) => match req {
                    NewKeeperRequest::New(rpc) => rpc.handle_sync(|sender| {
                        self.keepers.push((self.next_id, sender));
                        self.next_id += 1;
                        self.time
                    }),
                },
                Event::Request(req) => {
                    match req {
                        KeeperRequest::Start(rpc) => {
                            rpc.handle(async |start_time| {
                                assert!(!self.time.is_started());
                                self.time = TimeState::Started(start_time);
                                join_all(self.keepers.iter().map(|(_, sender)| {
                                    sender.call(KeeperRequest::Start, start_time)
                                }))
                                .await;
                            })
                            .await
                        }
                        KeeperRequest::Stop(rpc) => {
                            rpc.handle(async |()| {
                                let results = join_all(
                                    self.keepers
                                        .iter()
                                        .map(|(_, sender)| sender.call(KeeperRequest::Stop, ())),
                                )
                                .await;

                                let start_time = self.time.start_time().expect("should be running");
                                let now = start_time
                                    .vmtime
                                    .wrapping_add(Instant::now() - start_time.os_time());

                                // Compute the stop time as the max of all stop
                                // times so that no keeper goes backwards on the
                                // next start.
                                let stop_time = results
                                    .into_iter()
                                    .filter_map(|r| r.ok())
                                    .fold(now, |a, b| a.max(b));

                                self.time = TimeState::Stopped(stop_time);

                                // Update all the keepers with the stop time so that
                                // it's consistent.
                                join_all(self.keepers.iter().map(|(_, sender)| {
                                    sender.call(KeeperRequest::Reset, stop_time)
                                }))
                                .await;

                                stop_time
                            })
                            .await
                        }
                        KeeperRequest::Reset(rpc) => {
                            rpc.handle(async |time| {
                                assert!(!self.time.is_started(), "should not be running");
                                self.time = TimeState::Stopped(time);
                                join_all(
                                    self.keepers
                                        .iter()
                                        .map(|(_, sender)| sender.call(KeeperRequest::Reset, time)),
                                )
                                .await;
                            })
                            .await
                        }
                        KeeperRequest::Inspect(deferred) => deferred.inspect(&self),
                    }
                }
            }
        }
    }
}

/// Task that provides access to the current VM time.
///
/// There can be multiple of these per VM, across multiple processes. They are
/// all backed by the same clock and report the same time.
#[derive(InspectMut)]
struct SecondaryKeeper {
    #[inspect(flatten)]
    state: Arc<RwLock<TimerState>>,
    #[inspect(skip)]
    recv: mesh::Receiver<KeeperRequest>,
}

impl SecondaryKeeper {
    async fn run(&mut self) {
        loop {
            let r = {
                let state = &self.state;
                (
                    self.recv.next(),
                    poll_fn(|cx| {
                        state.write().poll(cx);
                        Poll::Pending
                    }),
                )
                    .race()
                    .await
            };
            match r {
                Some(req) => match req {
                    KeeperRequest::Start(rpc) => rpc.handle_sync(|start_time| {
                        let mut state = self.state.write();
                        state.start(start_time);
                    }),
                    KeeperRequest::Reset(rpc) => rpc.handle_sync(|vmtime| {
                        let mut state = self.state.write();
                        state.reset(vmtime);
                    }),
                    KeeperRequest::Stop(rpc) => rpc.handle_sync(|()| {
                        let mut state = self.state.write();
                        state.stop(Instant::now())
                    }),
                    KeeperRequest::Inspect(deferred) => deferred.inspect(&mut *self),
                },
                None => break,
            }
        }
    }
}

/// A time source, used to instantiate [`VmTimeAccess`].
#[derive(Clone)]
pub struct VmTimeSource {
    state: Arc<RwLock<TimerState>>,
    remote: VmTimeSourceBuilder,
}

impl VmTimeSource {
    /// Gets a time accessor.
    ///
    /// `name` is used for diagnostics via `inspect`.
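    ///
    /// For instance (the accessor name here is just an illustrative label):
    ///
    /// ```ignore
    /// let mut access = source.access("serial-timer");
    /// let now = access.now();
    /// ```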
    pub fn access(&self, name: impl Into<Arc<str>>) -> VmTimeAccess {
        let name = name.into();
        VmTimeAccess {
            timeout: None,
            waiting: false,
            index: self
                .state
                .write()
                .waiters
                .insert(WaiterState::new(name.clone())),
            state: self.state.clone(),
            name,
        }
    }

    /// Gets the builder, which can be used to spawn additional time source
    /// backing tasks whose clocks are in sync with this one.
    pub fn builder(&self) -> &VmTimeSourceBuilder {
        &self.remote
    }
}

/// An individual time accessor, used to query and wait for time.
#[derive(Inspect)]
pub struct VmTimeAccess {
    timeout: Option<VmTime>,
    waiting: bool,
    #[inspect(skip)]
    index: usize,
    #[inspect(skip)]
    state: Arc<RwLock<TimerState>>,
    name: Arc<str>,
}

impl Drop for VmTimeAccess {
    fn drop(&mut self) {
        self.state.write().waiters.remove(self.index);
    }
}

impl VmTimeAccess {
    /// Gets the current time.
    pub fn now(&self) -> VmTime {
        let now = Instant::now();
        self.state.read().now(now).vmtime
    }

    /// Returns the host time corresponding to a guest time.
    ///
    /// If the guest time is before the VM last resumed, then returns the time
    /// the VM last resumed.
    ///
    /// If the VM is not running, returns `None`.
    pub fn host_time(&self, time: VmTime) -> Option<Instant> {
        Some(self.state.read().timestamp(time)?.os_time())
    }

    /// Get the currently set timeout.
    pub fn get_timeout(&self) -> Option<VmTime> {
        self.timeout
    }

    /// Sets the timeout at which [`poll_timeout`](Self::poll_timeout) will return ready.
    pub fn set_timeout(&mut self, time: VmTime) {
        self.timeout = Some(time);
        if self.waiting {
            self.state.write().update_timeout(self.index, time);
        }
    }

    /// Sets the timeout at which [`poll_timeout`](Self::poll_timeout) will return
    /// ready, but only if `time` is earlier than the current timeout.
    pub fn set_timeout_if_before(&mut self, time: VmTime) {
        if self.timeout.is_none_or(|timeout| time.is_before(timeout)) {
            self.set_timeout(time);
        }
    }

    /// Clears the current timeout for [`poll_timeout`](Self::poll_timeout).
    pub fn cancel_timeout(&mut self) {
        if self.waiting && self.timeout.is_some() {
            self.state.write().cancel_timeout(self.index);
        }
        self.timeout = None;
    }

    /// Polls the current time against the current timeout.
    ///
    /// Returns `Poll::Ready(self.now())` if the current timeout is before now.
    /// Returns `Poll::Pending` if there is no current timeout, or if the
    /// current timeout is after now.
    ///
    /// Although this takes `&mut self`, note that it only stores a single waker,
    /// meaning that if you poll this from multiple tasks concurrently, only one
    /// task will be woken when the time elapses. Create another instance of
    /// this type with [`VmTimeSource`] if you need to poll this from multiple
    /// tasks.
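    ///
    /// A sketch of awaiting the timeout with [`std::future::poll_fn`] (the
    /// surrounding async context and 10ms deadline are assumptions):
    ///
    /// ```ignore
    /// # async fn example(mut access: VmTimeAccess) {
    /// let deadline = access.now().wrapping_add(std::time::Duration::from_millis(10));
    /// access.set_timeout(deadline);
    /// let now = std::future::poll_fn(|cx| access.poll_timeout(cx)).await;
    /// assert!(now.is_after(deadline));
    /// # }
    /// ```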
    pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
        let now = Instant::now();
        match self
            .state
            .write()
            .poll_timeout(cx, self.index, now, self.timeout)
        {
            Poll::Ready(now) => {
                self.waiting = false;
                self.timeout = None;
                Poll::Ready(now.vmtime)
            }
            Poll::Pending => {
                self.waiting = true;
                Poll::Pending
            }
        }
    }
}

#[derive(Debug, Inspect)]
#[inspect(tag = "state")]
enum VmTimerPeriodicInner {
    Stopped,
    Running {
        last_timeout: VmTime,
        #[inspect(debug)]
        period: Duration,
    },
}

/// An abstraction over [`VmTimeAccess`] that streamlines the process of setting
/// up a periodic timer.
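///
/// A minimal sketch of a periodic tick (the accessor is assumed to come from a
/// [`VmTimeSource`]; the 10ms period is arbitrary):
///
/// ```ignore
/// # async fn example(access: VmTimeAccess) {
/// let mut timer = VmTimerPeriodic::new(access);
/// timer.start(std::time::Duration::from_millis(10));
/// loop {
///     // Fires roughly every 10ms of VM time while the VM is running.
///     let _tick = std::future::poll_fn(|cx| timer.poll_timeout(cx)).await;
/// }
/// # }
/// ```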
#[derive(Inspect)]
pub struct VmTimerPeriodic {
    vmtime: VmTimeAccess,
    inner: VmTimerPeriodicInner,
}

impl VmTimerPeriodic {
    /// Create a new periodic timer, backed by the given [`VmTimeAccess`].
    pub fn new(vmtime_access: VmTimeAccess) -> Self {
        Self {
            vmtime: vmtime_access,
            inner: VmTimerPeriodicInner::Stopped,
        }
    }

    /// Cancel the timer.
    ///
    /// If the timer isn't running, this method is a no-op.
    pub fn cancel(&mut self) {
        self.vmtime.cancel_timeout();
        self.inner = VmTimerPeriodicInner::Stopped;
    }

    /// Start the timer, configuring it to fire at the specified period.
    ///
    /// If the timer is currently running, it will be cancelled and restarted.
    pub fn start(&mut self, period: Duration) {
        self.cancel();

        let time = self.vmtime.now().wrapping_add(period);
        self.vmtime.set_timeout(time);
        self.inner = VmTimerPeriodicInner::Running {
            last_timeout: time,
            period,
        }
    }

    /// Check if the timer is currently running.
    pub fn is_running(&self) -> bool {
        matches!(self.inner, VmTimerPeriodicInner::Running { .. })
    }

    /// Polls the timer.
    ///
    /// Returns `Poll::Ready(now)` when the timer is past due, returning
    /// `Poll::Pending` otherwise.
    pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
        match self.inner {
            VmTimerPeriodicInner::Stopped => {
                assert_eq!(self.vmtime.get_timeout(), None);
                // Make sure the waker is still managed properly. This is
                // guaranteed to return `Pending`: the assert above shows no
                // timeout is set, and `poll_timeout` is documented to return
                // `Pending` when there is no timeout.
                self.vmtime.poll_timeout(cx)
            }
            VmTimerPeriodicInner::Running {
                ref mut last_timeout,
                period,
            } => {
                let mut res = Poll::Pending;
                while let Poll::Ready(now) = self.vmtime.poll_timeout(cx) {
                    res = Poll::Ready(now);

                    let time = last_timeout.wrapping_add(period);
                    self.vmtime.set_timeout(time);
                    *last_timeout = time;
                }
                res
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::VmTime;
    use super::VmTimeKeeper;
    use futures::FutureExt;
    use pal_async::DefaultDriver;
    use pal_async::async_test;
    use pal_async::timer::PolledTimer;
    use std::future::poll_fn;
    use std::time::Duration;

    #[async_test]
    async fn test_vmtime(driver: DefaultDriver) {
        let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
        let mut access = keeper
            .builder()
            .build(&driver)
            .await
            .unwrap()
            .access("test");
        keeper.start().await;

        // Test long timeout.
        access.set_timeout(access.now().wrapping_add(Duration::from_secs(1000)));
        let mut timer = PolledTimer::new(&driver);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(50)).fuse() => {}
            _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
        }

        // Test short timeout.
        let deadline = access.now().wrapping_add(Duration::from_millis(10));
        access.set_timeout(deadline);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
            now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
                assert!(now.is_after(deadline));
            }
        }
        // Timeout should be cleared by the successful poll.
        assert!(
            poll_fn(|cx| access.poll_timeout(cx))
                .now_or_never()
                .is_none()
        );

        // Test changing timeout.
        let now = access.now();
        let deadline = now.wrapping_add(Duration::from_millis(2000));
        access.set_timeout(deadline);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(30)).fuse() => {
                let deadline = now.wrapping_add(Duration::from_millis(50));
                access.set_timeout(deadline);
                futures::select! {
                    _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
                    now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
                        assert!(now.is_after(deadline));
                    }
                }
            }
            _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
        }
        keeper.stop().await;
    }

    #[async_test]
    async fn test_multi_vmtime(driver: DefaultDriver) {
        let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
        let src1 = keeper.builder().build(&driver).await.unwrap();
        keeper.start().await;
        let src2 = src1.builder().build(&driver).await.unwrap();
        let acc1 = src1.access("test");
        let acc2 = src2.access("test");
        {
            let t1 = acc1.now();
            let t2 = acc2.now();
            let t3 = acc1.now();
            assert!(!t2.is_before(t1), "{t1:?} {t2:?}");
            assert!(!t3.is_before(t2), "{t2:?} {t3:?}");
        }
        let now = acc1.now();
        keeper.stop().await;
        let t1 = acc1.now();
        let t2 = acc2.now();
        assert!(!t1.is_before(now));
        assert_eq!(t1, t2);
        let zero = VmTime::from_100ns(0);
        // Even on very fast machines, at least _some_ time will have advanced.
        assert_ne!(t1, zero);
        keeper.reset().await;
        assert_eq!(acc1.now(), zero);
        assert_eq!(acc2.now(), zero);
    }
}