pal_async/
timer.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Timer-related functionality.
5
6use crate::driver::Driver;
7use crate::driver::PollImpl;
8use crate::sparsevec::SparseVec;
9use crate::waker::WakerList;
10use std::future::Future;
11use std::pin::Pin;
12use std::task::Context;
13use std::task::Poll;
14use std::task::Waker;
15use std::time::Duration;
16
17/// Instant represents a number of nanoseconds since some process-specific
18/// epoch.
19///
20/// On Windows this is backed by QueryUnbiasedInterruptTimePrecise. On
21/// Linux this is backed by CLOCK_MONOTONIC.
22///
23/// This is modeled after std::time::Instant but uses a different clock source
24/// on Windows, and it allows access to the raw value.
25#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
26pub struct Instant(u64);
27
28impl Instant {
29    /// The current time as measured by this clock.
30    pub fn now() -> Self {
31        Self(crate::sys::monotonic_nanos_now())
32    }
33
34    /// Returns the value of the underlying clock in nanosecond resolution.
35    pub fn as_nanos(&self) -> u64 {
36        self.0
37    }
38
39    /// Creates an instant from an underlying clock value in nanosecond resolution.
40    pub fn from_nanos(nanos: u64) -> Self {
41        Self(nanos)
42    }
43
44    /// Adds a duration to this instant, saturating at the maximum value of the
45    /// clock.
46    pub fn saturating_add(self, duration: Duration) -> Self {
47        Self(
48            self.0
49                .saturating_add(duration.as_nanos().try_into().unwrap_or(u64::MAX)),
50        )
51    }
52}
53
54impl std::ops::Sub for Instant {
55    type Output = Duration;
56    fn sub(self, rhs: Instant) -> Self::Output {
57        Duration::from_nanos(
58            self.0.checked_sub(rhs.0).unwrap_or_else(|| {
59                panic!("supplied instant {:#x} is later than {:#x}", rhs.0, self.0)
60            }),
61        )
62    }
63}
64
65impl std::ops::Add<Duration> for Instant {
66    type Output = Instant;
67    fn add(self, rhs: Duration) -> Self::Output {
68        Self(
69            self.0
70                .checked_add(rhs.as_nanos().try_into().expect("duration too large"))
71                .expect("supplied duration causes overflow"),
72        )
73    }
74}
75
76impl std::ops::Sub<Duration> for Instant {
77    type Output = Instant;
78    fn sub(self, rhs: Duration) -> Self::Output {
79        Self(
80            self.0
81                .checked_sub(rhs.as_nanos().try_into().expect("duration too large"))
82                .expect("supplied instant is later than self"),
83        )
84    }
85}
86
87/// A trait for driving timers.
88pub trait TimerDriver: Unpin {
89    /// The timer type.
90    type Timer: 'static + PollTimer;
91
92    /// Returns a new timer.
93    fn new_timer(&self) -> Self::Timer;
94}
95
96/// A trait for polling timers.
97pub trait PollTimer: Unpin + Send + Sync {
98    /// Polls the timer, optionally updating the deadline first.
99    ///
100    /// Returns ready with the current time when the set deadline <=
101    /// `Instant::now()`.
102    fn poll_timer(&mut self, cx: &mut Context<'_>, deadline: Option<Instant>) -> Poll<Instant>;
103
104    /// Updates the timer's deadline.
105    fn set_deadline(&mut self, deadline: Instant);
106}
107
108/// An asynchronous timer.
109pub struct PolledTimer(PollImpl<dyn PollTimer>);
110
111impl PolledTimer {
112    /// Creates a new timer.
113    pub fn new(driver: &(impl ?Sized + Driver)) -> Self {
114        Self(driver.new_dyn_timer())
115    }
116
117    /// Delays the current task for `duration`.
118    pub fn sleep(&mut self, duration: Duration) -> Sleep<'_> {
119        self.sleep_until(Instant::now() + duration)
120    }
121
122    /// Delays the current task until `deadline`.
123    pub fn sleep_until(&mut self, deadline: Instant) -> Sleep<'_> {
124        self.0.set_deadline(deadline);
125        Sleep {
126            timer: self,
127            deadline,
128        }
129    }
130
131    /// Returns `Pending` until the current time is later than `deadline`. Then
132    /// returns `Ready` with the current time.
133    pub fn poll_until(&mut self, cx: &mut Context<'_>, deadline: Instant) -> Poll<Instant> {
134        self.0.poll_timer(cx, Some(deadline))
135    }
136}
137
138/// [`Future`] implementation for [`PolledTimer::sleep`] and
139/// [`PolledTimer::sleep_until`].
140#[must_use]
141pub struct Sleep<'a> {
142    timer: &'a mut PolledTimer,
143    deadline: Instant,
144}
145
146impl Future for Sleep<'_> {
147    type Output = Instant;
148
149    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
150        let deadline = self.deadline;
151        self.timer.0.poll_timer(cx, Some(deadline))
152    }
153}
154
155#[derive(Debug)]
156struct TimerEntry {
157    deadline: Instant,
158    waker: Option<Waker>,
159}
160
161/// A queue of timers.
162///
163/// Used to multiplex multiple timers on a single timeout operation.
164#[derive(Debug, Default)]
165pub(crate) struct TimerQueue {
166    timers: SparseVec<TimerEntry>,
167}
168
169/// A timer ID.
170#[derive(Debug, Copy, Clone)]
171pub(crate) struct TimerQueueId(usize);
172
173pub(crate) enum TimerResult {
174    TimedOut(Instant),
175    Pending(Instant),
176}
177
178impl TimerQueue {
179    /// Adds a new timer.
180    pub fn add(&mut self) -> TimerQueueId {
181        TimerQueueId(self.timers.add(TimerEntry {
182            deadline: Instant::from_nanos(0),
183            waker: None,
184        }))
185    }
186
187    /// Removes a timer.
188    ///
189    /// Don't wake the returned waker, just ensure it's not dropped while
190    /// holding a lock.
191    #[must_use]
192    pub fn remove(&mut self, id: TimerQueueId) -> Option<Waker> {
193        self.timers.remove(id.0).waker
194    }
195
196    /// Polls a timer for completion.
197    pub fn poll_deadline(&mut self, cx: &mut Context<'_>, id: TimerQueueId) -> TimerResult {
198        let timer = &mut self.timers[id.0];
199        let now = Instant::now();
200        if timer.deadline <= now {
201            TimerResult::TimedOut(now)
202        } else {
203            let waker = cx.waker();
204            if let Some(old_waker) = &mut timer.waker {
205                old_waker.clone_from(waker);
206            } else {
207                timer.waker = Some(waker.clone());
208            }
209            TimerResult::Pending(timer.deadline)
210        }
211    }
212
213    /// Sets the deadline for a timer.
214    ///
215    /// Returns true if the backing timer may need to be adjusted.
216    pub fn set_deadline(&mut self, id: TimerQueueId, deadline: Instant) -> bool {
217        let timer = &mut self.timers[id.0];
218        let update = timer.waker.is_some() && timer.deadline > deadline;
219        timer.deadline = deadline;
220        update
221    }
222
223    /// Returns wakers for any expired timers.
224    pub fn wake_expired(&mut self, wakers: &mut WakerList) {
225        let mut now = None;
226        wakers.extend(self.timers.iter_mut().filter_map(|(_, timer)| {
227            if timer.waker.is_some() && timer.deadline <= *now.get_or_insert_with(Instant::now) {
228                let waker = timer.waker.take().unwrap();
229                Some(waker)
230            } else {
231                None
232            }
233        }))
234    }
235
236    /// Returns the deadline of the next timer, or `None` if there are no unexpired timers.
237    pub fn next_deadline(&self) -> Option<Instant> {
238        self.timers
239            .iter()
240            .filter_map(|(_, entry)| entry.waker.is_some().then_some(entry.deadline))
241            .min()
242    }
243}
244
245#[cfg(test)]
246mod tests {
247    use super::Instant;
248    use std::time::Duration;
249
250    #[test]
251    fn test_instant() {
252        let start = Instant::now();
253        std::thread::sleep(Duration::from_millis(100));
254        let end = Instant::now();
255        assert!(end - start >= Duration::from_millis(100));
256        assert!(end - start < Duration::from_millis(400));
257    }
258}