pal_async/
timer.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Timer-related functionality.

use crate::driver::Driver;
use crate::driver::PollImpl;
use crate::sparsevec::SparseVec;
use crate::waker::WakerList;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::time::Duration;

/// Instant represents a number of nanoseconds since some process-specific
/// epoch.
///
/// On Windows this is backed by QueryUnbiasedInterruptTimePrecise. On
/// Linux this is backed by CLOCK_MONOTONIC.
///
/// This is modeled after std::time::Instant but uses a different clock source
/// on Windows, and it allows access to the raw value.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Instant(u64);

impl Instant {
    /// The current time as measured by this clock.
    pub fn now() -> Self {
        Self(crate::sys::monotonic_nanos_now())
    }

    /// Returns the value of the underlying clock in nanosecond resolution.
    pub fn as_nanos(&self) -> u64 {
        self.0
    }

    /// Creates an instant from an underlying clock value in nanosecond resolution.
    pub fn from_nanos(nanos: u64) -> Self {
        Self(nanos)
    }

    /// Adds a duration to this instant, saturating at the maximum value of the
    /// clock.
    pub fn saturating_add(self, duration: Duration) -> Self {
        Self(
            self.0
                .saturating_add(duration.as_nanos().try_into().unwrap_or(u64::MAX)),
        )
    }
}

impl std::ops::Sub for Instant {
    type Output = Duration;
    fn sub(self, rhs: Instant) -> Self::Output {
        Duration::from_nanos(
            self.0.checked_sub(rhs.0).unwrap_or_else(|| {
                panic!("supplied instant {:#x} is later than {:#x}", rhs.0, self.0)
            }),
        )
    }
}

impl std::ops::Add<Duration> for Instant {
    type Output = Instant;
    fn add(self, rhs: Duration) -> Self::Output {
        Self(
            self.0
                .checked_add(rhs.as_nanos().try_into().expect("duration too large"))
                .expect("supplied duration causes overflow"),
        )
    }
}

impl std::ops::Sub<Duration> for Instant {
    type Output = Instant;
    fn sub(self, rhs: Duration) -> Self::Output {
        Self(
            self.0
                .checked_sub(rhs.as_nanos().try_into().expect("duration too large"))
                .expect("supplied instant is later than self"),
        )
    }
}

/// A trait for driving timers.
pub trait TimerDriver: Unpin {
    /// The timer type.
    type Timer: 'static + PollTimer;

    /// Returns a new timer.
    fn new_timer(&self) -> Self::Timer;
}

/// A trait for polling timers.
pub trait PollTimer: Unpin + Send + Sync {
    /// Polls the timer, optionally updating the deadline first.
    ///
    /// Returns ready with the current time when the set deadline <=
    /// `Instant::now()`.
    fn poll_timer(&mut self, cx: &mut Context<'_>, deadline: Option<Instant>) -> Poll<Instant>;

    /// Updates the timer's deadline.
    fn set_deadline(&mut self, deadline: Instant);
}

/// An asynchronous timer.
pub struct PolledTimer(PollImpl<dyn PollTimer>);

impl PolledTimer {
    /// Creates a new timer.
    pub fn new(driver: &(impl ?Sized + Driver)) -> Self {
        Self(driver.new_dyn_timer())
    }

    /// Delays the current task for `duration`.
    pub fn sleep(&mut self, duration: Duration) -> Sleep<'_> {
        self.sleep_until(Instant::now() + duration)
    }

    /// Delays the current task until `deadline`.
    pub fn sleep_until(&mut self, deadline: Instant) -> Sleep<'_> {
        self.0.set_deadline(deadline);
        Sleep {
            timer: self,
            deadline,
        }
    }
}

/// [`Future`] implementation for [`PolledTimer::sleep`] and
/// [`PolledTimer::sleep_until`].
#[must_use]
pub struct Sleep<'a> {
    timer: &'a mut PolledTimer,
    deadline: Instant,
}

impl Future for Sleep<'_> {
    type Output = Instant;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let deadline = self.deadline;
        self.timer.0.poll_timer(cx, Some(deadline))
    }
}

#[derive(Debug)]
struct TimerEntry {
    deadline: Instant,
    waker: Option<Waker>,
}

/// A queue of timers.
///
/// Used to multiplex multiple timers on a single timeout operation.
#[derive(Debug, Default)]
pub(crate) struct TimerQueue {
    timers: SparseVec<TimerEntry>,
}

/// A timer ID.
#[derive(Debug, Copy, Clone)]
pub(crate) struct TimerQueueId(usize);

pub(crate) enum TimerResult {
    TimedOut(Instant),
    Pending(Instant),
}

impl TimerQueue {
    /// Adds a new timer.
    pub fn add(&mut self) -> TimerQueueId {
        TimerQueueId(self.timers.add(TimerEntry {
            deadline: Instant::from_nanos(0),
            waker: None,
        }))
    }

    /// Removes a timer.
    ///
    /// Don't wake the returned waker, just ensure it's not dropped while
    /// holding a lock.
    #[must_use]
    pub fn remove(&mut self, id: TimerQueueId) -> Option<Waker> {
        self.timers.remove(id.0).waker
    }

    /// Polls a timer for completion.
    pub fn poll_deadline(&mut self, cx: &mut Context<'_>, id: TimerQueueId) -> TimerResult {
        let timer = &mut self.timers[id.0];
        let now = Instant::now();
        if timer.deadline <= now {
            TimerResult::TimedOut(now)
        } else {
            let waker = cx.waker();
            if let Some(old_waker) = &mut timer.waker {
                old_waker.clone_from(waker);
            } else {
                timer.waker = Some(waker.clone());
            }
            TimerResult::Pending(timer.deadline)
        }
    }

    /// Sets the deadline for a timer.
    ///
    /// Returns true if the backing timer may need to be adjusted.
    pub fn set_deadline(&mut self, id: TimerQueueId, deadline: Instant) -> bool {
        let timer = &mut self.timers[id.0];
        let update = timer.waker.is_some() && timer.deadline > deadline;
        timer.deadline = deadline;
        update
    }

    /// Returns wakers for any expired timers.
    pub fn wake_expired(&mut self, wakers: &mut WakerList) {
        let mut now = None;
        wakers.extend(self.timers.iter_mut().filter_map(|(_, timer)| {
            if timer.waker.is_some() && timer.deadline <= *now.get_or_insert_with(Instant::now) {
                let waker = timer.waker.take().unwrap();
                Some(waker)
            } else {
                None
            }
        }))
    }

    /// Returns the deadline of the next timer, or `None` if there are no unexpired timers.
    pub fn next_deadline(&self) -> Option<Instant> {
        self.timers
            .iter()
            .filter_map(|(_, entry)| entry.waker.is_some().then_some(entry.deadline))
            .min()
    }
}

#[cfg(test)]
mod tests {
    use super::Instant;
    use std::time::Duration;

    #[test]
    fn test_instant() {
        let start = Instant::now();
        std::thread::sleep(Duration::from_millis(100));
        let end = Instant::now();
        assert!(end - start >= Duration::from_millis(100));
        assert!(end - start < Duration::from_millis(400));
    }
}