pal_async/
timer.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Timer-related functionality.
5
6use crate::driver::Driver;
7use crate::driver::PollImpl;
8use crate::sparsevec::SparseVec;
9use crate::waker::WakerList;
10use std::future::Future;
11use std::pin::Pin;
12use std::task::Context;
13use std::task::Poll;
14use std::task::Waker;
15use std::time::Duration;
16
17/// Instant represents a number of nanoseconds since some process-specific
18/// epoch.
19///
20/// On Windows this is backed by QueryUnbiasedInterruptTimePrecise. On
21/// Linux this is backed by CLOCK_MONOTONIC.
22///
23/// This is modeled after std::time::Instant but uses a different clock source
24/// on Windows, and it allows access to the raw value.
25#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
26pub struct Instant(u64);
27
28impl Instant {
29    /// The current time as measured by this clock.
30    pub fn now() -> Self {
31        Self(crate::sys::monotonic_nanos_now())
32    }
33
34    /// Returns the value of the underlying clock in nanosecond resolution.
35    pub fn as_nanos(&self) -> u64 {
36        self.0
37    }
38
39    /// Creates an instant from an underlying clock value in nanosecond resolution.
40    pub fn from_nanos(nanos: u64) -> Self {
41        Self(nanos)
42    }
43
44    /// Adds a duration to this instant, saturating at the maximum value of the
45    /// clock.
46    pub fn saturating_add(self, duration: Duration) -> Self {
47        Self(
48            self.0
49                .saturating_add(duration.as_nanos().try_into().unwrap_or(u64::MAX)),
50        )
51    }
52
53    /// Calculate the duration between this instant and another instant,
54    /// saturating at zero if the other instant is later than this one.
55    pub fn saturating_sub(self, rhs: Instant) -> Duration {
56        Duration::from_nanos(self.0.saturating_sub(rhs.0))
57    }
58}
59
60impl std::ops::Sub for Instant {
61    type Output = Duration;
62    fn sub(self, rhs: Instant) -> Self::Output {
63        Duration::from_nanos(
64            self.0.checked_sub(rhs.0).unwrap_or_else(|| {
65                panic!("supplied instant {:#x} is later than {:#x}", rhs.0, self.0)
66            }),
67        )
68    }
69}
70
71impl std::ops::Add<Duration> for Instant {
72    type Output = Instant;
73    fn add(self, rhs: Duration) -> Self::Output {
74        Self(
75            self.0
76                .checked_add(rhs.as_nanos().try_into().expect("duration too large"))
77                .expect("supplied duration causes overflow"),
78        )
79    }
80}
81
82impl std::ops::Sub<Duration> for Instant {
83    type Output = Instant;
84    fn sub(self, rhs: Duration) -> Self::Output {
85        Self(
86            self.0
87                .checked_sub(rhs.as_nanos().try_into().expect("duration too large"))
88                .expect("supplied instant is later than self"),
89        )
90    }
91}
92
93/// A trait for driving timers.
94pub trait TimerDriver: Unpin {
95    /// The timer type.
96    type Timer: 'static + PollTimer;
97
98    /// Returns a new timer.
99    fn new_timer(&self) -> Self::Timer;
100}
101
102/// A trait for polling timers.
103pub trait PollTimer: Unpin + Send + Sync {
104    /// Polls the timer, optionally updating the deadline first.
105    ///
106    /// Returns ready with the current time when the set deadline <=
107    /// `Instant::now()`.
108    fn poll_timer(&mut self, cx: &mut Context<'_>, deadline: Option<Instant>) -> Poll<Instant>;
109
110    /// Updates the timer's deadline.
111    fn set_deadline(&mut self, deadline: Instant);
112}
113
114/// An asynchronous timer.
115pub struct PolledTimer(PollImpl<dyn PollTimer>);
116
117impl PolledTimer {
118    /// Creates a new timer.
119    pub fn new(driver: &(impl ?Sized + Driver)) -> Self {
120        Self(driver.new_dyn_timer())
121    }
122
123    /// Delays the current task for `duration`.
124    pub fn sleep(&mut self, duration: Duration) -> Sleep<'_> {
125        self.sleep_until(Instant::now() + duration)
126    }
127
128    /// Delays the current task until `deadline`.
129    pub fn sleep_until(&mut self, deadline: Instant) -> Sleep<'_> {
130        self.0.set_deadline(deadline);
131        Sleep {
132            timer: self,
133            deadline,
134        }
135    }
136
137    /// Returns `Pending` until the current time is later than `deadline`. Then
138    /// returns `Ready` with the current time.
139    pub fn poll_until(&mut self, cx: &mut Context<'_>, deadline: Instant) -> Poll<Instant> {
140        self.0.poll_timer(cx, Some(deadline))
141    }
142}
143
144/// [`Future`] implementation for [`PolledTimer::sleep`] and
145/// [`PolledTimer::sleep_until`].
146#[must_use]
147pub struct Sleep<'a> {
148    timer: &'a mut PolledTimer,
149    deadline: Instant,
150}
151
152impl Future for Sleep<'_> {
153    type Output = Instant;
154
155    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156        let deadline = self.deadline;
157        self.timer.0.poll_timer(cx, Some(deadline))
158    }
159}
160
161#[derive(Debug)]
162struct TimerEntry {
163    deadline: Instant,
164    waker: Option<Waker>,
165}
166
167/// A queue of timers.
168///
169/// Used to multiplex multiple timers on a single timeout operation.
170#[derive(Debug, Default)]
171pub(crate) struct TimerQueue {
172    timers: SparseVec<TimerEntry>,
173}
174
/// Identifies a single timer within a [`TimerQueue`].
///
/// Wraps the index returned when the timer was added to the queue.
#[derive(Clone, Copy, Debug)]
pub(crate) struct TimerQueueId(usize);
178
179pub(crate) enum TimerResult {
180    TimedOut(Instant),
181    Pending(Instant),
182}
183
184impl TimerQueue {
185    /// Adds a new timer.
186    pub fn add(&mut self) -> TimerQueueId {
187        TimerQueueId(self.timers.add(TimerEntry {
188            deadline: Instant::from_nanos(0),
189            waker: None,
190        }))
191    }
192
193    /// Removes a timer.
194    ///
195    /// Don't wake the returned waker, just ensure it's not dropped while
196    /// holding a lock.
197    #[must_use]
198    pub fn remove(&mut self, id: TimerQueueId) -> Option<Waker> {
199        self.timers.remove(id.0).waker
200    }
201
202    /// Polls a timer for completion.
203    pub fn poll_deadline(&mut self, cx: &mut Context<'_>, id: TimerQueueId) -> TimerResult {
204        let timer = &mut self.timers[id.0];
205        let now = Instant::now();
206        if timer.deadline <= now {
207            TimerResult::TimedOut(now)
208        } else {
209            let waker = cx.waker();
210            if let Some(old_waker) = &mut timer.waker {
211                old_waker.clone_from(waker);
212            } else {
213                timer.waker = Some(waker.clone());
214            }
215            TimerResult::Pending(timer.deadline)
216        }
217    }
218
219    /// Sets the deadline for a timer.
220    ///
221    /// Returns true if the backing timer may need to be adjusted.
222    pub fn set_deadline(&mut self, id: TimerQueueId, deadline: Instant) -> bool {
223        let timer = &mut self.timers[id.0];
224        let update = timer.waker.is_some() && timer.deadline > deadline;
225        timer.deadline = deadline;
226        update
227    }
228
229    /// Returns wakers for any expired timers.
230    pub fn wake_expired(&mut self, wakers: &mut WakerList) {
231        let mut now = None;
232        wakers.extend(self.timers.iter_mut().filter_map(|(_, timer)| {
233            if timer.waker.is_some() && timer.deadline <= *now.get_or_insert_with(Instant::now) {
234                let waker = timer.waker.take().unwrap();
235                Some(waker)
236            } else {
237                None
238            }
239        }))
240    }
241
242    /// Returns the deadline of the next timer, or `None` if there are no unexpired timers.
243    pub fn next_deadline(&self) -> Option<Instant> {
244        self.timers
245            .iter()
246            .filter_map(|(_, entry)| entry.waker.is_some().then_some(entry.deadline))
247            .min()
248    }
249}
250
#[cfg(test)]
mod tests {
    use super::Instant;
    use std::time::Duration;

    /// Checks that the clock advances by roughly the time actually slept.
    #[test]
    fn test_instant() {
        let before = Instant::now();
        std::thread::sleep(Duration::from_millis(100));
        let elapsed = Instant::now() - before;

        // At least the requested sleep must have passed on the clock...
        assert!(elapsed >= Duration::from_millis(100));
        // ...and not wildly more than that.
        assert!(elapsed < Duration::from_millis(400));
    }
}