1#![warn(missing_docs)]
25
26use futures::StreamExt;
27use futures::future::join_all;
28use futures_concurrency::future::Race;
29use futures_concurrency::stream::Merge;
30use inspect::Inspect;
31use inspect::InspectMut;
32use inspect::adhoc;
33use mesh::MeshPayload;
34use mesh::payload::Protobuf;
35use mesh::rpc::Rpc;
36use mesh::rpc::RpcSend;
37use pal_async::driver::Driver;
38use pal_async::driver::PollImpl;
39use pal_async::driver::SpawnDriver;
40use pal_async::task::Task;
41use pal_async::timer::Instant;
42use pal_async::timer::PollTimer;
43use parking_lot::RwLock;
44use save_restore_derive::SavedStateRoot;
45use slab::Slab;
46use std::future::poll_fn;
47use std::sync::Arc;
48use std::task::Context;
49use std::task::Poll;
50use std::task::Waker;
51use std::time::Duration;
52use thiserror::Error;
53
/// A point in VM time, stored as a count of 100ns units.
///
/// Ordering helpers such as [`VmTime::is_before`] compare values relative to
/// each other in the wrapping 64-bit space rather than as absolute numbers.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Protobuf, Inspect)]
#[inspect(transparent)]
#[mesh(transparent)]
pub struct VmTime(#[inspect(hex)] u64);
59
60impl VmTime {
61 pub fn from_100ns(n: u64) -> Self {
63 Self(n)
64 }
65
66 pub const fn as_100ns(&self) -> u64 {
68 self.0
69 }
70
71 pub fn wrapping_add(self, d: Duration) -> Self {
73 Self((self.0 as u128).wrapping_add(d.as_nanos() / 100) as u64)
74 }
75
76 pub fn is_before(self, t: Self) -> bool {
82 let delta = self.0.wrapping_sub(t.0);
83 (delta as i64) < 0
84 }
85
86 pub fn is_after(self, t: Self) -> bool {
90 let delta = self.0.wrapping_sub(t.0);
91 (delta as i64) > 0
92 }
93
94 pub fn checked_sub(self, t: Self) -> Option<Duration> {
97 let delta = self.0.wrapping_sub(t.0);
98 if (delta as i64) >= 0 {
99 Some(duration_from_100ns(delta))
100 } else {
101 None
102 }
103 }
104
105 pub fn min(self, t: Self) -> Self {
107 if self.is_before(t) { self } else { t }
108 }
109
110 pub fn max(self, t: Self) -> Self {
112 if self.is_before(t) { t } else { self }
113 }
114}
115
/// Converts a count of 100ns units into a [`Duration`].
fn duration_from_100ns(n: u64) -> Duration {
    const UNITS_PER_SEC: u64 = 10 * 1000 * 1000;
    const NANOS_PER_UNIT: u32 = 100;

    let whole_secs = n / UNITS_PER_SEC;
    // The remainder is below 10^7, so scaling it to nanoseconds stays below
    // 10^9 and fits in a `u32`.
    let leftover_units = (n % UNITS_PER_SEC) as u32;
    Duration::new(whole_secs, leftover_units * NANOS_PER_UNIT)
}
120
/// Timer bookkeeping for a single time source: the current run state, the
/// backing timer, and the set of registered waiters.
#[derive(Inspect)]
struct TimerState {
    /// Whether VM time is stopped (and at what time) or started (and when).
    time: TimeState,
    /// The timer that is armed for the deadline corresponding to `next`.
    #[inspect(skip)]
    timer: PollImpl<dyn PollTimer>,
    /// Registered waiters; inspected keyed by waiter name.
    #[inspect(with = "|x| inspect::iter_by_key(x.iter().map(|(_, w)| (&w.name, w)))")]
    waiters: Slab<WaiterState>,
    /// The VM time the timer is currently armed for, if any.
    next: Option<VmTime>,
    /// The VM time recorded by the most recent wake, reset, or construction.
    last: VmTime,
}
131
132#[derive(Debug, Inspect)]
133struct WaiterState {
134 #[inspect(skip)] name: Arc<str>,
136 next: Option<VmTime>,
137 #[inspect(rename = "waiting", with = "Option::is_some")]
138 waker: Option<Waker>,
139}
140
141impl WaiterState {
142 fn new(name: Arc<str>) -> Self {
143 Self {
144 name,
145 next: None,
146 waker: None,
147 }
148 }
149}
150
151#[derive(Copy, Clone, Debug, Protobuf)]
152struct Timestamp {
153 vmtime: VmTime,
154 os_time: u64, }
156
157impl Timestamp {
158 fn new(vmtime: VmTime, os_time: Instant) -> Self {
159 Self {
160 vmtime,
161 os_time: os_time.as_nanos(),
162 }
163 }
164
165 fn os_time(&self) -> Instant {
166 Instant::from_nanos(self.os_time)
167 }
168}
169
impl TimerState {
    /// Creates a stopped timer state at VM time `uptime`, with a new timer
    /// from `driver` and no waiters.
    fn new(driver: &impl Driver, uptime: VmTime) -> Self {
        Self {
            time: TimeState::Stopped(uptime),
            timer: driver.new_dyn_timer(),
            waiters: Slab::new(),
            next: None,
            last: uptime,
        }
    }

    /// Starts VM time at `now`.
    ///
    /// # Panics
    ///
    /// Panics if time is already started, or if `now.vmtime` differs from the
    /// time at which this state was stopped.
    fn start(&mut self, now: Timestamp) {
        let vmtime = self.time.stop_time().expect("should be stopped");
        assert_eq!(now.vmtime, vmtime);
        self.time = TimeState::Started(now);
        tracing::trace!(?now, "vmtime start");
        // Wake anything already due and arm the timer for the earliest
        // remaining waiter.
        self.wake(now);
    }

    /// Stops VM time at the VM time corresponding to `now_os` and returns that
    /// VM time.
    ///
    /// # Panics
    ///
    /// Panics if time is not started.
    fn stop(&mut self, now_os: Instant) -> VmTime {
        assert!(self.time.is_started());
        let now = self.now(now_os);
        self.time = TimeState::Stopped(now.vmtime);
        tracing::debug!(?now, "vmtime stop");
        now.vmtime
    }

    /// Resets the (stopped) VM time to `time` and clears the armed deadline.
    ///
    /// # Panics
    ///
    /// Panics if time is started.
    fn reset(&mut self, time: VmTime) {
        assert!(!self.time.is_started());
        self.time = TimeState::Stopped(time);
        self.last = time;
        self.next = None;
        // Wake every registered waker so the waiters re-evaluate against the
        // new time.
        for (_, waiter) in &mut self.waiters {
            if let Some(waker) = waiter.waker.take() {
                waker.wake();
            }
        }
    }

    /// Returns the timestamp corresponding to VM time `time`, or `None` if
    /// time is not started.
    ///
    /// If `time` is before the start time, the OS time of the result is the
    /// OS start time.
    fn timestamp(&self, time: VmTime) -> Option<Timestamp> {
        let start_time = self.time.start_time()?;
        let since = time
            .checked_sub(start_time.vmtime)
            .unwrap_or(Duration::ZERO);
        Some(Timestamp::new(time, start_time.os_time() + since))
    }

    /// Returns the current timestamp given the current OS time `now_os`.
    fn now(&self, now_os: Instant) -> Timestamp {
        self.time.now(now_os)
    }

    /// Arms the timer for `next` if time is started and `next` is earlier than
    /// the currently armed deadline (or nothing is armed).
    fn set_next(&mut self, next: VmTime) {
        if !self.time.is_started() {
            return;
        }
        if self
            .next
            .is_none_or(|current_next| next.is_before(current_next))
        {
            // Time is started (checked above), so `timestamp` returns `Some`.
            let deadline = self.timestamp(next).unwrap().os_time();
            tracing::trace!(?deadline, "updating deadline");
            self.timer.set_deadline(deadline);
            self.next = Some(next);
        }
    }

    /// Records `now` as the last wake time, wakes every waiter whose timeout
    /// is not after `now`, and re-arms the timer for the earliest remaining
    /// timeout.
    ///
    /// # Panics
    ///
    /// Panics if `now` is before the previously recorded wake time.
    fn wake(&mut self, now: Timestamp) {
        assert!(!now.vmtime.is_before(self.last));
        self.last = now.vmtime;
        let mut next = None;
        for (_, state) in &mut self.waiters {
            if let Some(this_next) = state.next {
                if this_next.is_after(now.vmtime) {
                    // Still in the future: track the earliest such timeout.
                    if next.is_none_or(|next| this_next.is_before(next)) {
                        next = Some(this_next);
                    }
                } else if let Some(waker) = state.waker.take() {
                    waker.wake();
                }
            }
        }
        if let Some(next) = next {
            self.set_next(next);
        }
    }

    /// Clears the timeout of the waiter at `index`. The armed timer deadline
    /// is left unchanged.
    fn cancel_timeout(&mut self, index: usize) {
        self.waiters[index].next = None;
    }

    /// Sets the timeout of the waiter at `index` to `time`, waking the waiter
    /// or re-arming the timer as needed.
    fn update_timeout(&mut self, index: usize, time: VmTime) {
        let state = &mut self.waiters[index];
        tracing::trace!(vmtime = ?time, user = state.name.as_ref(), "timeout update");
        state.next = Some(time);
        if time.is_before(self.last) {
            // The timeout is before the last wake time: wake the waiter now
            // and leave the timer alone.
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
            return;
        }

        if self.next.is_some_and(|next| next.is_before(time)) {
            // The timer is already armed for an earlier deadline.
            return;
        }
        self.set_next(time);
    }

    /// Polls the timeout `next` for the waiter at `index`.
    ///
    /// Returns `Poll::Ready` with the current timestamp if `next` is strictly
    /// before the current VM time, clearing the waiter's timeout and waker.
    /// Otherwise records `next` and the waker from `cx`, arms the timer if a
    /// timeout is set, and returns `Poll::Pending`.
    fn poll_timeout(
        &mut self,
        cx: &mut Context<'_>,
        index: usize,
        now_os: Instant,
        next: Option<VmTime>,
    ) -> Poll<Timestamp> {
        let now = self.now(now_os);
        let state = &mut self.waiters[index];
        if next.is_some_and(|next| next.is_before(now.vmtime)) {
            state.waker = None;
            state.next = None;
            return Poll::Ready(now);
        }
        state.next = next;
        state.waker = Some(cx.waker().clone());
        if let Some(next) = next {
            self.set_next(next);
        }
        Poll::Pending
    }

    /// Polls the underlying timer while time is started, waking due waiters
    /// each time the timer fires, until the timer is pending.
    fn poll(&mut self, cx: &mut Context<'_>) {
        while self.time.is_started() {
            let next = match self.next {
                Some(_) => {
                    tracing::trace!("polling existing deadline");
                    // The deadline was already set via `set_deadline`.
                    None
                }
                None => {
                    // Nothing is armed, so poll with a deadline one day
                    // (86400 seconds) in the future.
                    let deadline = Instant::now() + Duration::from_secs(86400);
                    tracing::trace!(?deadline, "polling with long timeout");
                    Some(deadline)
                }
            };
            if let Poll::Ready(now) = self.timer.poll_timer(cx, next) {
                self.next = None;
                self.wake(self.now(now));
            } else {
                return;
            }
        }
    }
}
338
/// Controls VM time: starting, stopping, resetting, and saving/restoring it.
///
/// Requests are forwarded over a channel to a keeper task spawned in
/// [`VmTimeKeeper::new`].
#[derive(Debug)]
pub struct VmTimeKeeper {
    /// The spawned keeper task; held so it lives as long as this object.
    _task: Task<()>,
    /// Channel used to send requests to the keeper task.
    req_send: mesh::Sender<KeeperRequest>,
    /// Builder handed out by [`VmTimeKeeper::builder`].
    builder: VmTimeSourceBuilder,
    /// Local copy of the current time state.
    time: TimeState,
}
347
348#[derive(Protobuf, SavedStateRoot)]
350#[mesh(package = "vmtime")]
351pub struct SavedState {
352 #[mesh(1)]
353 vmtime: VmTime,
354}
355
356impl SavedState {
357 pub fn from_vmtime(vmtime: VmTime) -> Self {
359 SavedState { vmtime }
360 }
361}
362
/// Whether VM time is currently stopped or started.
#[derive(Debug, MeshPayload, Copy, Clone)]
enum TimeState {
    /// Time is stopped at the given VM time.
    Stopped(VmTime),
    /// Time is running; the timestamp pairs the VM time and OS time at which
    /// it was started.
    Started(Timestamp),
}
368
369impl Inspect for TimeState {
370 fn inspect(&self, req: inspect::Request<'_>) {
371 let mut resp = req.respond();
372 let state = match *self {
373 TimeState::Stopped(_time) => "stopped",
374 TimeState::Started(time) => {
375 resp.field("start_time", time.vmtime);
376 "started"
377 }
378 };
379 resp.field("state", state)
380 .field("now", self.now(Instant::now()).vmtime);
381 }
382}
383
384impl TimeState {
385 fn is_started(&self) -> bool {
386 self.start_time().is_some()
387 }
388
389 fn stop_time(&self) -> Option<VmTime> {
390 match *self {
391 TimeState::Stopped(time) => Some(time),
392 TimeState::Started(_) => None,
393 }
394 }
395
396 fn start_time(&self) -> Option<Timestamp> {
397 match *self {
398 TimeState::Stopped(_) => None,
399 TimeState::Started(time) => Some(time),
400 }
401 }
402
403 fn now(&self, now_os: Instant) -> Timestamp {
404 match *self {
405 TimeState::Stopped(time) => Timestamp::new(time, now_os),
406 TimeState::Started(start_time) => {
407 if now_os >= start_time.os_time() {
408 Timestamp::new(
409 start_time
410 .vmtime
411 .wrapping_add(now_os - start_time.os_time()),
412 now_os,
413 )
414 } else {
415 let delta = start_time.os_time() - now_os;
423 if delta > Duration::from_secs(1) {
424 tracing::error!(
425 now = now_os.as_nanos(),
426 start_host = start_time.os_time().as_nanos(),
427 ?delta,
428 "time went backward"
429 );
430 }
431 start_time
432 }
433 }
434 }
435 }
436}
437
438impl InspectMut for VmTimeKeeper {
439 fn inspect_mut(&mut self, req: inspect::Request<'_>) {
440 self.req_send.send(KeeperRequest::Inspect(req.defer()));
441 }
442}
443
444impl VmTimeKeeper {
445 pub fn new(driver: &impl SpawnDriver, uptime: VmTime) -> Self {
447 let (new_send, new_recv) = mesh::mpsc_channel();
448 let (req_send, req_recv) = mesh::channel();
449 let time = TimeState::Stopped(uptime);
450 let task = driver.spawn("vm-time-keeper", async move {
451 let mut primary = PrimaryKeeper {
452 req_recv,
453 new_recv,
454 keepers: Vec::new(),
455 next_id: 0,
456 time,
457 };
458 primary.run().await;
459 });
460 Self {
461 time,
462 req_send,
463 builder: VmTimeSourceBuilder { new_send },
464 _task: task,
465 }
466 }
467
468 pub fn save(&self) -> SavedState {
470 SavedState {
471 vmtime: self.time.stop_time().expect("should be stopped"),
472 }
473 }
474
475 pub async fn restore(&mut self, state: SavedState) {
477 let SavedState { vmtime } = state;
478 self.reset_to(vmtime).await
479 }
480
481 async fn reset_to(&mut self, vmtime: VmTime) {
482 assert!(!self.time.is_started(), "should be stopped");
483 self.time = TimeState::Stopped(vmtime);
484 self.req_send
485 .call(KeeperRequest::Reset, vmtime)
486 .await
487 .unwrap();
488 }
489
490 pub async fn reset(&mut self) {
492 self.reset_to(VmTime::from_100ns(0)).await
493 }
494
495 pub async fn start(&mut self) {
497 let vmtime = self.time.stop_time().expect("should be stopped");
498 let timestamp = Timestamp::new(vmtime, Instant::now());
499 self.time = TimeState::Started(timestamp);
500 self.req_send
501 .call(KeeperRequest::Start, timestamp)
502 .await
503 .unwrap();
504 }
505
506 pub async fn stop(&mut self) {
508 assert!(self.time.is_started(), "should be running");
509 let stop_time = self.req_send.call(KeeperRequest::Stop, ()).await.unwrap();
510 self.time = TimeState::Stopped(stop_time);
511 }
512
513 pub fn builder(&self) -> &VmTimeSourceBuilder {
516 &self.builder
517 }
518}
519
/// A builder for [`VmTimeSource`] objects.
///
/// The builder is cloneable and is a mesh payload; each built source
/// registers itself with the keeper task over `new_send`.
#[derive(MeshPayload, Clone, Debug)]
pub struct VmTimeSourceBuilder {
    /// Channel used to register new keepers with the primary keeper task.
    new_send: mesh::Sender<NewKeeperRequest>,
}
533
/// Error returned by [`VmTimeSourceBuilder::build`] when the registration
/// call to the time keeper fails.
#[derive(Debug, Error)]
#[error("the time keeper has been torn down")]
pub struct TimeKeeperIsGone;
539
540impl VmTimeSourceBuilder {
541 pub async fn build(&self, driver: &impl SpawnDriver) -> Result<VmTimeSource, TimeKeeperIsGone> {
545 let (send, recv) = mesh::channel();
546 let time = self
547 .new_send
548 .call(NewKeeperRequest::New, send)
549 .await
550 .map_err(|_| TimeKeeperIsGone)?;
551
552 let mut state = Arc::new(RwLock::new(TimerState::new(driver, VmTime::from_100ns(0))));
553 {
555 let state = Arc::get_mut(&mut state).unwrap().get_mut();
556 match time {
557 TimeState::Stopped(vmtime) => state.reset(vmtime),
558 TimeState::Started(timestamp) => state.start(timestamp),
559 }
560 }
561 let mut keeper = SecondaryKeeper {
562 state: state.clone(),
563 recv,
564 };
565 driver
566 .spawn("vm-time", async move { keeper.run().await })
567 .detach();
568 Ok(VmTimeSource {
569 state,
570 remote: self.clone(),
571 })
572 }
573}
574
/// The task state behind [`VmTimeKeeper`]: it holds the authoritative time
/// state and fans requests out to the registered keepers.
#[derive(Inspect)]
#[inspect(extra = "Self::inspect_extra")]
struct PrimaryKeeper {
    /// Requests from the `VmTimeKeeper`.
    #[inspect(skip)]
    req_recv: mesh::Receiver<KeeperRequest>,
    /// Registrations of new keepers from builders.
    #[inspect(skip)]
    new_recv: mesh::Receiver<NewKeeperRequest>,
    /// The registered keepers, each with its assigned id.
    #[inspect(skip)]
    keepers: Vec<(u64, mesh::Sender<KeeperRequest>)>,
    /// The id to assign to the next registered keeper.
    #[inspect(skip)]
    next_id: u64,
    /// The current time state.
    time: TimeState,
}
592
/// A request sent to a keeper (primary or secondary).
#[derive(MeshPayload)]
enum KeeperRequest {
    /// Start time at the given timestamp.
    Start(Rpc<Timestamp, ()>),
    /// Stop time; the response is the VM time at which time stopped.
    Stop(Rpc<(), VmTime>),
    /// Reset the (stopped) time to the given VM time.
    Reset(Rpc<VmTime, ()>),
    /// Respond to a deferred inspection request.
    Inspect(inspect::Deferred),
}
600
/// A request to register a new keeper with the primary keeper.
#[derive(MeshPayload)]
enum NewKeeperRequest {
    /// Registers the given request sender; the response is the primary
    /// keeper's current time state.
    New(Rpc<mesh::Sender<KeeperRequest>, TimeState>),
}
605
impl PrimaryKeeper {
    /// Adds a `keepers` inspection node with one child per registered keeper,
    /// keyed by id; each child forwards the inspection request to that keeper.
    fn inspect_extra(&self, resp: &mut inspect::Response<'_>) {
        resp.fields(
            "keepers",
            self.keepers
                .iter()
                .map(|&(id, ref s)| (id, adhoc(|req| s.send(KeeperRequest::Inspect(req.defer()))))),
        );
    }

    /// Runs the primary keeper, handling keeper registrations and time
    /// requests until the merged request stream ends.
    async fn run(&mut self) {
        enum Event {
            New(NewKeeperRequest),
            Request(KeeperRequest),
        }

        while let Some(event) = (
            (&mut self.new_recv).map(Event::New),
            (&mut self.req_recv).map(Event::Request),
        )
            .merge()
            .next()
            .await
        {
            // Drop keepers whose channel has been closed.
            self.keepers.retain(|(_, s)| !s.is_closed());
            match event {
                Event::New(req) => match req {
                    // Register the keeper and reply with the current time
                    // state so it can synchronize.
                    NewKeeperRequest::New(rpc) => rpc.handle_sync(|sender| {
                        self.keepers.push((self.next_id, sender));
                        self.next_id += 1;
                        self.time
                    }),
                },
                Event::Request(req) => {
                    match req {
                        KeeperRequest::Start(rpc) => {
                            rpc.handle(async |start_time| {
                                assert!(!self.time.is_started());
                                self.time = TimeState::Started(start_time);
                                // Start every keeper and wait for all of them.
                                // Individual call results are ignored.
                                join_all(self.keepers.iter().map(|(_, sender)| {
                                    sender.call(KeeperRequest::Start, start_time)
                                }))
                                .await;
                            })
                            .await
                        }
                        KeeperRequest::Stop(rpc) => {
                            rpc.handle(async |()| {
                                // Stop every keeper and collect the times at
                                // which they stopped.
                                let results = join_all(
                                    self.keepers
                                        .iter()
                                        .map(|(_, sender)| sender.call(KeeperRequest::Stop, ())),
                                )
                                .await;

                                let start_time = self.time.start_time().expect("should be running");
                                let now = start_time
                                    .vmtime
                                    .wrapping_add(Instant::now() - start_time.os_time());

                                // The stop time is the latest of this keeper's
                                // own current time and every successfully
                                // reported stop time.
                                let stop_time = results
                                    .into_iter()
                                    .filter_map(|r| r.ok())
                                    .fold(now, |a, b| a.max(b));

                                self.time = TimeState::Stopped(stop_time);

                                // Reset every keeper to the common stop time
                                // so they all agree on it.
                                join_all(self.keepers.iter().map(|(_, sender)| {
                                    sender.call(KeeperRequest::Reset, stop_time)
                                }))
                                .await;

                                stop_time
                            })
                            .await
                        }
                        KeeperRequest::Reset(rpc) => {
                            rpc.handle(async |time| {
                                assert!(!self.time.is_started(), "should not be running");
                                self.time = TimeState::Stopped(time);
                                // Reset every keeper and wait for all of them.
                                join_all(
                                    self.keepers
                                        .iter()
                                        .map(|(_, sender)| sender.call(KeeperRequest::Reset, time)),
                                )
                                .await;
                            })
                            .await
                        }
                        KeeperRequest::Inspect(deferred) => deferred.inspect(&self),
                    }
                }
            }
        }
    }
}
708
/// The task state behind each [`VmTimeSource`]: it drives the source's timer
/// state and applies requests received from the primary keeper.
#[derive(InspectMut)]
struct SecondaryKeeper {
    /// The timer state shared with the time source and its accessors.
    #[inspect(flatten)]
    state: Arc<RwLock<TimerState>>,
    /// Requests from the primary keeper.
    #[inspect(skip)]
    recv: mesh::Receiver<KeeperRequest>,
}
720
721impl SecondaryKeeper {
722 async fn run(&mut self) {
723 loop {
724 let r = {
725 let state = &self.state;
726 (
727 self.recv.next(),
728 poll_fn(|cx| {
729 state.write().poll(cx);
730 Poll::Pending
731 }),
732 )
733 .race()
734 .await
735 };
736 match r {
737 Some(req) => match req {
738 KeeperRequest::Start(rpc) => rpc.handle_sync(|start_time| {
739 let mut state = self.state.write();
740 state.start(start_time);
741 }),
742 KeeperRequest::Reset(rpc) => rpc.handle_sync(|vmtime| {
743 let mut state = self.state.write();
744 state.reset(vmtime);
745 }),
746 KeeperRequest::Stop(rpc) => rpc.handle_sync(|()| {
747 let mut state = self.state.write();
748 state.stop(Instant::now())
749 }),
750 KeeperRequest::Inspect(deferred) => deferred.inspect(&mut *self),
751 },
752 None => break,
753 }
754 }
755 }
756}
757
/// A time source for querying VM time and creating [`VmTimeAccess`] objects.
#[derive(Clone)]
pub struct VmTimeSource {
    /// The timer state shared with this source's keeper task.
    state: Arc<RwLock<TimerState>>,
    /// The builder this source was created from.
    remote: VmTimeSourceBuilder,
}
764
765impl VmTimeSource {
766 pub fn access(&self, name: impl Into<Arc<str>>) -> VmTimeAccess {
770 let name = name.into();
771 VmTimeAccess {
772 timeout: None,
773 waiting: false,
774 index: self
775 .state
776 .write()
777 .waiters
778 .insert(WaiterState::new(name.clone())),
779 state: self.state.clone(),
780 name,
781 }
782 }
783
784 pub fn builder(&self) -> &VmTimeSourceBuilder {
787 &self.remote
788 }
789}
790
/// An accessor for reading VM time and waiting on a single timeout.
#[derive(Inspect)]
pub struct VmTimeAccess {
    /// The currently set timeout, if any.
    timeout: Option<VmTime>,
    /// Whether the last poll returned pending, i.e. a waker is registered
    /// with the shared timer state.
    waiting: bool,
    /// This accessor's slot in the shared timer state's waiter list.
    #[inspect(skip)]
    index: usize,
    /// The shared timer state.
    #[inspect(skip)]
    state: Arc<RwLock<TimerState>>,
    /// The name this accessor was registered under.
    name: Arc<str>,
}
802
803impl Drop for VmTimeAccess {
804 fn drop(&mut self) {
805 self.state.write().waiters.remove(self.index);
806 }
807}
808
809impl VmTimeAccess {
810 pub fn now(&self) -> VmTime {
812 let now = Instant::now();
813 self.state.read().now(now).vmtime
814 }
815
816 pub fn host_time(&self, time: VmTime) -> Option<Instant> {
823 Some(self.state.read().timestamp(time)?.os_time())
824 }
825
826 pub fn get_timeout(&self) -> Option<VmTime> {
828 self.timeout
829 }
830
831 pub fn set_timeout(&mut self, time: VmTime) {
833 self.timeout = Some(time);
834 if self.waiting {
835 self.state.write().update_timeout(self.index, time);
836 }
837 }
838
839 pub fn set_timeout_if_before(&mut self, time: VmTime) {
842 if self.timeout.is_none_or(|timeout| time.is_before(timeout)) {
843 self.set_timeout(time);
844 }
845 }
846
847 pub fn cancel_timeout(&mut self) {
849 if self.waiting && self.timeout.is_some() {
850 self.state.write().cancel_timeout(self.index);
851 }
852 self.timeout = None;
853 }
854
855 pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
867 let now = Instant::now();
868 match self
869 .state
870 .write()
871 .poll_timeout(cx, self.index, now, self.timeout)
872 {
873 Poll::Ready(now) => {
874 self.waiting = false;
875 self.timeout = None;
876 Poll::Ready(now.vmtime)
877 }
878 Poll::Pending => {
879 self.waiting = true;
880 Poll::Pending
881 }
882 }
883 }
884}
885
/// The run state of a [`VmTimerPeriodic`].
#[derive(Debug, Inspect)]
#[inspect(tag = "state")]
enum VmTimerPeriodicInner {
    /// The timer is not running.
    Stopped,
    /// The timer is running.
    Running {
        /// The most recently scheduled timeout.
        last_timeout: VmTime,
        /// The interval between timeouts.
        #[inspect(debug)]
        period: Duration,
    },
}
896
/// A periodic timer built on top of a [`VmTimeAccess`].
#[derive(Inspect)]
pub struct VmTimerPeriodic {
    /// The accessor used to read VM time and wait for timeouts.
    vmtime: VmTimeAccess,
    /// Whether the timer is running, and its schedule if so.
    inner: VmTimerPeriodicInner,
}
904
905impl VmTimerPeriodic {
906 pub fn new(vmtime_access: VmTimeAccess) -> Self {
908 Self {
909 vmtime: vmtime_access,
910 inner: VmTimerPeriodicInner::Stopped,
911 }
912 }
913
914 pub fn cancel(&mut self) {
918 self.vmtime.cancel_timeout();
919 self.inner = VmTimerPeriodicInner::Stopped;
920 }
921
922 pub fn start(&mut self, period: Duration) {
927 self.cancel();
928
929 let time = self.vmtime.now().wrapping_add(period);
930 self.vmtime.set_timeout(time);
931 self.inner = VmTimerPeriodicInner::Running {
932 last_timeout: time,
933 period,
934 }
935 }
936
937 pub fn is_running(&self) -> bool {
939 matches!(self.inner, VmTimerPeriodicInner::Running { .. })
940 }
941
942 pub fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Poll<VmTime> {
947 match self.inner {
948 VmTimerPeriodicInner::Stopped => {
949 assert_eq!(self.vmtime.get_timeout(), None);
950 self.vmtime.poll_timeout(cx)
953 }
954 VmTimerPeriodicInner::Running {
955 ref mut last_timeout,
956 period,
957 } => {
958 let mut res = Poll::Pending;
959 while let Poll::Ready(now) = self.vmtime.poll_timeout(cx) {
960 res = Poll::Ready(now);
961
962 let time = last_timeout.wrapping_add(period);
963 self.vmtime.set_timeout(time);
964 *last_timeout = time;
965 }
966 res
967 }
968 }
969 }
970}
971
#[cfg(test)]
mod tests {
    use super::VmTime;
    use super::VmTimeKeeper;
    use futures::FutureExt;
    use pal_async::DefaultDriver;
    use pal_async::async_test;
    use pal_async::timer::PolledTimer;
    use std::future::poll_fn;
    use std::time::Duration;

    /// Exercises timeouts on a single accessor: a far-future timeout that
    /// must not fire, a near timeout that must fire, and a timeout that is
    /// shortened while it is being waited on.
    #[async_test]
    async fn test_vmtime(driver: DefaultDriver) {
        let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
        let mut access = keeper
            .builder()
            .build(&driver)
            .await
            .unwrap()
            .access("test");
        keeper.start().await;

        // A timeout far in the future must not complete within 50ms.
        access.set_timeout(access.now().wrapping_add(Duration::from_secs(1000)));
        let mut timer = PolledTimer::new(&driver);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(50)).fuse() => {}
            _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
        }

        // A 10ms timeout must complete well before the 1s guard timer.
        let deadline = access.now().wrapping_add(Duration::from_millis(10));
        access.set_timeout(deadline);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
            now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
                assert!(now.is_after(deadline));
            }
        }
        // Completion cleared the timeout, so polling again must be pending.
        assert!(
            poll_fn(|cx| access.poll_timeout(cx))
                .now_or_never()
                .is_none()
        );

        // Shorten a long timeout while it is pending; the shorter one must
        // fire.
        let now = access.now();
        let deadline = now.wrapping_add(Duration::from_millis(2000));
        access.set_timeout(deadline);
        futures::select! {
            _ = timer.sleep(Duration::from_millis(30)).fuse() => {
                let deadline = now.wrapping_add(Duration::from_millis(50));
                access.set_timeout(deadline);
                futures::select! {
                    _ = timer.sleep(Duration::from_millis(1000)).fuse() => panic!("unexpected timeout"),
                    now = poll_fn(|cx| access.poll_timeout(cx)).fuse() => {
                        assert!(now.is_after(deadline));
                    }
                }
            }
            _ = poll_fn(|cx| access.poll_timeout(cx)).fuse() => panic!("unexpected wait completion"),
        }
        keeper.stop().await;
    }

    /// Checks that two sources on the same keeper agree on time while
    /// running, after stopping, and after a reset.
    #[async_test]
    async fn test_multi_vmtime(driver: DefaultDriver) {
        let mut keeper = VmTimeKeeper::new(&driver, VmTime::from_100ns(0));
        let src1 = keeper.builder().build(&driver).await.unwrap();
        keeper.start().await;
        // The second source is built while time is already running.
        let src2 = src1.builder().build(&driver).await.unwrap();
        let acc1 = src1.access("test");
        let acc2 = src2.access("test");
        {
            // Interleaved reads from the two sources must not go backward.
            let t1 = acc1.now();
            let t2 = acc2.now();
            let t3 = acc1.now();
            assert!(!t2.is_before(t1), "{t1:?} {t2:?}");
            assert!(!t3.is_before(t2), "{t2:?} {t3:?}");
        }
        let now = acc1.now();
        keeper.stop().await;
        // After stopping, both sources report the same time, which is not
        // before the last time read while running.
        let t1 = acc1.now();
        let t2 = acc2.now();
        assert!(!t1.is_before(now));
        assert_eq!(t1, t2);
        let zero = VmTime::from_100ns(0);
        assert_ne!(t1, zero);
        // A reset brings both sources back to zero.
        keeper.reset().await;
        assert_eq!(acc1.now(), zero);
        assert_eq!(acc2.now(), zero);
    }
}