1use crate::common::IcPipe;
7use crate::common::NegotiateState;
8use crate::common::Versions;
9use anyhow::Context;
10use async_trait::async_trait;
11use futures::FutureExt;
12use futures::StreamExt;
13use futures::stream::FusedStream;
14use futures::stream::once;
15use futures_concurrency::stream::Merge;
16use hyperv_ic_protocol::HeaderFlags;
17use hyperv_ic_protocol::Status;
18use hyperv_ic_protocol::kvp as proto;
19use hyperv_ic_resources::kvp::AddressOrigin;
20use hyperv_ic_resources::kvp::IpInfo;
21use hyperv_ic_resources::kvp::Ipv4AddressInfo;
22use hyperv_ic_resources::kvp::Ipv6AddressInfo;
23use hyperv_ic_resources::kvp::KeyValue;
24use hyperv_ic_resources::kvp::KvpConnectRpc;
25use hyperv_ic_resources::kvp::KvpRpc;
26use hyperv_ic_resources::kvp::Value;
27use inspect::Inspect;
28use inspect::InspectMut;
29use mesh::rpc::FailableRpc;
30use std::pin::pin;
31use task_control::Cancelled;
32use task_control::StopTask;
33use thiserror::Error;
34use vmbus_channel::RawAsyncChannel;
35use vmbus_channel::bus::ChannelType;
36use vmbus_channel::bus::OfferParams;
37use vmbus_channel::channel::ChannelOpenError;
38use vmbus_channel::gpadl_ring::GpadlRingMem;
39use vmbus_channel::simple::SaveRestoreSimpleVmbusDevice;
40use vmbus_channel::simple::SimpleVmbusDevice;
41use vmcore::save_restore::NoSavedState;
42use zerocopy::FromBytes;
43use zerocopy::FromZeros;
44use zerocopy::Immutable;
45use zerocopy::IntoBytes;
46
/// KVP message versions passed to `IcPipe::negotiate` when the channel is in
/// the negotiation state.
const KVP_VERSIONS: &[hyperv_ic_protocol::Version] = &[
    proto::KVP_VERSION_3,
    proto::KVP_VERSION_4,
    proto::KVP_VERSION_5,
];
52
/// The KVP IC vmbus device (offered under the interface name `kvp_ic`).
#[derive(InspectMut)]
pub struct KvpIc {
    /// Incoming connection requests (`KvpConnectRpc`) from clients.
    #[inspect(skip)]
    recv: mesh::Receiver<KvpConnectRpc>,
    /// `WaitForGuest` requests that arrived while the channel was still
    /// negotiating. They are completed once negotiation succeeds, or failed
    /// if the channel fails.
    #[inspect(skip)]
    wait_ready: Vec<FailableRpc<(), (mesh::Sender<KvpRpc>, mesh::OneshotReceiver<()>)>>,
}
61
/// Per-open-channel state; this is the `Runner` type of the
/// `SimpleVmbusDevice` implementation for `KvpIc`.
#[doc(hidden)]
#[derive(InspectMut)]
pub struct KvpChannel {
    /// The message pipe used to negotiate versions and exchange KVP messages.
    #[inspect(mut)]
    pipe: IcPipe,
    /// Current position in the channel state machine.
    state: ChannelState,
}
69
/// Top-level state of an open KVP channel.
#[derive(Inspect)]
#[inspect(external_tag)]
enum ChannelState {
    /// Version negotiation is still in progress.
    Negotiate(#[inspect(rename = "state")] NegotiateState),
    /// Negotiation finished; requests can be exchanged.
    Ready {
        /// The versions produced by negotiation.
        versions: Versions,
        /// One oneshot sender per connected client. Closed senders are pruned
        /// when a new client connects.
        #[inspect(with = "|x| x.len()")]
        clients: Vec<mesh::OneshotSender<()>>,
        /// Receives `KvpRpc` requests from the senders handed to clients.
        #[inspect(skip)]
        rpc_recv: mesh::Receiver<KvpRpc>,
        /// Progress of the request currently being handled, if any.
        state: ReadyState,
    },
    /// The state machine returned an error. New `WaitForGuest` requests are
    /// failed and no further messages are processed.
    Failed,
}
84
/// Progress of a single request while the channel is in
/// `ChannelState::Ready`.
#[derive(Inspect)]
#[inspect(external_tag)]
enum ReadyState {
    /// Idle; waiting for the next `KvpRpc`.
    Ready,
    /// A request has been received and still needs to be written to the pipe.
    SendingRequest(#[inspect(skip)] KvpRpc),
    /// The request has been written; waiting to read the response.
    WaitingResponse(#[inspect(skip)] KvpRpc),
}
92
/// Vmbus device glue for the KVP IC.
#[async_trait]
impl SimpleVmbusDevice for KvpIc {
    // No state is saved; `supports_save_restore` below returns `None`.
    type SavedState = NoSavedState;
    // Per-open-channel state that `run` drives.
    type Runner = KvpChannel;

    /// Returns the offer parameters for this device: the KVP instance and
    /// interface IDs on a message-mode pipe channel.
    fn offer(&self) -> OfferParams {
        OfferParams {
            interface_name: "kvp_ic".to_owned(),
            instance_id: proto::INSTANCE_ID,
            interface_id: proto::INTERFACE_ID,
            channel_type: ChannelType::Pipe { message_mode: true },
            ..Default::default()
        }
    }

    /// Responds to an inspect request by merging the device state with the
    /// runner's state. `runner` is an `Option`, so it may be absent.
    fn inspect(&mut self, req: inspect::Request<'_>, runner: Option<&mut Self::Runner>) {
        req.respond().merge(self).merge(runner);
    }

    /// Creates the runner for a newly opened channel. No restore state is
    /// passed, so the runner starts in the negotiation state. The guest
    /// memory handle is unused.
    fn open(
        &mut self,
        channel: RawAsyncChannel<GpadlRingMem>,
        _guest_memory: guestmem::GuestMemory,
    ) -> Result<Self::Runner, ChannelOpenError> {
        KvpChannel::new(channel, None)
    }

    /// Drives the channel until `stop` fires. `KvpChannel::process` never
    /// returns on its own, so this call only ends through `stop`.
    async fn run(
        &mut self,
        stop: &mut StopTask<'_>,
        runner: &mut Self::Runner,
    ) -> Result<(), Cancelled> {
        // NOTE(review): the `async` block presumably exists so that the `!`
        // output of `process` coerces to `()`; confirm before simplifying.
        stop.until_stopped(async { runner.process(self).await })
            .await
    }

    /// Save/restore is not supported for this device.
    fn supports_save_restore(
        &mut self,
    ) -> Option<
        &mut dyn SaveRestoreSimpleVmbusDevice<SavedState = Self::SavedState, Runner = Self::Runner>,
    > {
        None
    }
}
137
138impl KvpIc {
139 pub fn new(recv: mesh::Receiver<KvpConnectRpc>) -> Self {
141 Self {
142 recv,
143 wait_ready: Vec::new(),
144 }
145 }
146}
147
148impl KvpChannel {
149 fn new(
150 channel: RawAsyncChannel<GpadlRingMem>,
151 restore_state: Option<ChannelState>,
152 ) -> Result<Self, ChannelOpenError> {
153 let pipe = IcPipe::new(channel)?;
154 Ok(Self {
155 pipe,
156 state: restore_state.unwrap_or(ChannelState::Negotiate(NegotiateState::default())),
157 })
158 }
159
160 async fn process(&mut self, ic: &mut KvpIc) -> ! {
161 enum Event {
162 StateMachine(anyhow::Result<()>),
163 Request(KvpConnectRpc),
164 }
165
166 loop {
167 let event = pin!(
168 (
169 once(
170 self.process_state_machine(&mut ic.wait_ready)
171 .map(Event::StateMachine)
172 ),
173 (&mut ic.recv).map(Event::Request),
174 )
175 .merge()
176 )
177 .next()
178 .await
179 .unwrap();
180 match event {
181 Event::StateMachine(r) => {
182 if let Err(err) = r {
183 tracing::error!(
184 error = err.as_ref() as &dyn std::error::Error,
185 "kvp ic error"
186 );
187 self.state = ChannelState::Failed;
188 for rpc in ic.wait_ready.drain(..) {
189 rpc.fail(anyhow::anyhow!("kvp channel failed"));
190 }
191 }
192 }
193 Event::Request(req) => match req {
194 KvpConnectRpc::WaitForGuest(rpc) => match &mut self.state {
195 ChannelState::Negotiate(_) => ic.wait_ready.push(rpc),
196 ChannelState::Ready {
197 clients, rpc_recv, ..
198 } => {
199 let rpc_send = rpc_recv.sender();
200 let (send, recv) = mesh::oneshot();
201 clients.retain(|c| !c.is_closed());
202 clients.push(send);
203 rpc.complete(Ok((rpc_send, recv)));
204 }
205 ChannelState::Failed => {
206 rpc.fail(anyhow::anyhow!("kvp channel failed"));
207 }
208 },
209 },
210 }
211 }
212 }
213
214 async fn process_state_machine(
215 &mut self,
216 wait_ready: &mut Vec<FailableRpc<(), (mesh::Sender<KvpRpc>, mesh::OneshotReceiver<()>)>>,
217 ) -> anyhow::Result<()> {
218 match self.state {
219 ChannelState::Negotiate(ref mut state) => {
220 if let Some(versions) = self.pipe.negotiate(state, KVP_VERSIONS).await? {
221 let mut rpc_recv = mesh::Receiver::new();
222 let clients = wait_ready
223 .drain(..)
224 .map(|rpc| {
225 let rpc_send = rpc_recv.sender();
226 let (send, recv) = mesh::oneshot();
227 rpc.complete(Ok((rpc_send, recv)));
228 send
229 })
230 .collect();
231
232 self.state = ChannelState::Ready {
233 versions,
234 clients,
235 rpc_recv,
236 state: ReadyState::Ready,
237 };
238 }
239 }
240 ChannelState::Ready {
241 ref versions,
242 rpc_recv: ref mut recv,
243 ref mut state,
244 clients: _,
245 } => match state {
246 ReadyState::Ready => {
247 if recv.is_terminated() {
248 return std::future::pending().await;
249 }
250 if let Some(rpc) = recv.next().await {
251 *state = ReadyState::SendingRequest(rpc);
252 }
253 }
254 ReadyState::SendingRequest(rpc) => {
255 match rpc {
256 KvpRpc::Set(rpc) => {
257 let params = rpc.input();
258 let mut message = proto::MessageGetSet::new_zeroed();
259 message.value.key_size =
260 write_str(&mut message.value.key, ¶ms.key)?;
261
262 (message.value.value_type, message.value.value_size) =
263 write_value(&mut message.value.value, ¶ms.value)?;
264
265 self.pipe
266 .write_message(
267 versions,
268 hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
269 HeaderFlags::new().with_request(true).with_transaction(true),
270 msg(proto::KvpOperation::SET, pool_cvt(params.pool), message)
271 .as_bytes(),
272 )
273 .await?;
274 }
275 KvpRpc::Delete(rpc) => {
276 let params = rpc.input();
277 let mut message = proto::MessageDelete::new_zeroed();
278 message.key_size = write_str(&mut message.key, ¶ms.key)?;
279 self.pipe
280 .write_message(
281 versions,
282 hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
283 HeaderFlags::new().with_request(true).with_transaction(true),
284 msg(
285 proto::KvpOperation::DELETE,
286 pool_cvt(params.pool),
287 message,
288 )
289 .as_bytes(),
290 )
291 .await?;
292 }
293 KvpRpc::Enumerate(rpc) => {
294 let params = rpc.input();
295 let message = proto::MessageEnumerate {
296 index: params.index,
297 value: proto::Value::new_zeroed(),
298 };
299
300 self.pipe
301 .write_message(
302 versions,
303 hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
304 HeaderFlags::new().with_request(true).with_transaction(true),
305 msg(
306 proto::KvpOperation::ENUMERATE,
307 pool_cvt(params.pool),
308 message,
309 )
310 .as_bytes(),
311 )
312 .await?;
313 }
314 KvpRpc::GetIpInfo(rpc) => {
315 let params = rpc.input();
316 let message = match prepare_get_ip_info(versions, ¶ms.adapter_id) {
317 Ok(message) => message,
318 Err(err) => {
319 let ReadyState::SendingRequest(KvpRpc::GetIpInfo(rpc)) =
320 std::mem::replace(state, ReadyState::Ready)
321 else {
322 unreachable!()
323 };
324 rpc.fail(err);
325 return Ok(());
326 }
327 };
328 self.pipe
329 .write_message(
330 versions,
331 hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
332 HeaderFlags::new().with_request(true).with_transaction(true),
333 message.as_bytes(),
334 )
335 .await?;
336 }
337 KvpRpc::SetIpInfo(rpc) => {
338 let params = rpc.input();
339 let message = match prepare_set_ip_info(
340 versions,
341 ¶ms.adapter_id,
342 ¶ms.info,
343 ) {
344 Ok(message) => message,
345 Err(err) => {
346 let ReadyState::SendingRequest(KvpRpc::SetIpInfo(rpc)) =
347 std::mem::replace(state, ReadyState::Ready)
348 else {
349 unreachable!()
350 };
351 rpc.fail(err);
352 return Ok(());
353 }
354 };
355 self.pipe
356 .write_message(
357 versions,
358 hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
359 HeaderFlags::new().with_request(true).with_transaction(true),
360 message.as_bytes(),
361 )
362 .await?;
363 }
364 }
365 let ReadyState::SendingRequest(rpc) =
366 std::mem::replace(state, ReadyState::Ready)
367 else {
368 unreachable!()
369 };
370 *state = ReadyState::WaitingResponse(rpc);
371 }
372 ReadyState::WaitingResponse(_) => {
373 let (status, response) = self.pipe.read_response().await?;
374 let r = if status == Status::SUCCESS {
375 Ok(())
376 } else {
377 Err(RequestError(status))
378 };
379 let ReadyState::WaitingResponse(rpc) =
380 std::mem::replace(state, ReadyState::Ready)
381 else {
382 unreachable!()
383 };
384 match rpc {
385 KvpRpc::Set(rpc) => rpc.handle_failable_sync(|_| r),
386 KvpRpc::Delete(rpc) => rpc.handle_failable_sync(|_| r),
387 KvpRpc::Enumerate(rpc) => match r {
388 Ok(()) => {
389 let v = parse_enumerate_response(response)?;
390 rpc.complete(Ok(Some(v)));
391 }
392 Err(RequestError(Status::NO_MORE_ITEMS)) => {
393 rpc.complete(Ok(None));
394 }
395 Err(err) => rpc.fail(err),
396 },
397 KvpRpc::GetIpInfo(rpc) => match r {
398 Ok(()) => {
399 let v = parse_ip_info_response(response)?;
400 rpc.complete(Ok(v))
401 }
402 Err(err) => rpc.fail(err),
403 },
404 KvpRpc::SetIpInfo(rpc) => rpc.handle_failable_sync(|_| r),
405 }
406 }
407 },
408 ChannelState::Failed => std::future::pending().await,
409 }
410 Ok(())
411 }
412}
413
414fn parse_response<T: FromBytes + Immutable>(response: &[u8]) -> Result<T, anyhow::Error> {
415 let response = response
416 .get(align_of::<T>().max(size_of::<proto::KvpHeader>())..)
417 .context("missing header")?;
418 let (response, _) = T::read_from_prefix(response)
419 .ok()
420 .context("missing response")?;
421 Ok(response)
422}
423
424fn parse_enumerate_response(response: &[u8]) -> Result<KeyValue, anyhow::Error> {
425 let response = parse_response::<proto::MessageEnumerate>(response)?;
426 let key = parse_str(&response.value.key, response.value.key_size)?;
427 let value = match response.value.value_type {
428 proto::ValueType::DWORD => {
429 if response.value.value_size != 4 {
430 anyhow::bail!("invalid dword value size");
431 }
432 let value = u32::from_le_bytes(response.value.value[..4].try_into().unwrap());
433 Value::U32(value)
434 }
435 proto::ValueType::QWORD => {
436 if response.value.value_size != 8 {
437 anyhow::bail!("invalid qword value size");
438 }
439 let value = u64::from_le_bytes(response.value.value[..8].try_into().unwrap());
440 Value::U64(value)
441 }
442 proto::ValueType::STRING | proto::ValueType::EXPAND_STRING => {
443 let value = parse_str(
444 <[u16]>::ref_from_bytes(&response.value.value).unwrap(),
445 response.value.value_size,
446 )?;
447 Value::String(value)
448 }
449 proto::ValueType(v) => {
450 anyhow::bail!("invalid value type {v:#x}")
451 }
452 };
453
454 Ok(KeyValue { key, value })
455}
456
457fn parse_ip_info_response(response: &[u8]) -> Result<IpInfo, anyhow::Error> {
458 fn parse_ipv4(
459 addresses: &[proto::IpAddressV4],
460 count: u32,
461 ) -> anyhow::Result<impl Iterator<Item = std::net::Ipv4Addr> + '_> {
462 anyhow::Ok(
463 addresses
464 .get(..count as usize)
465 .context("invalid ipv4 address count")?
466 .iter()
467 .map(|x| x.0.into()),
468 )
469 }
470 fn parse_ipv6(
471 addresses: &[proto::IpAddressV6],
472 count: u32,
473 ) -> anyhow::Result<impl Iterator<Item = std::net::Ipv6Addr> + '_> {
474 anyhow::Ok(
475 addresses
476 .get(..count as usize)
477 .context("invalid ipv6 address count")?
478 .iter()
479 .map(|x| x.0.into()),
480 )
481 }
482 fn parse_origin(origin: proto::IpAddressOrigin) -> AddressOrigin {
483 match origin {
484 proto::IpAddressOrigin::UNKNOWN => AddressOrigin::Unknown,
485 proto::IpAddressOrigin::OTHER => AddressOrigin::Other,
486 proto::IpAddressOrigin::STATIC => AddressOrigin::Static,
487 _ => AddressOrigin::Unknown,
488 }
489 }
490
491 let response = parse_response::<proto::MessageIpAddressInfoBinary>(response)?;
492 let (ipv4_origins, rest) = response
493 .ip_address_origins
494 .split_at_checked(response.ipv4_address_count as usize)
495 .context("invalid ipv4 address count")?;
496
497 let ipv6_origins = rest
498 .get(..response.ipv6_address_count as usize)
499 .context("invalid ipv6 address count")?;
500
501 let info = IpInfo {
502 ipv4: matches!(
503 response.address_family,
504 proto::AddressFamily::IPV4 | proto::AddressFamily::IPV4V6
505 ),
506 ipv6: matches!(
507 response.address_family,
508 proto::AddressFamily::IPV6 | proto::AddressFamily::IPV4V6
509 ),
510 dhcp_enabled: response.dhcp_enabled != 0,
511 ipv4_addresses: parse_ipv4(&response.ipv4_addresses, response.ipv4_address_count)?
512 .zip(parse_ipv4(
513 &response.ipv4_subnets,
514 response.ipv4_address_count,
515 )?)
516 .zip(ipv4_origins)
517 .map(|((address, subnet), &origin)| Ipv4AddressInfo {
518 address,
519 subnet,
520 origin: parse_origin(origin),
521 })
522 .collect(),
523 ipv6_addresses: parse_ipv6(&response.ipv6_addresses, response.ipv6_address_count)?
524 .zip(
525 response
526 .ipv6_subnets
527 .get(..response.ipv6_address_count as usize)
528 .context("invalid ipv6 address count")?,
529 )
530 .zip(ipv6_origins)
531 .map(|((address, &subnet), &origin)| Ipv6AddressInfo {
532 address,
533 subnet,
534 origin: parse_origin(origin),
535 })
536 .collect(),
537 ipv4_dns_servers: parse_ipv4(&response.ipv4_dns_servers, response.ipv4_dns_server_count)?
538 .collect(),
539 ipv6_dns_servers: parse_ipv6(&response.ipv6_dns_servers, response.ipv6_dns_server_count)?
540 .collect(),
541 ipv4_gateways: parse_ipv4(&response.ipv4_gateways, response.ipv4_gateway_count)?.collect(),
542 ipv6_gateways: parse_ipv6(&response.ipv6_gateways, response.ipv6_gateway_count)?.collect(),
543 };
544
545 Ok(info)
546}
547
548fn prepare_get_ip_info(
549 versions: &Versions,
550 adapter_id: &str,
551) -> anyhow::Result<Box<proto::KvpMessage2>> {
552 if versions.message_version < proto::KVP_VERSION_5 {
553 anyhow::bail!("non-binary protocol not supported");
554 }
555 let mut message = proto::MessageIpAddressInfoBinary::new_zeroed();
556 write_str(&mut message.adapter_id, adapter_id)?;
557 Ok(msg2(
558 proto::KvpOperation::GET_IP_ADDRESS_INFO,
559 proto::KvpPool::GUEST,
560 message,
561 ))
562}
563
564fn prepare_set_ip_info(
565 versions: &Versions,
566 adapter_id: &str,
567 info: &IpInfo,
568) -> anyhow::Result<Box<proto::KvpMessage2>> {
569 if versions.message_version < proto::KVP_VERSION_5 {
570 anyhow::bail!("non-binary protocol not supported");
571 }
572 let mut message = proto::MessageIpAddressInfoBinary::new_zeroed();
573 write_str(&mut message.adapter_id, adapter_id)?;
574 message.dhcp_enabled = info.dhcp_enabled as u8;
575 message.address_family = match (info.ipv4, info.ipv6) {
576 (true, true) => proto::AddressFamily::IPV4V6,
577 (true, false) => proto::AddressFamily::IPV4,
578 (false, true) => proto::AddressFamily::IPV6,
579 (false, false) => proto::AddressFamily::NONE,
580 };
581
582 for ((da, ds), a) in message
583 .ipv4_addresses
584 .get_mut(..info.ipv4_addresses.len())
585 .context("invalid ipv4 address count")?
586 .iter_mut()
587 .zip(&mut message.ipv4_subnets)
588 .zip(&info.ipv4_addresses)
589 {
590 da.0 = a.address.octets();
591 ds.0 = a.subnet.octets();
592 }
593 message.ipv4_address_count = info.ipv4_addresses.len() as u32;
594
595 for ((da, ds), a) in message
596 .ipv6_addresses
597 .get_mut(..info.ipv6_addresses.len())
598 .context("invalid ipv6 address count")?
599 .iter_mut()
600 .zip(&mut message.ipv6_subnets)
601 .zip(&info.ipv6_addresses)
602 {
603 da.0 = a.address.octets();
604 *ds = a.subnet;
605 }
606 message.ipv6_address_count = info.ipv6_addresses.len() as u32;
607
608 for (da, a) in message
609 .ipv4_gateways
610 .get_mut(..info.ipv4_gateways.len())
611 .context("invalid ipv4 gateway count")?
612 .iter_mut()
613 .zip(&info.ipv4_gateways)
614 {
615 da.0 = a.octets();
616 }
617 message.ipv4_gateway_count = info.ipv4_gateways.len() as u32;
618
619 for (da, a) in message
620 .ipv6_gateways
621 .get_mut(..info.ipv6_gateways.len())
622 .context("invalid ipv6 gateway count")?
623 .iter_mut()
624 .zip(&info.ipv6_gateways)
625 {
626 da.0 = a.octets();
627 }
628 message.ipv6_gateway_count = info.ipv6_gateways.len() as u32;
629
630 for (da, a) in message
631 .ipv4_dns_servers
632 .get_mut(..info.ipv4_dns_servers.len())
633 .context("invalid ipv4 dns server count")?
634 .iter_mut()
635 .zip(&info.ipv4_dns_servers)
636 {
637 da.0 = a.octets();
638 }
639 message.ipv4_dns_server_count = info.ipv4_dns_servers.len() as u32;
640
641 for (da, a) in message
642 .ipv6_dns_servers
643 .get_mut(..info.ipv6_dns_servers.len())
644 .context("invalid ipv6 dns server count")?
645 .iter_mut()
646 .zip(&info.ipv6_dns_servers)
647 {
648 da.0 = a.octets();
649 }
650 message.ipv6_dns_server_count = info.ipv6_dns_servers.len() as u32;
651
652 Ok(msg2(
653 proto::KvpOperation::SET_IP_ADDRESS_INFO,
654 proto::KvpPool::GUEST,
655 message,
656 ))
657}
658
/// Error carrying the non-success `Status` that came back for a request.
#[derive(Debug, Error)]
#[error("KVP error: {0:x?}")]
struct RequestError(Status);
662
663fn pool_cvt(pool: hyperv_ic_resources::kvp::KvpPool) -> proto::KvpPool {
664 match pool {
665 hyperv_ic_resources::kvp::KvpPool::External => proto::KvpPool::EXTERNAL,
666 hyperv_ic_resources::kvp::KvpPool::Guest => proto::KvpPool::GUEST,
667 hyperv_ic_resources::kvp::KvpPool::Auto => proto::KvpPool::AUTO,
668 hyperv_ic_resources::kvp::KvpPool::AutoExternal => proto::KvpPool::AUTO_EXTERNAL,
669 }
670}
671
672fn msg<T: IntoBytes + Immutable>(
673 operation: proto::KvpOperation,
674 pool: proto::KvpPool,
675 message: T,
676) -> Box<proto::KvpMessage> {
677 let mut m = Box::new(proto::KvpMessage {
678 header: proto::KvpHeader { operation, pool },
679 data: [0; 2578],
680 });
681 let offset = align_of::<T>().saturating_sub(size_of::<proto::KvpHeader>());
682 message.write_to_prefix(&mut m.data[offset..]).unwrap();
683 m
684}
685
686fn msg2<T: IntoBytes + Immutable>(
687 operation: proto::KvpOperation,
688 pool: proto::KvpPool,
689 message: T,
690) -> Box<proto::KvpMessage2> {
691 let mut m = Box::new(proto::KvpMessage2 {
692 header: proto::KvpHeader { operation, pool },
693 data: [0; 0x1d02],
694 });
695 let offset = align_of::<T>().saturating_sub(size_of::<proto::KvpHeader>());
696 message.write_to_prefix(&mut m.data[offset..]).unwrap();
697 m
698}
699
700fn parse_str(v: &[u16], n: u32) -> anyhow::Result<String> {
701 if n % 2 != 0 {
702 anyhow::bail!("invalid string length");
703 }
704 let v = v.get(..n as usize / 2).context("string length too large")?;
705 if v.last() != Some(&0) {
706 anyhow::bail!("missing null terminator");
707 }
708 String::from_utf16(&v[..v.len() - 1]).context("invalid utf-16")
709}
710
711fn write_str(v: &mut [u16], s: &str) -> anyhow::Result<u32> {
712 let mut i = 0;
713 for (s, d) in s.encode_utf16().zip(&mut *v) {
714 *d = s;
715 i += 1;
716 }
717 *v.get_mut(i).context("string too long")? = 0;
718 Ok((i + 1) as u32 * 2)
719}
720
721fn write_value(buf: &mut [u8], v: &Value) -> anyhow::Result<(proto::ValueType, u32)> {
722 let r = match *v {
723 Value::String(ref s) => (
724 proto::ValueType::STRING,
725 write_str(<[u16]>::mut_from_bytes(buf).unwrap(), s)?,
726 ),
727 Value::U32(v) => {
728 buf[..4].copy_from_slice(&v.to_le_bytes());
729 (proto::ValueType::DWORD, 4)
730 }
731 Value::U64(v) => {
732 buf[..8].copy_from_slice(&v.to_le_bytes());
733 (proto::ValueType::QWORD, 8)
734 }
735 };
736 Ok(r)
737}