1use crate::common::IcPipe;
13use crate::common::NegotiateState;
14use crate::common::Versions;
15use async_trait::async_trait;
16use guestmem::GuestMemory;
17use hyperv_ic_protocol::timesync as proto;
18use inspect::Inspect;
19use inspect::InspectMut;
20use pal_async::driver::Driver;
21use pal_async::timer::Instant;
22use pal_async::timer::PolledTimer;
23use std::future::pending;
24use task_control::Cancelled;
25use task_control::StopTask;
26use vmbus_channel::RawAsyncChannel;
27use vmbus_channel::bus::ChannelType;
28use vmbus_channel::bus::OfferParams;
29use vmbus_channel::channel::ChannelOpenError;
30use vmbus_channel::gpadl_ring::GpadlRingMem;
31use vmbus_channel::simple::SaveRestoreSimpleVmbusDevice;
32use vmbus_channel::simple::SimpleVmbusDevice;
33use vmcore::reference_time::ReferenceTimeSource;
34use vmcore::save_restore::NoSavedState;
35use zerocopy::IntoBytes;
36
/// Timesync message versions passed to version negotiation; only version 4 is
/// listed.
const TIMESYNC_VERSIONS: &[hyperv_ic_protocol::Version] = &[proto::TIMESYNC_VERSION_4];

/// Delay between reading the response to one time message and sending the
/// next time sample.
const SAMPLE_PERIOD: std::time::Duration = std::time::Duration::from_secs(5);
41
/// Timesync IC device.
///
/// Offers the `timesync_ic` vmbus pipe channel and holds the resources that
/// the channel's state machine borrows while it runs.
#[derive(InspectMut)]
#[non_exhaustive]
pub struct TimesyncIc {
    /// Timer used to sleep until the next time sample is due.
    #[inspect(skip)]
    timer: PolledTimer,
    /// Source queried for the reference time (and the system time, when it
    /// provides one) placed in each timesync message.
    #[inspect(skip)]
    ref_time: ReferenceTimeSource,
}
51
/// Per-channel runner for [`TimesyncIc`]: the open pipe plus the protocol
/// state machine that drives it.
#[doc(hidden)]
#[derive(InspectMut)]
pub struct TimesyncChannel {
    /// Pipe used to negotiate versions, write time messages, and read
    /// responses.
    #[inspect(mut)]
    pipe: IcPipe,
    /// Current position in the protocol state machine.
    state: ChannelState,
}
59
/// Top-level state of a timesync channel.
#[derive(Inspect)]
#[inspect(external_tag)]
enum ChannelState {
    /// Version negotiation is still in progress.
    Negotiate(#[inspect(rename = "state")] NegotiateState),
    /// Versions have been negotiated and time messages are being exchanged.
    Ready {
        /// The negotiated versions, used when writing each message.
        versions: Versions,
        /// Which step of the send/wait/sleep cycle the channel is in.
        state: ReadyState,
    },
    /// The state machine returned an error; the channel stays idle from then
    /// on.
    Failed,
}
70
/// Step of the periodic send/wait/sleep cycle once the channel is ready.
#[derive(Inspect)]
#[inspect(external_tag)]
enum ReadyState {
    /// Sleeping until it is time to send the next sample.
    SleepUntilNextSample {
        /// Timer instant at which the next sample should be sent. Shown in
        /// inspect output as an estimated wall-clock timestamp.
        #[inspect(with = "inspect_instant")]
        next_sample: Instant,
    },
    /// A time message should be sent next.
    SendMessage {
        /// `true` to send the message with the sync flag set (used for the
        /// first message after negotiation); `false` to send it with the
        /// sample flag set instead.
        is_sync: bool,
    },
    /// A message has been sent and its response has not yet been read.
    WaitForResponse,
}
83
84fn inspect_instant(&instant: &Instant) -> inspect::AsDisplay<jiff::Timestamp> {
85 let now = Instant::now();
86 let time = jiff::Timestamp::now();
87 let sd = if now <= instant {
88 jiff::SignedDuration::try_from(instant - now).unwrap()
89 } else {
90 -jiff::SignedDuration::try_from(now - instant).unwrap()
91 };
92 inspect::AsDisplay(time + sd)
93}
94
95impl TimesyncIc {
96 pub fn new(driver: &(impl Driver + ?Sized), ref_time: ReferenceTimeSource) -> Self {
98 Self {
99 timer: PolledTimer::new(driver),
100 ref_time,
101 }
102 }
103}
104
105#[async_trait]
106impl SimpleVmbusDevice for TimesyncIc {
107 type SavedState = NoSavedState;
108 type Runner = TimesyncChannel;
109
110 fn offer(&self) -> OfferParams {
111 OfferParams {
112 interface_name: "timesync_ic".to_owned(),
113 instance_id: proto::INSTANCE_ID,
114 interface_id: proto::INTERFACE_ID,
115 channel_type: ChannelType::Pipe { message_mode: true },
116 ..Default::default()
117 }
118 }
119
120 fn inspect(&mut self, req: inspect::Request<'_>, runner: Option<&mut Self::Runner>) {
121 req.respond().merge(self).merge(runner);
122 }
123
124 fn open(
125 &mut self,
126 channel: RawAsyncChannel<GpadlRingMem>,
127 _guest_memory: GuestMemory,
128 ) -> Result<Self::Runner, ChannelOpenError> {
129 TimesyncChannel::new(channel, None)
130 }
131
132 async fn run(
133 &mut self,
134 stop: &mut StopTask<'_>,
135 runner: &mut Self::Runner,
136 ) -> Result<(), Cancelled> {
137 stop.until_stopped(async { runner.process(self).await })
138 .await
139 }
140
141 fn supports_save_restore(
142 &mut self,
143 ) -> Option<
144 &mut dyn SaveRestoreSimpleVmbusDevice<SavedState = Self::SavedState, Runner = Self::Runner>,
145 > {
146 None
147 }
148}
149
150impl TimesyncChannel {
151 fn new(
152 channel: RawAsyncChannel<GpadlRingMem>,
153 restore_state: Option<ChannelState>,
154 ) -> Result<Self, ChannelOpenError> {
155 let pipe = IcPipe::new(channel)?;
156 Ok(Self {
157 pipe,
158 state: restore_state.unwrap_or(ChannelState::Negotiate(NegotiateState::default())),
159 })
160 }
161
162 async fn process(&mut self, ic: &mut TimesyncIc) -> ! {
163 loop {
164 if let Err(err) = self.process_state_machine(ic).await {
165 tracing::error!(
166 error = err.as_ref() as &dyn std::error::Error,
167 "timesync ic error"
168 );
169 self.state = ChannelState::Failed;
170 }
171 }
172 }
173
174 async fn process_state_machine(&mut self, ic: &mut TimesyncIc) -> anyhow::Result<()> {
175 match self.state {
176 ChannelState::Negotiate(ref mut state) => {
177 if let Some(versions) = self.pipe.negotiate(state, TIMESYNC_VERSIONS).await? {
178 tracelimit::info_ratelimited!(
179 framework = %versions.framework_version,
180 version = %versions.message_version,
181 "timesync versions negotiated"
182 );
183 self.state = ChannelState::Ready {
185 versions,
186 state: ReadyState::SendMessage { is_sync: true },
187 };
188 }
189 }
190 ChannelState::Ready {
191 ref versions,
192 ref mut state,
193 } => match *state {
194 ReadyState::SleepUntilNextSample { next_sample } => {
195 ic.timer.sleep_until(next_sample).await;
196 *state = ReadyState::SendMessage { is_sync: false };
197 }
198 ReadyState::SendMessage { is_sync } => {
199 let message_size = size_of::<hyperv_ic_protocol::Header>()
202 + size_of::<proto::TimesyncMessageV4>();
203 self.pipe.pipe.wait_write_ready(message_size).await?;
204
205 let r = ic.ref_time.now();
209 let ref_time = r.ref_time;
210 let time = r.system_time.unwrap_or_else(jiff::Timestamp::now);
211
212 let message = proto::TimesyncMessageV4 {
213 parent_time: ((time.duration_since(proto::EPOCH).as_nanos() / 100) as u64)
214 .into(),
215 vm_reference_time: ref_time,
216 flags: proto::TimesyncFlags::new()
217 .with_sync(is_sync)
218 .with_sample(!is_sync),
219 leap_indicator: 0,
220 stratum: 0,
221 reserved: [0; 5],
222 };
223 self.pipe
224 .write_message(
225 versions,
226 hyperv_ic_protocol::MessageType::TIME_SYNC,
227 hyperv_ic_protocol::HeaderFlags::new()
228 .with_request(true)
229 .with_transaction(true),
230 message.as_bytes(),
231 )
232 .await?;
233
234 if is_sync {
235 tracelimit::info_ratelimited!(%time, ref_time, "sent time sync");
236 } else {
237 tracing::debug!(%time, ref_time, "sent time sample");
238 }
239
240 *state = ReadyState::WaitForResponse;
244 }
245 ReadyState::WaitForResponse => {
246 self.pipe.read_response().await?;
247 *state = ReadyState::SleepUntilNextSample {
249 next_sample: Instant::now() + SAMPLE_PERIOD,
250 };
251 }
252 },
253 ChannelState::Failed => pending().await,
254 }
255 Ok(())
256 }
257}