1pub use saved_state::SavedState;
25
26use futures::StreamExt;
27use futures::future::join_all;
28use futures_concurrency::future::Race;
29use futures_concurrency::stream::Merge;
30use inspect::Inspect;
31use inspect::InspectMut;
32use mesh::MeshPayload;
33use mesh::payload::Protobuf;
34use mesh::rpc::Rpc;
35use mesh::rpc::RpcSend;
36use pal_async::driver::Driver;
37use pal_async::driver::PollImpl;
38use pal_async::driver::SpawnDriver;
39use pal_async::task::Task;
40use pal_async::timer::Instant;
41use pal_async::timer::PollTimer;
42use parking_lot::RwLock;
43use save_restore_derive::SavedStateRoot;
44use slab::Slab;
45use std::future::poll_fn;
46use std::sync::Arc;
47use std::task::Context;
48use std::task::Poll;
49use std::task::Waker;
50use std::time::Duration;
51use thiserror::Error;
52
53#[derive(Debug, Copy, Clone, PartialEq, Eq, Protobuf, Inspect)]
55#[inspect(transparent)]
56#[mesh(transparent)]
57pub struct VmTime(#[inspect(hex)] u64);
58
59impl VmTime {
60 pub fn from_100ns(n: u64) -> Self {
62 Self(n)
63 }
64
65 pub const fn as_100ns(&self) -> u64 {
67 self.0
68 }
69
70 pub fn wrapping_add(self, d: Duration) -> Self {
72 Self((self.0 as u128).wrapping_add(d.as_nanos() / 100) as u64)
73 }
74
75 pub fn is_before(self, t: Self) -> bool {
81 let delta = self.0.wrapping_sub(t.0);
82 (delta as i64) < 0
83 }
84
85 pub fn is_after(self, t: Self) -> bool {
89 let delta = self.0.wrapping_sub(t.0);
90 (delta as i64) > 0
91 }
92
93 pub fn checked_sub(self, t: Self) -> Option<Duration> {
96 let delta = self.0.wrapping_sub(t.0);
97 if (delta as i64) >= 0 {
98 Some(duration_from_100ns(delta))
99 } else {
100 None
101 }
102 }
103
104 pub fn min(self, t: Self) -> Self {
106 if self.is_before(t) { self } else { t }
107 }
108
109 pub fn max(self, t: Self) -> Self {
111 if self.is_before(t) { t } else { self }
112 }
113}
114
115fn duration_from_100ns(n: u64) -> Duration {
116 const NUM_100NS_IN_SEC: u64 = 10 * 1000 * 1000;
117 Duration::new(n / NUM_100NS_IN_SEC, (n % NUM_100NS_IN_SEC) as u32 * 100)
118}
119
120#[derive(Inspect)]
121struct TimerState {
122 time: TimeState,
123 #[inspect(skip)]
124 timer: PollImpl<dyn PollTimer>,
125 #[inspect(with = "|x| inspect::iter_by_key(x.iter().map(|(_, w)| (&w.name, w)))")]
126 waiters: Slab<WaiterState>,
127 next: Option<VmTime>,
128 last: VmTime,
129}
130
131#[derive(Debug, Inspect)]
132struct WaiterState {
133 #[inspect(skip)] name: Arc<str>,
135 next: Option<VmTime>,
136 #[inspect(rename = "waiting", with = "Option::is_some")]
137 waker: Option<Waker>,
138}
139
140impl WaiterState {
141 fn new(name: Arc<str>) -> Self {
142 Self {
143 name,
144 next: None,
145 waker: None,
146 }
147 }
148}
149
150#[derive(Copy, Clone, Debug, Protobuf)]
151struct Timestamp {
152 vmtime: VmTime,
153 os_time: u64, }
155
156impl Timestamp {
157 fn new(vmtime: VmTime, os_time: Instant) -> Self {
158 Self {
159 vmtime,
160 os_time: os_time.as_nanos(),
161 }
162 }
163
164 fn os_time(&self) -> Instant {
165 Instant::from_nanos(self.os_time)
166 }
167}
168
169impl TimerState {
170 fn new(driver: &impl Driver, uptime: VmTime) -> Self {
171 Self {
172 time: TimeState::Stopped(uptime),
173 timer: driver.new_dyn_timer(),
174 waiters: Slab::new(),
175 next: None,
176 last: uptime,
177 }
178 }
179
180 fn start(&mut self, now: Timestamp) {
182 let vmtime = self.time.stop_time().expect("should be stopped");
183 assert_eq!(now.vmtime, vmtime);
184 self.time = TimeState::Started(now);
185 tracing::trace!(?now, "vmtime start");
186 self.wake(now);
187 }
188
189 fn stop(&mut self, now_os: Instant) -> VmTime {
191 assert!(self.time.is_started());
192 let now = self.now(now_os);
193 self.time = TimeState::Stopped(now.vmtime);
194 tracing::debug!(?now, "vmtime stop");
195 now.vmtime
196 }
197
198 fn reset(&mut self, time: VmTime) {
200 assert!(!self.time.is_started());
201 self.time = TimeState::Stopped(time);
202 self.last = time;
203 self.next = None;
204 for (_, waiter) in &mut self.waiters {
206 if let Some(waker) = waiter.waker.take() {
207 waker.wake();
208 }
209 }
210 }
211
212 fn timestamp(&self, time: VmTime) -> Option<Timestamp> {
217 let start_time = self.time.start_time()?;
218 let since = time
219 .checked_sub(start_time.vmtime)
220 .unwrap_or(Duration::ZERO);
221 Some(Timestamp::new(time, start_time.os_time() + since))
222 }
223
224 fn now(&self, now_os: Instant) -> Timestamp {
226 self.time.now(now_os)
227 }
228
229 fn set_next(&mut self, next: VmTime) {
230 if !self.time.is_started() {
231 return;
232 }
233 if self
234 .next
235 .is_none_or(|current_next| next.is_before(current_next))
236 {
237 let deadline = self.timestamp(next).unwrap().os_time();
238 tracing::trace!(?deadline, "updating deadline");
239 self.timer.set_deadline(deadline);
240 self.next = Some(next);
241 }
242 }
243
244 fn wake(&mut self, now: Timestamp) {
245 assert!(!now.vmtime.is_before(self.last));
246 self.last = now.vmtime;
247 let mut next = None;
248 for (_, state) in &mut self.waiters {
249 if let Some(this_next) = state.next {
250 if this_next.is_after(now.vmtime) {
251 if next.is_none_or(|next| this_next.is_before(next)) {
252 next = Some(this_next);
253 }
254 } else if let Some(waker) = state.waker.take() {
255 waker.wake();
256 }
257 }
258 }
259 if let Some(next) = next {
260 self.set_next(next);
261 }
262 }
263
264 fn cancel_timeout(&mut self, index: usize) {
265 self.waiters[index].next = None;
266 }
267
268 fn update_timeout(&mut self, index: usize, time: VmTime) {
270 let state = &mut self.waiters[index];
271 tracing::trace!(vmtime = ?time, user = state.name.as_ref(), "timeout update");
272 state.next = Some(time);
273 if time.is_before(self.last) {
274 if let Some(waker) = state.waker.take() {
277 waker.wake();
278 }
279 return;
280 }
281
282 if self.next.is_some_and(|next| next.is_before(time)) {
284 return;
285 }
286 self.set_next(time);
287 }
288
289 fn poll_timeout(
291 &mut self,
292 cx: &mut Context<'_>,
293 index: usize,
294 now_os: Instant,
295 next: Option<VmTime>,
296 ) -> Poll<Timestamp> {
297 let now = self.now(now_os);
298 let state = &mut self.waiters[index];
299 if next.is_some_and(|next| next.is_before(now.vmtime)) {
300 state.waker = None;
301 state.next = None;
302 return Poll::Ready(now);
303 }
304 state.next = next;
305 state.waker = Some(cx.waker().clone());
306 if let Some(next) = next {
307 self.set_next(next);
308 }
309 Poll::Pending
310 }
311
312 fn poll(&mut self, cx: &mut Context<'_>) {
314 while self.time.is_started() {
315 let next = match self.next {
316 Some(_) => {
317 tracing::trace!("polling existing deadline");
319 None
320 }
321 None => {
322 let deadline = Instant::now() + Duration::from_secs(86400);
324 tracing::trace!(?deadline, "polling with long timeout");
325 Some(deadline)
326 }
327 };
328 if let Poll::Ready(now) = self.timer.poll_timer(cx, next) {
329 self.next = None;
330 self.wake(self.now(now));
331 } else {
332 return;
333 }
334 }
335 }
336}
337
338#[derive(Debug, InspectMut)]
340pub struct VmTimeKeeper {
341 #[inspect(skip)]
342 _task: Task<()>,
343 #[inspect(flatten, send = "KeeperRequest::Inspect")]
344 req_send: mesh::Sender<KeeperRequest>,
345 #[inspect(skip)]
346 builder: VmTimeSourceBuilder,
347 #[inspect(skip)]
348 time: TimeState,
349}
350
351#[expect(unsafe_code)]
353mod saved_state {
354 use super::*;
355
356 #[derive(Protobuf, SavedStateRoot)]
358 #[mesh(package = "vmtime")]
359 pub struct SavedState {
360 #[mesh(1)]
361 pub(super) vmtime: VmTime,
362 }
363
364 impl SavedState {
365 pub fn from_vmtime(vmtime: VmTime) -> Self {
367 SavedState { vmtime }
368 }
369 }
370}
371
372#[derive(Debug, MeshPayload, Copy, Clone)]
373enum TimeState {
374 Stopped(VmTime),
375 Started(Timestamp),
376}
377
378impl Inspect for TimeState {
379 fn inspect(&self, req: inspect::Request<'_>) {
380 let mut resp = req.respond();
381 let state = match *self {
382 TimeState::Stopped(_time) => "stopped",
383 TimeState::Started(time) => {
384 resp.field("start_time", time.vmtime);
385 "started"
386 }
387 };
388 resp.field("state", state)
389 .field("now", self.now(Instant::now()).vmtime);
390 }
391}
392
393impl TimeState {
394 fn is_started(&self) -> bool {
395 self.start_time().is_some()
396 }
397
398 fn stop_time(&self) -> Option<VmTime> {
399 match *self {
400 TimeState::Stopped(time) => Some(time),
401 TimeState::Started(_) => None,
402 }
403 }
404
405 fn start_time(&self) -> Option<Timestamp> {
406 match *self {
407 TimeState::Stopped(_) => None,
408 TimeState::Started(time) => Some(time),
409 }
410 }
411
412 fn now(&self, now_os: Instant) -> Timestamp {
413 match *self {
414 TimeState::Stopped(time) => Timestamp::new(time, now_os),
415 TimeState::Started(start_time) => {
416 if now_os >= start_time.os_time() {
417 Timestamp::new(
418 start_time
419 .vmtime
420 .wrapping_add(now_os - start_time.os_time()),
421 now_os,
422 )
423 } else {
424 let delta = start_time.os_time() - now_os;
432 if delta > Duration::from_secs(1) {
433 tracing::error!(
434 now = now_os.as_nanos(),
435 start_host = start_time.os_time().as_nanos(),
436 ?delta,
437 "time went backward"
438 );
439 }
440 start_time
441 }
442 }
443 }
444 }
445}
446
447impl VmTimeKeeper {
448 pub fn new(driver: &impl SpawnDriver, uptime: VmTime) -> Self {
450 let (new_send, new_recv) = mesh::mpsc_channel();
451 let (req_send, req_recv) = mesh::channel();
452 let time = TimeState::Stopped(uptime);
453 let task = driver.spawn("vm-time-keeper", async move {
454 let mut primary = PrimaryKeeper {
455 req_recv,
456 new_recv,
457 keepers: Vec::new(),
458 next_id: 0,
459 time,
460 };
461 primary.run().await;
462 });
463 Self {
464 time,
465 req_send,
466 builder: VmTimeSourceBuilder { new_send },
467 _task: task,
468 }
469 }
470
471 pub fn save(&self) -> SavedState {
473 SavedState {
474 vmtime: self.time.stop_time().expect("should be stopped"),
475 }
476 }
477
478 pub async fn restore(&mut self, state: SavedState) {
480 let SavedState { vmtime } = state;
481 self.reset_to(vmtime).await
482 }
483
484 async fn reset_to(&mut self, vmtime: VmTime) {
485 assert!(!self.time.is_started(), "should be stopped");
486 self.time = TimeState::Stopped(vmtime);
487 self.req_send
488 .call(KeeperRequest::Reset, vmtime)
489 .await
490 .unwrap();
491 }
492
493 pub async fn reset(&mut self) {
495 self.reset_to(VmTime::from_100ns(0)).await
496 }
497
498 pub async fn start(&mut self) {
500 let vmtime = self.time.stop_time().expect("should be stopped");
501 let timestamp = Timestamp::new(vmtime, Instant::now());
502 self.time = TimeState::Started(timestamp);
503 self.req_send
504 .call(KeeperRequest::Start, timestamp)
505 .await
506 .unwrap();
507 }
508
509 pub async fn stop(&mut self) {
511 assert!(self.time.is_started(), "should be running");
512 let stop_time = self.req_send.call(KeeperRequest::Stop, ()).await.unwrap();
513 self.time = TimeState::Stopped(stop_time);
514 }
515
516 pub fn builder(&self) -> &VmTimeSourceBuilder {
519 &self.builder
520 }
521}
522
523#[derive(MeshPayload, Clone, Debug)]
533pub struct VmTimeSourceBuilder {
534 new_send: mesh::Sender<NewKeeperRequest>,
535}
536
537#[derive(Debug, Error)]
540#[error("the time keeper has been torn down")]
541pub struct TimeKeeperIsGone;
542
543impl VmTimeSourceBuilder {
544 pub async fn build(&self, driver: &impl SpawnDriver) -> Result<VmTimeSource, TimeKeeperIsGone> {
548 let (send, recv) = mesh::channel();
549 let time = self
550 .new_send
551 .call(NewKeeperRequest::New, send)
552 .await
553 .map_err(|_| TimeKeeperIsGone)?;
554
555 let mut state = Arc::new(RwLock::new(TimerState::new(driver, VmTime::from_100ns(0))));
556 {
558 let state = Arc::get_mut(&mut state).unwrap().get_mut();
559 match time {
560 TimeState::Stopped(vmtime) => state.reset(vmtime),
561 TimeState::Started(timestamp) => state.start(timestamp),
562 }
563 }
564 let mut keeper = SecondaryKeeper {
565 state: state.clone(),
566 recv,
567 };
568 driver
569 .spawn("vm-time", async move { keeper.run().await })
570 .detach();
571 Ok(VmTimeSource {
572 state,
573 remote: self.clone(),
574 })
575 }
576}
577
578#[derive(Inspect)]
583struct PrimaryKeeper {
584 #[inspect(skip)]
585 req_recv: mesh::Receiver<KeeperRequest>,
586 #[inspect(skip)]
587 new_recv: mesh::Receiver<NewKeeperRequest>,
588 #[inspect(
589 with = "|x| inspect::iter_by_key(x.iter().map(|(id, sender)| (id, inspect::send(sender, KeeperRequest::Inspect))))"
590 )]
591 keepers: Vec<(u64, mesh::Sender<KeeperRequest>)>,
592 #[inspect(skip)]
593 next_id: u64,
594 time: TimeState,
595}
596
597#[derive(MeshPayload)]
598enum KeeperRequest {
599 Start(Rpc<Timestamp, ()>),
600 Stop(Rpc<(), VmTime>),
601 Reset(Rpc<VmTime, ()>),
602 Inspect(inspect::Deferred),
603}
604
605#[derive(MeshPayload)]
606enum NewKeeperRequest {
607 New(Rpc<mesh::Sender<KeeperRequest>, TimeState>),
608}
609
610impl PrimaryKeeper {
611 async fn run(&mut self) {
612 enum Event {
613 New(NewKeeperRequest),
614 Request(KeeperRequest),
615 }
616
617 while let Some(event) = (
618 (&mut self.new_recv).map(Event::New),
619 (&mut self.req_recv).map(Event::Request),
620 )
621 .merge()
622 .next()
623 .await
624 {
625 self.keepers.retain(|(_, s)| !s.is_closed());
627 match event {
628 Event::New(req) => match req {
629 NewKeeperRequest::New(rpc) => rpc.handle_sync(|sender| {
630 self.keepers.push((self.next_id, sender));
631 self.next_id += 1;
632 self.time
633 }),
634 },
635 Event::Request(req) => {
636 match req {
637 KeeperRequest::Start(rpc) => {
638 rpc.handle(async |start_time| {
639 assert!(!self.time.is_started());
640 self.time = TimeState::Started(start_time);
641 join_all(self.keepers.iter().map(|(_, sender)| {
642 sender.call(KeeperRequest::Start, start_time)
643 }))
644 .await;
645 })
646 .await
647 }
648 KeeperRequest::Stop(rpc) => {
649 rpc.handle(async |()| {
650 let results = join_all(
651 self.keepers
652 .iter()
653 .map(|(_, sender)| sender.call(KeeperRequest::Stop, ())),
654 )
655 .await;
656
657 let start_time = self.time.start_time().expect("should be running");
658 let now = start_time
659 .vmtime
660 .wrapping_add(Instant::now() - start_time.os_time());
661
662 let stop_time = results
666 .into_iter()
667 .filter_map(|r| r.ok())
668 .fold(now, |a, b| a.max(b));
669
670 self.time = TimeState::Stopped(stop_time);
671
672 join_all(self.keepers.iter().map(|(_, sender)| {
675 sender.call(KeeperRequest::Reset, stop_time)
676 }))
677 .await;
678
679 stop_time
680 })
681 .await
682 }
683 KeeperRequest::Reset(rpc) => {
684 rpc.handle(async |time| {
685 assert!(!self.time.is_started(), "should not be running");
686 self.time = TimeState::Stopped(time);
687 join_all(
688 self.keepers
689 .iter()
690 .map(|(_, sender)| sender.call(KeeperRequest::Reset, time)),
691 )
692 .await;
693 })
694 .await
695 }
696 KeeperRequest::Inspect(deferred) => deferred.inspect(&self),
697 }
698 }
699 }
700 }
701 }
702}
703
704#[derive(InspectMut)]
709struct SecondaryKeeper {
710 #[inspect(flatten)]
711 state: Arc<RwLock<TimerState>>,
712 #[inspect(skip)]
713 recv: mesh::Receiver<KeeperRequest>,
714}
715
716impl SecondaryKeeper {
717 async fn run(&mut self) {
718 loop {
719 let r = {
720 let state = &self.state;
721 (
722 self.recv.next(),
723 poll_fn(|cx| {
724 state.write().poll(cx);
725 Poll::Pending
726 }),
727 )
728 .race()
729 .await
730 };
731 match r {
732 Some(req) => match req {
733 KeeperRequest::Start(rpc) => rpc.handle_sync(|start_time| {
734 let mut state = self.state.write();
735 state.start(start_time);
736 }),
737 KeeperRequest::Reset(rpc) => rpc.handle_sync(|vmtime| {
738 let mut state = self.state.write();
739 state.reset(vmtime);
740 }),
741 KeeperRequest::Stop(rpc) => rpc.handle_sync(|()| {
742 let mut state = self.state.write();
743 state.stop(Instant::now())
744 }),
745 KeeperRequest::Inspect(deferred) => deferred.inspect(&mut *self),
746 },
747 None => break,
748 }
749 }
750 }
751}
752
753#[derive(Clone)]
755pub struct VmTimeSource {
756 state: Arc<RwLock<TimerState>>,
757 remote: VmTimeSourceBuilder,
758}
759
760impl VmTimeSource {
761 pub fn access(&self, name: impl Into<Arc<str>>) -> VmTimeAccess {
765 let name = name.into();
766 VmTimeAccess {
767 timeout: None,
768 waiting: false,
769 index: self
770 .state
771 .write()
772 .waiters
773 .insert(WaiterState::new(name.clone())),
774 state: self.state.clone(),
775 name,
776 }
777 }
778
779 pub fn builder(&self) -> &VmTimeSourceBuilder {
782 &self.remote
783 }
784}
785
786#[derive(Inspect)]
788pub struct VmTimeAccess {
789 timeout: Option<VmTime>,
790 waiting: bool,
791 #[inspect(skip)]
792 index: usize,
793 #[inspect(skip)]
794 state: Arc<RwLock<TimerState>>,
795 name: Arc<str>,
796}
797
798impl Drop for VmTimeAccess {
799 fn drop(&mut self) {
800 self.state.write().waiters.remove(self.index);
801 }
802}
803
804impl VmTimeAccess {
805 pub fn now(&self) -> VmTime {
807 let now = Instant::now();
808 self.state.read().now(now).vmtime
809 }
810
811 pub fn host_time(&self, time: VmTime) -> Option<Instant> {
818 Some(self.state.read().timestamp(time)?.os_time())
819 }
820
821 pub fn get_timeout(&self) -> Option<VmTime> {
823 self.timeout
824 }
825
826 pub fn set_timeout(&mut self, time: VmTime) {
828 self.timeout = Some(time);
829 if self.waiting {
830 self.state.write().update_timeout(self.index, time);
831 }
832 }
833
834 pub fn set_timeout_if_before(&mut self, time: VmTime) {
837 if self.timeout.is_none_or(|timeout| time.is_before(timeout)) {
838 self.set_timeout(time);
839 }
840 }
841
842 pub fn cancel_timeout(&mut self) {
844 if self.waiting && self.timeout.is_some() {
845 self.state.write().cancel_timeout(self.index);
846 }
847 self.timeout = None;
848 }
849
850 pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
862 let now = Instant::now();
863 match self
864 .state
865 .write()
866 .poll_timeout(cx, self.index, now, self.timeout)
867 {
868 Poll::Ready(now) => {
869 self.waiting = false;
870 self.timeout = None;
871 Poll::Ready(now.vmtime)
872 }
873 Poll::Pending => {
874 self.waiting = true;
875 Poll::Pending
876 }
877 }
878 }
879}
880
881#[derive(Debug, Inspect)]
882#[inspect(tag = "state")]
883enum VmTimerPeriodicInner {
884 Stopped,
885 Running {
886 last_timeout: VmTime,
887 period: Duration,
888 },
889}
890
891#[derive(Inspect)]
894pub struct VmTimerPeriodic {
895 vmtime: VmTimeAccess,
896 inner: VmTimerPeriodicInner,
897}
898
899impl VmTimerPeriodic {
900 pub fn new(vmtime_access: VmTimeAccess) -> Self {
902 Self {
903 vmtime: vmtime_access,
904 inner: VmTimerPeriodicInner::Stopped,
905 }
906 }
907
908 pub fn cancel(&mut self) {
912 self.vmtime.cancel_timeout();
913 self.inner = VmTimerPeriodicInner::Stopped;
914 }
915
916 pub fn start(&mut self, period: Duration) {
921 self.cancel();
922
923 let time = self.vmtime.now().wrapping_add(period);
924 self.vmtime.set_timeout(time);
925 self.inner = VmTimerPeriodicInner::Running {
926 last_timeout: time,
927 period,
928 }
929 }
930
931 pub fn is_running(&self) -> bool {
933 matches!(self.inner, VmTimerPeriodicInner::Running { .. })
934 }
935
936 pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
941 match self.inner {
942 VmTimerPeriodicInner::Stopped => {
943 assert_eq!(self.vmtime.get_timeout(), None);
944 self.vmtime.poll_timeout(cx)
947 }
948 VmTimerPeriodicInner::Running {
949 ref mut last_timeout,
950 period,
951 } => {
952 let mut res = Poll::Pending;
953 while let Poll::Ready(now) = self.vmtime.poll_timeout(cx) {
954 res = Poll::Ready(now);
955
956 let time = last_timeout.wrapping_add(period);
957 self.vmtime.set_timeout(time);
958 *last_timeout = time;
959 }
960 res
961 }
962 }
963 }
964}
965
966#[cfg(test)]
967mod tests {
968 use super::VmTime;
969 use super::VmTimeKeeper;
970 use futures::FutureExt;
971 use pal_async::DefaultDriver;
972 use pal_async::async_test;
973 use pal_async::timer::PolledTimer;
974 use std::future::poll_fn;
975 use std::time::Duration;
976
977 #[async_test]
978 async fn test_vmtime(driver: DefaultDriver) {
979 let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
980 let mut access = keeper
981 .builder()
982 .build(&driver)
983 .await
984 .unwrap()
985 .access("test");
986 keeper.start().await;
987
988 access.set_timeout(access.now().wrapping_add(Duration::from_secs(1000)));
990 let mut timer = PolledTimer::new(&driver);
991 futures::select! {
992 _ = timer.sleep(Duration::from_millis(50)).fuse() => {}
993 _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
994 }
995
996 let deadline = access.now().wrapping_add(Duration::from_millis(10));
998 access.set_timeout(deadline);
999 futures::select! {
1000 _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
1001 now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
1002 assert!(now.is_after(deadline));
1003 }
1004 }
1005 assert!(
1007 poll_fn(|cx| access.poll_timeout(cx))
1008 .now_or_never()
1009 .is_none()
1010 );
1011
1012 let now = access.now();
1014 let deadline = now.wrapping_add(Duration::from_millis(2000));
1015 access.set_timeout(deadline);
1016 futures::select! {
1017 _ = timer.sleep(Duration::from_millis(30)).fuse() => {
1018 let deadline = now.wrapping_add(Duration::from_millis(50));
1019 access.set_timeout(deadline);
1020 futures::select! {
1021 _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
1022 now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
1023 assert!(now.is_after(deadline));
1024 }
1025 }
1026 }
1027 _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
1028 }
1029 keeper.stop().await;
1030 }
1031
1032 #[async_test]
1033 async fn test_multi_vmtime(driver: DefaultDriver) {
1034 let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
1035 let src1 = keeper.builder().build(&driver).await.unwrap();
1036 keeper.start().await;
1037 let src2 = src1.builder().build(&driver).await.unwrap();
1038 let acc1 = src1.access("test");
1039 let acc2 = src2.access("test");
1040 {
1041 let t1 = acc1.now();
1042 let t2 = acc2.now();
1043 let t3 = acc1.now();
1044 assert!(!t2.is_before(t1), "{t1:?} {t2:?}");
1045 assert!(!t3.is_before(t2), "{t2:?} {t3:?}");
1046 }
1047 let now = acc1.now();
1048 keeper.stop().await;
1049 let t1 = acc1.now();
1050 let t2 = acc2.now();
1051 assert!(!t1.is_before(now));
1052 assert_eq!(t1, t2);
1053 let zero = VmTime::from_100ns(0);
1054 assert_ne!(t1, zero);
1056 keeper.reset().await;
1057 assert_eq!(acc1.now(), zero);
1058 assert_eq!(acc2.now(), zero);
1059 }
1060}