hyperv_ic/
timesync.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! The timesync IC.
5//!
6//! TODO:
7//! * When the device is paused+resumed, this is an indicator that time may have
8//!   stopped for the guest. We should send another sync message to update the
9//!   guest, or potentially just reoffer the vmbus channel like Hyper-V does.
10//! * Saved state support.
11
12use crate::common::IcPipe;
13use crate::common::NegotiateState;
14use crate::common::Versions;
15use async_trait::async_trait;
16use guestmem::GuestMemory;
17use hyperv_ic_protocol::timesync as proto;
18use inspect::Inspect;
19use inspect::InspectMut;
20use pal_async::driver::Driver;
21use pal_async::timer::Instant;
22use pal_async::timer::PolledTimer;
23use std::future::pending;
24use task_control::Cancelled;
25use task_control::StopTask;
26use vmbus_channel::RawAsyncChannel;
27use vmbus_channel::bus::ChannelType;
28use vmbus_channel::bus::OfferParams;
29use vmbus_channel::channel::ChannelOpenError;
30use vmbus_channel::gpadl_ring::GpadlRingMem;
31use vmbus_channel::simple::SaveRestoreSimpleVmbusDevice;
32use vmbus_channel::simple::SimpleVmbusDevice;
33use vmcore::reference_time::ReferenceTimeSource;
34use vmcore::save_restore::NoSavedState;
35use zerocopy::IntoBytes;
36
37const TIMESYNC_VERSIONS: &[hyperv_ic_protocol::Version] = &[proto::TIMESYNC_VERSION_4];
38
39/// Send samples every 5 seconds.
40const SAMPLE_PERIOD: std::time::Duration = std::time::Duration::from_secs(5);
41
42/// Timesync IC device.
43#[derive(InspectMut)]
44#[non_exhaustive]
45pub struct TimesyncIc {
46    #[inspect(skip)]
47    timer: PolledTimer,
48    #[inspect(skip)]
49    ref_time: ReferenceTimeSource,
50}
51
52#[doc(hidden)]
53#[derive(InspectMut)]
54pub struct TimesyncChannel {
55    #[inspect(mut)]
56    pipe: IcPipe,
57    state: ChannelState,
58}
59
60#[derive(Inspect)]
61#[inspect(external_tag)]
62enum ChannelState {
63    Negotiate(#[inspect(rename = "state")] NegotiateState),
64    Ready {
65        versions: Versions,
66        state: ReadyState,
67    },
68    Failed,
69}
70
71#[derive(Inspect)]
72#[inspect(external_tag)]
73enum ReadyState {
74    SleepUntilNextSample {
75        #[inspect(with = "inspect_instant")]
76        next_sample: Instant,
77    },
78    SendMessage {
79        is_sync: bool,
80    },
81    WaitForResponse,
82}
83
84fn inspect_instant(&instant: &Instant) -> inspect::AsDisplay<jiff::Timestamp> {
85    let now = Instant::now();
86    let time = jiff::Timestamp::now();
87    let sd = if now <= instant {
88        jiff::SignedDuration::try_from(instant - now).unwrap()
89    } else {
90        -jiff::SignedDuration::try_from(now - instant).unwrap()
91    };
92    inspect::AsDisplay(time + sd)
93}
94
95impl TimesyncIc {
96    /// Create a new timesync IC.
97    pub fn new(driver: &(impl Driver + ?Sized), ref_time: ReferenceTimeSource) -> Self {
98        Self {
99            timer: PolledTimer::new(driver),
100            ref_time,
101        }
102    }
103}
104
105#[async_trait]
106impl SimpleVmbusDevice for TimesyncIc {
107    type SavedState = NoSavedState;
108    type Runner = TimesyncChannel;
109
110    fn offer(&self) -> OfferParams {
111        OfferParams {
112            interface_name: "timesync_ic".to_owned(),
113            instance_id: proto::INSTANCE_ID,
114            interface_id: proto::INTERFACE_ID,
115            channel_type: ChannelType::Pipe { message_mode: true },
116            ..Default::default()
117        }
118    }
119
120    fn inspect(&mut self, req: inspect::Request<'_>, runner: Option<&mut Self::Runner>) {
121        req.respond().merge(self).merge(runner);
122    }
123
124    fn open(
125        &mut self,
126        channel: RawAsyncChannel<GpadlRingMem>,
127        _guest_memory: GuestMemory,
128    ) -> Result<Self::Runner, ChannelOpenError> {
129        TimesyncChannel::new(channel, None)
130    }
131
132    async fn run(
133        &mut self,
134        stop: &mut StopTask<'_>,
135        runner: &mut Self::Runner,
136    ) -> Result<(), Cancelled> {
137        stop.until_stopped(async { runner.process(self).await })
138            .await
139    }
140
141    fn supports_save_restore(
142        &mut self,
143    ) -> Option<
144        &mut dyn SaveRestoreSimpleVmbusDevice<SavedState = Self::SavedState, Runner = Self::Runner>,
145    > {
146        None
147    }
148}
149
150impl TimesyncChannel {
151    fn new(
152        channel: RawAsyncChannel<GpadlRingMem>,
153        restore_state: Option<ChannelState>,
154    ) -> Result<Self, ChannelOpenError> {
155        let pipe = IcPipe::new(channel)?;
156        Ok(Self {
157            pipe,
158            state: restore_state.unwrap_or(ChannelState::Negotiate(NegotiateState::default())),
159        })
160    }
161
162    async fn process(&mut self, ic: &mut TimesyncIc) -> ! {
163        loop {
164            if let Err(err) = self.process_state_machine(ic).await {
165                tracing::error!(
166                    error = err.as_ref() as &dyn std::error::Error,
167                    "timesync ic error"
168                );
169                self.state = ChannelState::Failed;
170            }
171        }
172    }
173
174    async fn process_state_machine(&mut self, ic: &mut TimesyncIc) -> anyhow::Result<()> {
175        match self.state {
176            ChannelState::Negotiate(ref mut state) => {
177                if let Some(versions) = self.pipe.negotiate(state, TIMESYNC_VERSIONS).await? {
178                    tracelimit::info_ratelimited!(
179                        framework = %versions.framework_version,
180                        version = %versions.message_version,
181                        "timesync versions negotiated"
182                    );
183                    // Send a sync message to provide the initial time.
184                    self.state = ChannelState::Ready {
185                        versions,
186                        state: ReadyState::SendMessage { is_sync: true },
187                    };
188                }
189            }
190            ChannelState::Ready {
191                ref versions,
192                ref mut state,
193            } => match *state {
194                ReadyState::SleepUntilNextSample { next_sample } => {
195                    ic.timer.sleep_until(next_sample).await;
196                    *state = ReadyState::SendMessage { is_sync: false };
197                }
198                ReadyState::SendMessage { is_sync } => {
199                    // Wait for space in the ring before computing the next time to ensure that there is not
200                    // too much drift before the guest sees it.
201                    let message_size = size_of::<hyperv_ic_protocol::Header>()
202                        + size_of::<proto::TimesyncMessageV4>();
203                    self.pipe.pipe.wait_write_ready(message_size).await?;
204
205                    // In case the backend doesn't provide a system time
206                    // snapshot, capture the system time as soon as possible to
207                    // avoid drift.
208                    let r = ic.ref_time.now();
209                    let ref_time = r.ref_time;
210                    let time = r.system_time.unwrap_or_else(jiff::Timestamp::now);
211
212                    let message = proto::TimesyncMessageV4 {
213                        parent_time: ((time.duration_since(proto::EPOCH).as_nanos() / 100) as u64)
214                            .into(),
215                        vm_reference_time: ref_time,
216                        flags: proto::TimesyncFlags::new()
217                            .with_sync(is_sync)
218                            .with_sample(!is_sync),
219                        leap_indicator: 0,
220                        stratum: 0,
221                        reserved: [0; 5],
222                    };
223                    self.pipe
224                        .write_message(
225                            versions,
226                            hyperv_ic_protocol::MessageType::TIME_SYNC,
227                            hyperv_ic_protocol::HeaderFlags::new()
228                                .with_request(true)
229                                .with_transaction(true),
230                            message.as_bytes(),
231                        )
232                        .await?;
233
234                    if is_sync {
235                        tracelimit::info_ratelimited!(%time, ref_time, "sent time sync");
236                    } else {
237                        tracing::debug!(%time, ref_time, "sent time sample");
238                    }
239
240                    // This was sent as a transaction, which is kind of
241                    // pointless (we don't need the response), but Windows
242                    // ignores non-transactional time sync requests.
243                    *state = ReadyState::WaitForResponse;
244                }
245                ReadyState::WaitForResponse => {
246                    self.pipe.read_response().await?;
247                    // Send another sample in a few seconds.
248                    *state = ReadyState::SleepUntilNextSample {
249                        next_sample: Instant::now() + SAMPLE_PERIOD,
250                    };
251                }
252            },
253            ChannelState::Failed => pending().await,
254        }
255        Ok(())
256    }
257}