pub use saved_state::SavedState;

use futures::StreamExt;
use futures::future::join_all;
use futures_concurrency::future::Race;
use futures_concurrency::stream::Merge;
use inspect::Inspect;
use inspect::InspectMut;
use inspect::adhoc;
use mesh::MeshPayload;
use mesh::payload::Protobuf;
use mesh::rpc::Rpc;
use mesh::rpc::RpcSend;
use pal_async::driver::Driver;
use pal_async::driver::PollImpl;
use pal_async::driver::SpawnDriver;
use pal_async::task::Task;
use pal_async::timer::Instant;
use pal_async::timer::PollTimer;
use parking_lot::RwLock;
use save_restore_derive::SavedStateRoot;
use slab::Slab;
use std::future::poll_fn;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::time::Duration;
use thiserror::Error;

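/// A VM time value, measured in 100ns units.
///
/// VM time only advances while the time keeper is started, and comparisons
/// use wrapping arithmetic, so they are only meaningful for times that are
/// reasonably close together.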
#[derive(Debug, Copy, Clone, PartialEq, Eq, Protobuf, Inspect)]
#[inspect(transparent)]
#[mesh(transparent)]
pub struct VmTime(#[inspect(hex)] u64);

impl VmTime {
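    /// Returns the time value representing `n` units of 100ns each.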
    pub fn from_100ns(n: u64) -> Self {
        Self(n)
    }

    pub const fn as_100ns(&self) -> u64 {
        self.0
    }

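    /// Adds `d` to the time, wrapping on 64-bit overflow.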
    pub fn wrapping_add(self, d: Duration) -> Self {
        Self((self.0 as u128).wrapping_add(d.as_nanos() / 100) as u64)
    }

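    /// Returns whether `self` is before `t`, using a wrapping comparison:
    /// the result is only meaningful when the two values are within half the
    /// 64-bit range of each other.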
    pub fn is_before(self, t: Self) -> bool {
        let delta = self.0.wrapping_sub(t.0);
        (delta as i64) < 0
    }

    pub fn is_after(self, t: Self) -> bool {
        let delta = self.0.wrapping_sub(t.0);
        (delta as i64) > 0
    }

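    /// Returns the duration from `t` to `self`, or `None` if `self` is
    /// before `t`.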
    pub fn checked_sub(self, t: Self) -> Option<Duration> {
        let delta = self.0.wrapping_sub(t.0);
        if (delta as i64) >= 0 {
            Some(duration_from_100ns(delta))
        } else {
            None
        }
    }

    pub fn min(self, t: Self) -> Self {
        if self.is_before(t) { self } else { t }
    }

    pub fn max(self, t: Self) -> Self {
        if self.is_before(t) { t } else { self }
    }
}

fn duration_from_100ns(n: u64) -> Duration {
    const NUM_100NS_IN_SEC: u64 = 10 * 1000 * 1000;
    Duration::new(n / NUM_100NS_IN_SEC, (n % NUM_100NS_IN_SEC) as u32 * 100)
}

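/// The timer state shared between a [`VmTimeSource`] and the
/// [`SecondaryKeeper`] task that drives it.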
#[derive(Inspect)]
struct TimerState {
    time: TimeState,
    #[inspect(skip)]
    timer: PollImpl<dyn PollTimer>,
    #[inspect(with = "|x| inspect::iter_by_key(x.iter().map(|(_, w)| (&w.name, w)))")]
    waiters: Slab<WaiterState>,
    next: Option<VmTime>,
    last: VmTime,
}

#[derive(Debug, Inspect)]
struct WaiterState {
    #[inspect(skip)]
    name: Arc<str>,
    next: Option<VmTime>,
    #[inspect(rename = "waiting", with = "Option::is_some")]
    waker: Option<Waker>,
}

impl WaiterState {
    fn new(name: Arc<str>) -> Self {
        Self {
            name,
            next: None,
            waker: None,
        }
    }
}

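/// A VM time paired with the OS time ([`Instant`], stored as nanoseconds) at
/// which it was captured.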
#[derive(Copy, Clone, Debug, Protobuf)]
struct Timestamp {
    vmtime: VmTime,
    os_time: u64,
}

impl Timestamp {
    fn new(vmtime: VmTime, os_time: Instant) -> Self {
        Self {
            vmtime,
            os_time: os_time.as_nanos(),
        }
    }

    fn os_time(&self) -> Instant {
        Instant::from_nanos(self.os_time)
    }
}

impl TimerState {
    fn new(driver: &impl Driver, uptime: VmTime) -> Self {
        Self {
            time: TimeState::Stopped(uptime),
            timer: driver.new_dyn_timer(),
            waiters: Slab::new(),
            next: None,
            last: uptime,
        }
    }

    fn start(&mut self, now: Timestamp) {
        let vmtime = self.time.stop_time().expect("should be stopped");
        assert_eq!(now.vmtime, vmtime);
        self.time = TimeState::Started(now);
        tracing::trace!(?now, "vmtime start");
        self.wake(now);
    }

    fn stop(&mut self, now_os: Instant) -> VmTime {
        assert!(self.time.is_started());
        let now = self.now(now_os);
        self.time = TimeState::Stopped(now.vmtime);
        tracing::debug!(?now, "vmtime stop");
        now.vmtime
    }

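    /// Resets the stopped clock to `time` and wakes all waiters so they can
    /// re-evaluate their timeouts against the new time.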
    fn reset(&mut self, time: VmTime) {
        assert!(!self.time.is_started());
        self.time = TimeState::Stopped(time);
        self.last = time;
        self.next = None;
        for (_, waiter) in &mut self.waiters {
            if let Some(waker) = waiter.waker.take() {
                waker.wake();
            }
        }
    }

    fn timestamp(&self, time: VmTime) -> Option<Timestamp> {
        let start_time = self.time.start_time()?;
        let since = time
            .checked_sub(start_time.vmtime)
            .unwrap_or(Duration::ZERO);
        Some(Timestamp::new(time, start_time.os_time() + since))
    }

    fn now(&self, now_os: Instant) -> Timestamp {
        self.time.now(now_os)
    }

    fn set_next(&mut self, next: VmTime) {
        if !self.time.is_started() {
            return;
        }
        if self
            .next
            .is_none_or(|current_next| next.is_before(current_next))
        {
            let deadline = self.timestamp(next).unwrap().os_time();
            tracing::trace!(?deadline, "updating deadline");
            self.timer.set_deadline(deadline);
            self.next = Some(next);
        }
    }

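    /// Advances the last-observed time to `now`, wakes any waiters whose
    /// timeouts have been reached, and rearms the timer for the earliest
    /// remaining timeout.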
    fn wake(&mut self, now: Timestamp) {
        assert!(!now.vmtime.is_before(self.last));
        self.last = now.vmtime;
        let mut next = None;
        for (_, state) in &mut self.waiters {
            if let Some(this_next) = state.next {
                if this_next.is_after(now.vmtime) {
                    if next.is_none_or(|next| this_next.is_before(next)) {
                        next = Some(this_next);
                    }
                } else if let Some(waker) = state.waker.take() {
                    waker.wake();
                }
            }
        }
        if let Some(next) = next {
            self.set_next(next);
        }
    }

    fn cancel_timeout(&mut self, index: usize) {
        self.waiters[index].next = None;
    }

    fn update_timeout(&mut self, index: usize, time: VmTime) {
        let state = &mut self.waiters[index];
        tracing::trace!(vmtime = ?time, user = state.name.as_ref(), "timeout update");
        state.next = Some(time);
        if time.is_before(self.last) {
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
            return;
        }

        if self.next.is_some_and(|next| next.is_before(time)) {
            return;
        }
        self.set_next(time);
    }

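    /// Polls waiter `index` for timeout `next`: returns the current time if
    /// the timeout has passed, otherwise registers the waker (and arms the
    /// timer) and returns `Poll::Pending`.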
    fn poll_timeout(
        &mut self,
        cx: &mut Context<'_>,
        index: usize,
        now_os: Instant,
        next: Option<VmTime>,
    ) -> Poll<Timestamp> {
        let now = self.now(now_os);
        let state = &mut self.waiters[index];
        if next.is_some_and(|next| next.is_before(now.vmtime)) {
            state.waker = None;
            state.next = None;
            return Poll::Ready(now);
        }
        state.next = next;
        state.waker = Some(cx.waker().clone());
        if let Some(next) = next {
            self.set_next(next);
        }
        Poll::Pending
    }

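    /// Drives the underlying OS timer while the clock is started, waking
    /// waiters whenever a deadline elapses. When no deadline is armed, polls
    /// with a long (24-hour) fallback timeout.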
    fn poll(&mut self, cx: &mut Context<'_>) {
        while self.time.is_started() {
            let next = match self.next {
                Some(_) => {
                    tracing::trace!("polling existing deadline");
                    None
                }
                None => {
                    let deadline = Instant::now() + Duration::from_secs(86400);
                    tracing::trace!(?deadline, "polling with long timeout");
                    Some(deadline)
                }
            };
            if let Poll::Ready(now) = self.timer.poll_timer(cx, next) {
                self.next = None;
                self.wake(self.now(now));
            } else {
                return;
            }
        }
    }
}

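/// A VM time keeper, which owns the current VM time and propagates start,
/// stop, and reset operations to every [`VmTimeSource`] built from its
/// [`VmTimeSourceBuilder`].
///
/// A rough usage sketch, based on the tests at the end of this module (the
/// surrounding async context and `driver` are assumed):
///
/// ```ignore
/// let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
/// let source = keeper.builder().build(&driver).await.unwrap();
/// let mut access = source.access("example");
/// keeper.start().await;
/// access.set_timeout(access.now().wrapping_add(Duration::from_millis(10)));
/// let fired_at = std::future::poll_fn(|cx| access.poll_timeout(cx)).await;
/// ```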
#[derive(Debug)]
pub struct VmTimeKeeper {
    _task: Task<()>,
    req_send: mesh::Sender<KeeperRequest>,
    builder: VmTimeSourceBuilder,
    time: TimeState,
}

#[expect(unsafe_code)]
mod saved_state {
    use super::*;

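    /// The saved state for a [`VmTimeKeeper`](super::VmTimeKeeper): the VM
    /// time at which the keeper was stopped.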
    #[derive(Protobuf, SavedStateRoot)]
    #[mesh(package = "vmtime")]
    pub struct SavedState {
        #[mesh(1)]
        pub(super) vmtime: VmTime,
    }

    impl SavedState {
        pub fn from_vmtime(vmtime: VmTime) -> Self {
            SavedState { vmtime }
        }
    }
}

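/// The running state of the clock: either stopped at a fixed VM time, or
/// started at a given VM time/OS time pair.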
#[derive(Debug, MeshPayload, Copy, Clone)]
enum TimeState {
    Stopped(VmTime),
    Started(Timestamp),
}

impl Inspect for TimeState {
    fn inspect(&self, req: inspect::Request<'_>) {
        let mut resp = req.respond();
        let state = match *self {
            TimeState::Stopped(_time) => "stopped",
            TimeState::Started(time) => {
                resp.field("start_time", time.vmtime);
                "started"
            }
        };
        resp.field("state", state)
            .field("now", self.now(Instant::now()).vmtime);
    }
}

impl TimeState {
    fn is_started(&self) -> bool {
        self.start_time().is_some()
    }

    fn stop_time(&self) -> Option<VmTime> {
        match *self {
            TimeState::Stopped(time) => Some(time),
            TimeState::Started(_) => None,
        }
    }

    fn start_time(&self) -> Option<Timestamp> {
        match *self {
            TimeState::Stopped(_) => None,
            TimeState::Started(time) => Some(time),
        }
    }

    fn now(&self, now_os: Instant) -> Timestamp {
        match *self {
            TimeState::Stopped(time) => Timestamp::new(time, now_os),
            TimeState::Started(start_time) => {
                if now_os >= start_time.os_time() {
                    Timestamp::new(
                        start_time
                            .vmtime
                            .wrapping_add(now_os - start_time.os_time()),
                        now_os,
                    )
                } else {
                    let delta = start_time.os_time() - now_os;
                    if delta > Duration::from_secs(1) {
                        tracing::error!(
                            now = now_os.as_nanos(),
                            start_host = start_time.os_time().as_nanos(),
                            ?delta,
                            "time went backward"
                        );
                    }
                    start_time
                }
            }
        }
    }
}

impl InspectMut for VmTimeKeeper {
    fn inspect_mut(&mut self, req: inspect::Request<'_>) {
        self.req_send.send(KeeperRequest::Inspect(req.defer()));
    }
}

impl VmTimeKeeper {
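    /// Creates a new time keeper with the clock stopped at `uptime`, spawning
    /// the primary keeper task on `driver`.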
    pub fn new(driver: &impl SpawnDriver, uptime: VmTime) -> Self {
        let (new_send, new_recv) = mesh::mpsc_channel();
        let (req_send, req_recv) = mesh::channel();
        let time = TimeState::Stopped(uptime);
        let task = driver.spawn("vm-time-keeper", async move {
            let mut primary = PrimaryKeeper {
                req_recv,
                new_recv,
                keepers: Vec::new(),
                next_id: 0,
                time,
            };
            primary.run().await;
        });
        Self {
            time,
            req_send,
            builder: VmTimeSourceBuilder { new_send },
            _task: task,
        }
    }

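    /// Saves the stopped VM time. Panics if the clock is running.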
    pub fn save(&self) -> SavedState {
        SavedState {
            vmtime: self.time.stop_time().expect("should be stopped"),
        }
    }

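    /// Restores the VM time from `state`. The clock must be stopped.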
    pub async fn restore(&mut self, state: SavedState) {
        let SavedState { vmtime } = state;
        self.reset_to(vmtime).await
    }

    async fn reset_to(&mut self, vmtime: VmTime) {
        assert!(!self.time.is_started(), "should be stopped");
        self.time = TimeState::Stopped(vmtime);
        self.req_send
            .call(KeeperRequest::Reset, vmtime)
            .await
            .unwrap();
    }

    pub async fn reset(&mut self) {
        self.reset_to(VmTime::from_100ns(0)).await
    }

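    /// Starts the clock, propagating the start timestamp to all connected
    /// time sources.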
    pub async fn start(&mut self) {
        let vmtime = self.time.stop_time().expect("should be stopped");
        let timestamp = Timestamp::new(vmtime, Instant::now());
        self.time = TimeState::Started(timestamp);
        self.req_send
            .call(KeeperRequest::Start, timestamp)
            .await
            .unwrap();
    }

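    /// Stops the clock. The final stop time is the latest time reported by
    /// any time source, and each source is then reset to it so that they all
    /// agree.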
    pub async fn stop(&mut self) {
        assert!(self.time.is_started(), "should be running");
        let stop_time = self.req_send.call(KeeperRequest::Stop, ()).await.unwrap();
        self.time = TimeState::Stopped(stop_time);
    }

    pub fn builder(&self) -> &VmTimeSourceBuilder {
        &self.builder
    }
}

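/// A builder for [`VmTimeSource`]s connected to a [`VmTimeKeeper`]. The
/// builder itself can be sent over mesh channels.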
#[derive(MeshPayload, Clone, Debug)]
pub struct VmTimeSourceBuilder {
    new_send: mesh::Sender<NewKeeperRequest>,
}

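/// The error returned when the backing [`VmTimeKeeper`] has been dropped.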
#[derive(Debug, Error)]
#[error("the time keeper has been torn down")]
pub struct TimeKeeperIsGone;

impl VmTimeSourceBuilder {
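    /// Builds a new [`VmTimeSource`], registering it with the time keeper and
    /// spawning its backing task on `driver`.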
    pub async fn build(&self, driver: &impl SpawnDriver) -> Result<VmTimeSource, TimeKeeperIsGone> {
        let (send, recv) = mesh::channel();
        let time = self
            .new_send
            .call(NewKeeperRequest::New, send)
            .await
            .map_err(|_| TimeKeeperIsGone)?;

        let mut state = Arc::new(RwLock::new(TimerState::new(driver, VmTime::from_100ns(0))));
        {
            let state = Arc::get_mut(&mut state).unwrap().get_mut();
            match time {
                TimeState::Stopped(vmtime) => state.reset(vmtime),
                TimeState::Started(timestamp) => state.start(timestamp),
            }
        }
        let mut keeper = SecondaryKeeper {
            state: state.clone(),
            recv,
        };
        driver
            .spawn("vm-time", async move { keeper.run().await })
            .detach();
        Ok(VmTimeSource {
            state,
            remote: self.clone(),
        })
    }
}

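/// The primary keeper task state, which tracks the canonical time state and
/// fans requests out to the per-source secondary keepers.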
#[derive(Inspect)]
#[inspect(extra = "Self::inspect_extra")]
struct PrimaryKeeper {
    #[inspect(skip)]
    req_recv: mesh::Receiver<KeeperRequest>,
    #[inspect(skip)]
    new_recv: mesh::Receiver<NewKeeperRequest>,
    #[inspect(skip)]
    keepers: Vec<(u64, mesh::Sender<KeeperRequest>)>,
    #[inspect(skip)]
    next_id: u64,
    time: TimeState,
}

#[derive(MeshPayload)]
enum KeeperRequest {
    Start(Rpc<Timestamp, ()>),
    Stop(Rpc<(), VmTime>),
    Reset(Rpc<VmTime, ()>),
    Inspect(inspect::Deferred),
}

#[derive(MeshPayload)]
enum NewKeeperRequest {
    New(Rpc<mesh::Sender<KeeperRequest>, TimeState>),
}

impl PrimaryKeeper {
    fn inspect_extra(&self, resp: &mut inspect::Response<'_>) {
        resp.fields(
            "keepers",
            self.keepers
                .iter()
                .map(|&(id, ref s)| (id, adhoc(|req| s.send(KeeperRequest::Inspect(req.defer()))))),
        );
    }

    async fn run(&mut self) {
        enum Event {
            New(NewKeeperRequest),
            Request(KeeperRequest),
        }

        while let Some(event) = (
            (&mut self.new_recv).map(Event::New),
            (&mut self.req_recv).map(Event::Request),
        )
            .merge()
            .next()
            .await
        {
            self.keepers.retain(|(_, s)| !s.is_closed());
            match event {
                Event::New(req) => match req {
                    NewKeeperRequest::New(rpc) => rpc.handle_sync(|sender| {
                        self.keepers.push((self.next_id, sender));
                        self.next_id += 1;
                        self.time
                    }),
                },
                Event::Request(req) => {
                    match req {
                        KeeperRequest::Start(rpc) => {
                            rpc.handle(async |start_time| {
                                assert!(!self.time.is_started());
                                self.time = TimeState::Started(start_time);
                                join_all(self.keepers.iter().map(|(_, sender)| {
                                    sender.call(KeeperRequest::Start, start_time)
                                }))
                                .await;
                            })
                            .await
                        }
                        KeeperRequest::Stop(rpc) => {
                            rpc.handle(async |()| {
                                let results = join_all(
                                    self.keepers
                                        .iter()
                                        .map(|(_, sender)| sender.call(KeeperRequest::Stop, ())),
                                )
                                .await;

                                let start_time = self.time.start_time().expect("should be running");
                                let now = start_time
                                    .vmtime
                                    .wrapping_add(Instant::now() - start_time.os_time());

                                let stop_time = results
                                    .into_iter()
                                    .filter_map(|r| r.ok())
                                    .fold(now, |a, b| a.max(b));

                                self.time = TimeState::Stopped(stop_time);

                                join_all(self.keepers.iter().map(|(_, sender)| {
                                    sender.call(KeeperRequest::Reset, stop_time)
                                }))
                                .await;

                                stop_time
                            })
                            .await
                        }
                        KeeperRequest::Reset(rpc) => {
                            rpc.handle(async |time| {
                                assert!(!self.time.is_started(), "should not be running");
                                self.time = TimeState::Stopped(time);
                                join_all(
                                    self.keepers
                                        .iter()
                                        .map(|(_, sender)| sender.call(KeeperRequest::Reset, time)),
                                )
                                .await;
                            })
                            .await
                        }
                        KeeperRequest::Inspect(deferred) => deferred.inspect(&self),
                    }
                }
            }
        }
    }
}

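/// A per-[`VmTimeSource`] keeper task, which drives the shared [`TimerState`]
/// and applies start/stop/reset requests from the primary keeper.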
#[derive(InspectMut)]
struct SecondaryKeeper {
    #[inspect(flatten)]
    state: Arc<RwLock<TimerState>>,
    #[inspect(skip)]
    recv: mesh::Receiver<KeeperRequest>,
}

impl SecondaryKeeper {
    async fn run(&mut self) {
        loop {
            let r = {
                let state = &self.state;
                (
                    self.recv.next(),
                    poll_fn(|cx| {
                        state.write().poll(cx);
                        Poll::Pending
                    }),
                )
                    .race()
                    .await
            };
            match r {
                Some(req) => match req {
                    KeeperRequest::Start(rpc) => rpc.handle_sync(|start_time| {
                        let mut state = self.state.write();
                        state.start(start_time);
                    }),
                    KeeperRequest::Reset(rpc) => rpc.handle_sync(|vmtime| {
                        let mut state = self.state.write();
                        state.reset(vmtime);
                    }),
                    KeeperRequest::Stop(rpc) => rpc.handle_sync(|()| {
                        let mut state = self.state.write();
                        state.stop(Instant::now())
                    }),
                    KeeperRequest::Inspect(deferred) => deferred.inspect(&mut *self),
                },
                None => break,
            }
        }
    }
}

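/// A source of VM time, from which individual [`VmTimeAccess`] handles are
/// created.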
#[derive(Clone)]
pub struct VmTimeSource {
    state: Arc<RwLock<TimerState>>,
    remote: VmTimeSourceBuilder,
}

impl VmTimeSource {
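    /// Returns an accessor for VM time, registered under `name` for
    /// diagnostic purposes.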
    pub fn access(&self, name: impl Into<Arc<str>>) -> VmTimeAccess {
        let name = name.into();
        VmTimeAccess {
            timeout: None,
            waiting: false,
            index: self
                .state
                .write()
                .waiters
                .insert(WaiterState::new(name.clone())),
            state: self.state.clone(),
            name,
        }
    }

    pub fn builder(&self) -> &VmTimeSourceBuilder {
        &self.remote
    }
}

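/// A handle for reading the current VM time and waiting for a VM time to be
/// reached.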
#[derive(Inspect)]
pub struct VmTimeAccess {
    timeout: Option<VmTime>,
    waiting: bool,
    #[inspect(skip)]
    index: usize,
    #[inspect(skip)]
    state: Arc<RwLock<TimerState>>,
    name: Arc<str>,
}

impl Drop for VmTimeAccess {
    fn drop(&mut self) {
        self.state.write().waiters.remove(self.index);
    }
}

impl VmTimeAccess {
    pub fn now(&self) -> VmTime {
        let now = Instant::now();
        self.state.read().now(now).vmtime
    }

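    /// Returns the OS time corresponding to the VM time `time`, or `None` if
    /// the clock is currently stopped.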
    pub fn host_time(&self, time: VmTime) -> Option<Instant> {
        Some(self.state.read().timestamp(time)?.os_time())
    }

    pub fn get_timeout(&self) -> Option<VmTime> {
        self.timeout
    }

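    /// Sets the timeout returned by [`Self::poll_timeout`], replacing any
    /// previously set timeout.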
    pub fn set_timeout(&mut self, time: VmTime) {
        self.timeout = Some(time);
        if self.waiting {
            self.state.write().update_timeout(self.index, time);
        }
    }

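    /// Sets the timeout to `time` only if it is earlier than the current
    /// timeout, or if no timeout is set.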
    pub fn set_timeout_if_before(&mut self, time: VmTime) {
        if self.timeout.is_none_or(|timeout| time.is_before(timeout)) {
            self.set_timeout(time);
        }
    }

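    /// Clears any pending timeout.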
    pub fn cancel_timeout(&mut self) {
        if self.waiting && self.timeout.is_some() {
            self.state.write().cancel_timeout(self.index);
        }
        self.timeout = None;
    }

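    /// Polls the current timeout, returning the current VM time once the
    /// timeout has passed. Returns `Poll::Pending` (registering the waker) if
    /// no timeout is set or it has not yet been reached.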
    pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
        let now = Instant::now();
        match self
            .state
            .write()
            .poll_timeout(cx, self.index, now, self.timeout)
        {
            Poll::Ready(now) => {
                self.waiting = false;
                self.timeout = None;
                Poll::Ready(now.vmtime)
            }
            Poll::Pending => {
                self.waiting = true;
                Poll::Pending
            }
        }
    }
}

#[derive(Debug, Inspect)]
#[inspect(tag = "state")]
enum VmTimerPeriodicInner {
    Stopped,
    Running {
        last_timeout: VmTime,
        #[inspect(debug)]
        period: Duration,
    },
}

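/// A periodic timer driven by VM time.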
#[derive(Inspect)]
pub struct VmTimerPeriodic {
    vmtime: VmTimeAccess,
    inner: VmTimerPeriodicInner,
}

impl VmTimerPeriodic {
    pub fn new(vmtime_access: VmTimeAccess) -> Self {
        Self {
            vmtime: vmtime_access,
            inner: VmTimerPeriodicInner::Stopped,
        }
    }

    pub fn cancel(&mut self) {
        self.vmtime.cancel_timeout();
        self.inner = VmTimerPeriodicInner::Stopped;
    }

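    /// Starts the timer, cancelling any previous period. The first expiration
    /// is one `period` from now, with subsequent expirations each `period`
    /// after the previous timeout.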
    pub fn start(&mut self, period: Duration) {
        self.cancel();

        let time = self.vmtime.now().wrapping_add(period);
        self.vmtime.set_timeout(time);
        self.inner = VmTimerPeriodicInner::Running {
            last_timeout: time,
            period,
        }
    }

    pub fn is_running(&self) -> bool {
        matches!(self.inner, VmTimerPeriodicInner::Running { .. })
    }

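    /// Polls the timer, returning the VM time at which it fired and rearming
    /// it for the next period. Missed periods are coalesced into a single
    /// expiration.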
    pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
        match self.inner {
            VmTimerPeriodicInner::Stopped => {
                assert_eq!(self.vmtime.get_timeout(), None);
                self.vmtime.poll_timeout(cx)
            }
            VmTimerPeriodicInner::Running {
                ref mut last_timeout,
                period,
            } => {
                let mut res = Poll::Pending;
                while let Poll::Ready(now) = self.vmtime.poll_timeout(cx) {
                    res = Poll::Ready(now);

                    let time = last_timeout.wrapping_add(period);
                    self.vmtime.set_timeout(time);
                    *last_timeout = time;
                }
                res
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::VmTime;
    use super::VmTimeKeeper;
    use futures::FutureExt;
    use pal_async::DefaultDriver;
    use pal_async::async_test;
    use pal_async::timer::PolledTimer;
    use std::future::poll_fn;
    use std::time::Duration;

    #[async_test]
    async fn test_vmtime(driver: DefaultDriver) {
        let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
        let mut access = keeper
            .builder()
            .build(&driver)
            .await
            .unwrap()
            .access("test");
        keeper.start().await;

        access.set_timeout(access.now().wrapping_add(Duration::from_secs(1000)));
        let mut timer = PolledTimer::new(&driver);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(50)).fuse() => {}
            _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
        }

        let deadline = access.now().wrapping_add(Duration::from_millis(10));
        access.set_timeout(deadline);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
            now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
                assert!(now.is_after(deadline));
            }
        }
        assert!(
            poll_fn(|cx| access.poll_timeout(cx))
                .now_or_never()
                .is_none()
        );

        let now = access.now();
        let deadline = now.wrapping_add(Duration::from_millis(2000));
        access.set_timeout(deadline);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(30)).fuse() => {
                let deadline = now.wrapping_add(Duration::from_millis(50));
                access.set_timeout(deadline);
                futures::select! {
                    _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
                    now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
                        assert!(now.is_after(deadline));
                    }
                }
            }
            _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
        }
        keeper.stop().await;
    }

    #[async_test]
    async fn test_multi_vmtime(driver: DefaultDriver) {
        let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
        let src1 = keeper.builder().build(&driver).await.unwrap();
        keeper.start().await;
        let src2 = src1.builder().build(&driver).await.unwrap();
        let acc1 = src1.access("test");
        let acc2 = src2.access("test");
        {
            let t1 = acc1.now();
            let t2 = acc2.now();
            let t3 = acc1.now();
            assert!(!t2.is_before(t1), "{t1:?} {t2:?}");
            assert!(!t3.is_before(t2), "{t2:?} {t3:?}");
        }
        let now = acc1.now();
        keeper.stop().await;
        let t1 = acc1.now();
        let t2 = acc2.now();
        assert!(!t1.is_before(now));
        assert_eq!(t1, t2);
        let zero = VmTime::from_100ns(0);
        assert_ne!(t1, zero);
        keeper.reset().await;
        assert_eq!(acc1.now(), zero);
        assert_eq!(acc2.now(), zero);
    }
}