hyperv_ic/
kvp.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! KVP IC device.
5
6use crate::common::IcPipe;
7use crate::common::NegotiateState;
8use crate::common::Versions;
9use anyhow::Context;
10use async_trait::async_trait;
11use futures::FutureExt;
12use futures::StreamExt;
13use futures::stream::FusedStream;
14use futures::stream::once;
15use futures_concurrency::stream::Merge;
16use hyperv_ic_protocol::HeaderFlags;
17use hyperv_ic_protocol::Status;
18use hyperv_ic_protocol::kvp as proto;
19use hyperv_ic_resources::kvp::AddressOrigin;
20use hyperv_ic_resources::kvp::IpInfo;
21use hyperv_ic_resources::kvp::Ipv4AddressInfo;
22use hyperv_ic_resources::kvp::Ipv6AddressInfo;
23use hyperv_ic_resources::kvp::KeyValue;
24use hyperv_ic_resources::kvp::KvpConnectRpc;
25use hyperv_ic_resources::kvp::KvpRpc;
26use hyperv_ic_resources::kvp::Value;
27use inspect::Inspect;
28use inspect::InspectMut;
29use mesh::rpc::FailableRpc;
30use std::pin::pin;
31use task_control::Cancelled;
32use task_control::StopTask;
33use thiserror::Error;
34use vmbus_channel::RawAsyncChannel;
35use vmbus_channel::bus::ChannelType;
36use vmbus_channel::bus::OfferParams;
37use vmbus_channel::channel::ChannelOpenError;
38use vmbus_channel::gpadl_ring::GpadlRingMem;
39use vmbus_channel::simple::SaveRestoreSimpleVmbusDevice;
40use vmbus_channel::simple::SimpleVmbusDevice;
41use vmcore::save_restore::NoSavedState;
42use zerocopy::FromBytes;
43use zerocopy::FromZeros;
44use zerocopy::Immutable;
45use zerocopy::IntoBytes;
46
47const KVP_VERSIONS: &[hyperv_ic_protocol::Version] = &[
48    proto::KVP_VERSION_3,
49    proto::KVP_VERSION_4,
50    proto::KVP_VERSION_5,
51];
52
53/// KVP IC device.
54#[derive(InspectMut)]
55pub struct KvpIc {
56    #[inspect(skip)]
57    recv: mesh::Receiver<KvpConnectRpc>,
58    #[inspect(skip)]
59    wait_ready: Vec<FailableRpc<(), (mesh::Sender<KvpRpc>, mesh::OneshotReceiver<()>)>>,
60}
61
62#[doc(hidden)]
63#[derive(InspectMut)]
64pub struct KvpChannel {
65    #[inspect(mut)]
66    pipe: IcPipe,
67    state: ChannelState,
68}
69
70#[expect(clippy::large_enum_variant)]
71#[derive(Inspect)]
72#[inspect(external_tag)]
73enum ChannelState {
74    Negotiate(#[inspect(rename = "state")] NegotiateState),
75    Ready {
76        versions: Versions,
77        #[inspect(with = "|x| x.len()")]
78        clients: Vec<mesh::OneshotSender<()>>,
79        #[inspect(skip)]
80        rpc_recv: mesh::Receiver<KvpRpc>,
81        state: ReadyState,
82    },
83    Failed,
84}
85
86#[derive(Inspect)]
87#[inspect(external_tag)]
88enum ReadyState {
89    Ready,
90    SendingRequest(#[inspect(skip)] KvpRpc),
91    WaitingResponse(#[inspect(skip)] KvpRpc),
92}
93
94#[async_trait]
95impl SimpleVmbusDevice for KvpIc {
96    type SavedState = NoSavedState;
97    type Runner = KvpChannel;
98
99    fn offer(&self) -> OfferParams {
100        OfferParams {
101            interface_name: "kvp_ic".to_owned(),
102            instance_id: proto::INSTANCE_ID,
103            interface_id: proto::INTERFACE_ID,
104            channel_type: ChannelType::Pipe { message_mode: true },
105            ..Default::default()
106        }
107    }
108
109    fn inspect(&mut self, req: inspect::Request<'_>, runner: Option<&mut Self::Runner>) {
110        req.respond().merge(self).merge(runner);
111    }
112
113    fn open(
114        &mut self,
115        channel: RawAsyncChannel<GpadlRingMem>,
116        _guest_memory: guestmem::GuestMemory,
117    ) -> Result<Self::Runner, ChannelOpenError> {
118        KvpChannel::new(channel, None)
119    }
120
121    async fn run(
122        &mut self,
123        stop: &mut StopTask<'_>,
124        runner: &mut Self::Runner,
125    ) -> Result<(), Cancelled> {
126        stop.until_stopped(async { runner.process(self).await })
127            .await
128    }
129
130    fn supports_save_restore(
131        &mut self,
132    ) -> Option<
133        &mut dyn SaveRestoreSimpleVmbusDevice<SavedState = Self::SavedState, Runner = Self::Runner>,
134    > {
135        None
136    }
137}
138
139impl KvpIc {
140    /// Create a new KVP IC device.
141    pub fn new(recv: mesh::Receiver<KvpConnectRpc>) -> Self {
142        Self {
143            recv,
144            wait_ready: Vec::new(),
145        }
146    }
147}
148
149impl KvpChannel {
150    fn new(
151        channel: RawAsyncChannel<GpadlRingMem>,
152        restore_state: Option<ChannelState>,
153    ) -> Result<Self, ChannelOpenError> {
154        let pipe = IcPipe::new(channel)?;
155        Ok(Self {
156            pipe,
157            state: restore_state.unwrap_or(ChannelState::Negotiate(NegotiateState::default())),
158        })
159    }
160
161    async fn process(&mut self, ic: &mut KvpIc) -> ! {
162        enum Event {
163            StateMachine(anyhow::Result<()>),
164            Request(KvpConnectRpc),
165        }
166
167        loop {
168            let event = pin!(
169                (
170                    once(
171                        self.process_state_machine(&mut ic.wait_ready)
172                            .map(Event::StateMachine)
173                    ),
174                    (&mut ic.recv).map(Event::Request),
175                )
176                    .merge()
177            )
178            .next()
179            .await
180            .unwrap();
181            match event {
182                Event::StateMachine(r) => {
183                    if let Err(err) = r {
184                        tracing::error!(
185                            error = err.as_ref() as &dyn std::error::Error,
186                            "kvp ic error"
187                        );
188                        self.state = ChannelState::Failed;
189                        for rpc in ic.wait_ready.drain(..) {
190                            rpc.fail(anyhow::anyhow!("kvp channel failed"));
191                        }
192                    }
193                }
194                Event::Request(req) => match req {
195                    KvpConnectRpc::WaitForGuest(rpc) => match &mut self.state {
196                        ChannelState::Negotiate(_) => ic.wait_ready.push(rpc),
197                        ChannelState::Ready {
198                            clients, rpc_recv, ..
199                        } => {
200                            let rpc_send = rpc_recv.sender();
201                            let (send, recv) = mesh::oneshot();
202                            clients.retain(|c| !c.is_closed());
203                            clients.push(send);
204                            rpc.complete(Ok((rpc_send, recv)));
205                        }
206                        ChannelState::Failed => {
207                            rpc.fail(anyhow::anyhow!("kvp channel failed"));
208                        }
209                    },
210                },
211            }
212        }
213    }
214
215    async fn process_state_machine(
216        &mut self,
217        wait_ready: &mut Vec<FailableRpc<(), (mesh::Sender<KvpRpc>, mesh::OneshotReceiver<()>)>>,
218    ) -> anyhow::Result<()> {
219        match self.state {
220            ChannelState::Negotiate(ref mut state) => {
221                if let Some(versions) = self.pipe.negotiate(state, KVP_VERSIONS).await? {
222                    let mut rpc_recv = mesh::Receiver::new();
223                    let clients = wait_ready
224                        .drain(..)
225                        .map(|rpc| {
226                            let rpc_send = rpc_recv.sender();
227                            let (send, recv) = mesh::oneshot();
228                            rpc.complete(Ok((rpc_send, recv)));
229                            send
230                        })
231                        .collect();
232
233                    self.state = ChannelState::Ready {
234                        versions,
235                        clients,
236                        rpc_recv,
237                        state: ReadyState::Ready,
238                    };
239                }
240            }
241            ChannelState::Ready {
242                ref versions,
243                rpc_recv: ref mut recv,
244                ref mut state,
245                clients: _,
246            } => match state {
247                ReadyState::Ready => {
248                    if recv.is_terminated() {
249                        return std::future::pending().await;
250                    }
251                    if let Some(rpc) = recv.next().await {
252                        *state = ReadyState::SendingRequest(rpc);
253                    }
254                }
255                ReadyState::SendingRequest(rpc) => {
256                    match rpc {
257                        KvpRpc::Set(rpc) => {
258                            let params = rpc.input();
259                            let mut message = proto::MessageGetSet::new_zeroed();
260                            message.value.key_size =
261                                write_str(&mut message.value.key, &params.key)?;
262
263                            (message.value.value_type, message.value.value_size) =
264                                write_value(&mut message.value.value, &params.value)?;
265
266                            self.pipe
267                                .write_message(
268                                    versions,
269                                    hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
270                                    HeaderFlags::new().with_request(true).with_transaction(true),
271                                    msg(proto::KvpOperation::SET, pool_cvt(params.pool), message)
272                                        .as_bytes(),
273                                )
274                                .await?;
275                        }
276                        KvpRpc::Delete(rpc) => {
277                            let params = rpc.input();
278                            let mut message = proto::MessageDelete::new_zeroed();
279                            message.key_size = write_str(&mut message.key, &params.key)?;
280                            self.pipe
281                                .write_message(
282                                    versions,
283                                    hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
284                                    HeaderFlags::new().with_request(true).with_transaction(true),
285                                    msg(
286                                        proto::KvpOperation::DELETE,
287                                        pool_cvt(params.pool),
288                                        message,
289                                    )
290                                    .as_bytes(),
291                                )
292                                .await?;
293                        }
294                        KvpRpc::Enumerate(rpc) => {
295                            let params = rpc.input();
296                            let message = proto::MessageEnumerate {
297                                index: params.index,
298                                value: proto::Value::new_zeroed(),
299                            };
300
301                            self.pipe
302                                .write_message(
303                                    versions,
304                                    hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
305                                    HeaderFlags::new().with_request(true).with_transaction(true),
306                                    msg(
307                                        proto::KvpOperation::ENUMERATE,
308                                        pool_cvt(params.pool),
309                                        message,
310                                    )
311                                    .as_bytes(),
312                                )
313                                .await?;
314                        }
315                        KvpRpc::GetIpInfo(rpc) => {
316                            let params = rpc.input();
317                            let message = match prepare_get_ip_info(versions, &params.adapter_id) {
318                                Ok(message) => message,
319                                Err(err) => {
320                                    let ReadyState::SendingRequest(KvpRpc::GetIpInfo(rpc)) =
321                                        std::mem::replace(state, ReadyState::Ready)
322                                    else {
323                                        unreachable!()
324                                    };
325                                    rpc.fail(err);
326                                    return Ok(());
327                                }
328                            };
329                            self.pipe
330                                .write_message(
331                                    versions,
332                                    hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
333                                    HeaderFlags::new().with_request(true).with_transaction(true),
334                                    message.as_bytes(),
335                                )
336                                .await?;
337                        }
338                        KvpRpc::SetIpInfo(rpc) => {
339                            let params = rpc.input();
340                            let message = match prepare_set_ip_info(
341                                versions,
342                                &params.adapter_id,
343                                &params.info,
344                            ) {
345                                Ok(message) => message,
346                                Err(err) => {
347                                    let ReadyState::SendingRequest(KvpRpc::SetIpInfo(rpc)) =
348                                        std::mem::replace(state, ReadyState::Ready)
349                                    else {
350                                        unreachable!()
351                                    };
352                                    rpc.fail(err);
353                                    return Ok(());
354                                }
355                            };
356                            self.pipe
357                                .write_message(
358                                    versions,
359                                    hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
360                                    HeaderFlags::new().with_request(true).with_transaction(true),
361                                    message.as_bytes(),
362                                )
363                                .await?;
364                        }
365                    }
366                    let ReadyState::SendingRequest(rpc) =
367                        std::mem::replace(state, ReadyState::Ready)
368                    else {
369                        unreachable!()
370                    };
371                    *state = ReadyState::WaitingResponse(rpc);
372                }
373                ReadyState::WaitingResponse(_) => {
374                    let (status, response) = self.pipe.read_response().await?;
375                    let r = if status == Status::SUCCESS {
376                        Ok(())
377                    } else {
378                        Err(RequestError(status))
379                    };
380                    let ReadyState::WaitingResponse(rpc) =
381                        std::mem::replace(state, ReadyState::Ready)
382                    else {
383                        unreachable!()
384                    };
385                    match rpc {
386                        KvpRpc::Set(rpc) => rpc.handle_failable_sync(|_| r),
387                        KvpRpc::Delete(rpc) => rpc.handle_failable_sync(|_| r),
388                        KvpRpc::Enumerate(rpc) => match r {
389                            Ok(()) => {
390                                let v = parse_enumerate_response(response)?;
391                                rpc.complete(Ok(Some(v)));
392                            }
393                            Err(RequestError(Status::NO_MORE_ITEMS)) => {
394                                rpc.complete(Ok(None));
395                            }
396                            Err(err) => rpc.fail(err),
397                        },
398                        KvpRpc::GetIpInfo(rpc) => match r {
399                            Ok(()) => {
400                                let v = parse_ip_info_response(response)?;
401                                rpc.complete(Ok(v))
402                            }
403                            Err(err) => rpc.fail(err),
404                        },
405                        KvpRpc::SetIpInfo(rpc) => rpc.handle_failable_sync(|_| r),
406                    }
407                }
408            },
409            ChannelState::Failed => std::future::pending().await,
410        }
411        Ok(())
412    }
413}
414
415fn parse_response<T: FromBytes + Immutable>(response: &[u8]) -> Result<T, anyhow::Error> {
416    let response = response
417        .get(align_of::<T>().max(size_of::<proto::KvpHeader>())..)
418        .context("missing header")?;
419    let (response, _) = T::read_from_prefix(response)
420        .ok()
421        .context("missing response")?;
422    Ok(response)
423}
424
425fn parse_enumerate_response(response: &[u8]) -> Result<KeyValue, anyhow::Error> {
426    let response = parse_response::<proto::MessageEnumerate>(response)?;
427    let key = parse_str(&response.value.key, response.value.key_size)?;
428    let value = match response.value.value_type {
429        proto::ValueType::DWORD => {
430            if response.value.value_size != 4 {
431                anyhow::bail!("invalid dword value size");
432            }
433            let value = u32::from_le_bytes(response.value.value[..4].try_into().unwrap());
434            Value::U32(value)
435        }
436        proto::ValueType::QWORD => {
437            if response.value.value_size != 8 {
438                anyhow::bail!("invalid qword value size");
439            }
440            let value = u64::from_le_bytes(response.value.value[..8].try_into().unwrap());
441            Value::U64(value)
442        }
443        proto::ValueType::STRING | proto::ValueType::EXPAND_STRING => {
444            let value = parse_str(
445                <[u16]>::ref_from_bytes(&response.value.value).unwrap(),
446                response.value.value_size,
447            )?;
448            Value::String(value)
449        }
450        proto::ValueType(v) => {
451            anyhow::bail!("invalid value type {v:#x}")
452        }
453    };
454
455    Ok(KeyValue { key, value })
456}
457
458fn parse_ip_info_response(response: &[u8]) -> Result<IpInfo, anyhow::Error> {
459    fn parse_ipv4(
460        addresses: &[proto::IpAddressV4],
461        count: u32,
462    ) -> anyhow::Result<impl Iterator<Item = std::net::Ipv4Addr> + '_> {
463        anyhow::Ok(
464            addresses
465                .get(..count as usize)
466                .context("invalid ipv4 address count")?
467                .iter()
468                .map(|x| x.0.into()),
469        )
470    }
471    fn parse_ipv6(
472        addresses: &[proto::IpAddressV6],
473        count: u32,
474    ) -> anyhow::Result<impl Iterator<Item = std::net::Ipv6Addr> + '_> {
475        anyhow::Ok(
476            addresses
477                .get(..count as usize)
478                .context("invalid ipv6 address count")?
479                .iter()
480                .map(|x| x.0.into()),
481        )
482    }
483    fn parse_origin(origin: proto::IpAddressOrigin) -> AddressOrigin {
484        match origin {
485            proto::IpAddressOrigin::UNKNOWN => AddressOrigin::Unknown,
486            proto::IpAddressOrigin::OTHER => AddressOrigin::Other,
487            proto::IpAddressOrigin::STATIC => AddressOrigin::Static,
488            _ => AddressOrigin::Unknown,
489        }
490    }
491
492    let response = parse_response::<proto::MessageIpAddressInfoBinary>(response)?;
493    let (ipv4_origins, rest) = response
494        .ip_address_origins
495        .split_at_checked(response.ipv4_address_count as usize)
496        .context("invalid ipv4 address count")?;
497
498    let ipv6_origins = rest
499        .get(..response.ipv6_address_count as usize)
500        .context("invalid ipv6 address count")?;
501
502    let info = IpInfo {
503        ipv4: matches!(
504            response.address_family,
505            proto::AddressFamily::IPV4 | proto::AddressFamily::IPV4V6
506        ),
507        ipv6: matches!(
508            response.address_family,
509            proto::AddressFamily::IPV6 | proto::AddressFamily::IPV4V6
510        ),
511        dhcp_enabled: response.dhcp_enabled != 0,
512        ipv4_addresses: parse_ipv4(&response.ipv4_addresses, response.ipv4_address_count)?
513            .zip(parse_ipv4(
514                &response.ipv4_subnets,
515                response.ipv4_address_count,
516            )?)
517            .zip(ipv4_origins)
518            .map(|((address, subnet), &origin)| Ipv4AddressInfo {
519                address,
520                subnet,
521                origin: parse_origin(origin),
522            })
523            .collect(),
524        ipv6_addresses: parse_ipv6(&response.ipv6_addresses, response.ipv6_address_count)?
525            .zip(
526                response
527                    .ipv6_subnets
528                    .get(..response.ipv6_address_count as usize)
529                    .context("invalid ipv6 address count")?,
530            )
531            .zip(ipv6_origins)
532            .map(|((address, &subnet), &origin)| Ipv6AddressInfo {
533                address,
534                subnet,
535                origin: parse_origin(origin),
536            })
537            .collect(),
538        ipv4_dns_servers: parse_ipv4(&response.ipv4_dns_servers, response.ipv4_dns_server_count)?
539            .collect(),
540        ipv6_dns_servers: parse_ipv6(&response.ipv6_dns_servers, response.ipv6_dns_server_count)?
541            .collect(),
542        ipv4_gateways: parse_ipv4(&response.ipv4_gateways, response.ipv4_gateway_count)?.collect(),
543        ipv6_gateways: parse_ipv6(&response.ipv6_gateways, response.ipv6_gateway_count)?.collect(),
544    };
545
546    Ok(info)
547}
548
549fn prepare_get_ip_info(
550    versions: &Versions,
551    adapter_id: &str,
552) -> anyhow::Result<Box<proto::KvpMessage2>> {
553    if versions.message_version < proto::KVP_VERSION_5 {
554        anyhow::bail!("non-binary protocol not supported");
555    }
556    let mut message = proto::MessageIpAddressInfoBinary::new_zeroed();
557    write_str(&mut message.adapter_id, adapter_id)?;
558    Ok(msg2(
559        proto::KvpOperation::GET_IP_ADDRESS_INFO,
560        proto::KvpPool::GUEST,
561        message,
562    ))
563}
564
565fn prepare_set_ip_info(
566    versions: &Versions,
567    adapter_id: &str,
568    info: &IpInfo,
569) -> anyhow::Result<Box<proto::KvpMessage2>> {
570    if versions.message_version < proto::KVP_VERSION_5 {
571        anyhow::bail!("non-binary protocol not supported");
572    }
573    let mut message = proto::MessageIpAddressInfoBinary::new_zeroed();
574    write_str(&mut message.adapter_id, adapter_id)?;
575    message.dhcp_enabled = info.dhcp_enabled as u8;
576    message.address_family = match (info.ipv4, info.ipv6) {
577        (true, true) => proto::AddressFamily::IPV4V6,
578        (true, false) => proto::AddressFamily::IPV4,
579        (false, true) => proto::AddressFamily::IPV6,
580        (false, false) => proto::AddressFamily::NONE,
581    };
582
583    for ((da, ds), a) in message
584        .ipv4_addresses
585        .get_mut(..info.ipv4_addresses.len())
586        .context("invalid ipv4 address count")?
587        .iter_mut()
588        .zip(&mut message.ipv4_subnets)
589        .zip(&info.ipv4_addresses)
590    {
591        da.0 = a.address.octets();
592        ds.0 = a.subnet.octets();
593    }
594    message.ipv4_address_count = info.ipv4_addresses.len() as u32;
595
596    for ((da, ds), a) in message
597        .ipv6_addresses
598        .get_mut(..info.ipv6_addresses.len())
599        .context("invalid ipv6 address count")?
600        .iter_mut()
601        .zip(&mut message.ipv6_subnets)
602        .zip(&info.ipv6_addresses)
603    {
604        da.0 = a.address.octets();
605        *ds = a.subnet;
606    }
607    message.ipv6_address_count = info.ipv6_addresses.len() as u32;
608
609    for (da, a) in message
610        .ipv4_gateways
611        .get_mut(..info.ipv4_gateways.len())
612        .context("invalid ipv4 gateway count")?
613        .iter_mut()
614        .zip(&info.ipv4_gateways)
615    {
616        da.0 = a.octets();
617    }
618    message.ipv4_gateway_count = info.ipv4_gateways.len() as u32;
619
620    for (da, a) in message
621        .ipv6_gateways
622        .get_mut(..info.ipv6_gateways.len())
623        .context("invalid ipv6 gateway count")?
624        .iter_mut()
625        .zip(&info.ipv6_gateways)
626    {
627        da.0 = a.octets();
628    }
629    message.ipv6_gateway_count = info.ipv6_gateways.len() as u32;
630
631    for (da, a) in message
632        .ipv4_dns_servers
633        .get_mut(..info.ipv4_dns_servers.len())
634        .context("invalid ipv4 dns server count")?
635        .iter_mut()
636        .zip(&info.ipv4_dns_servers)
637    {
638        da.0 = a.octets();
639    }
640    message.ipv4_dns_server_count = info.ipv4_dns_servers.len() as u32;
641
642    for (da, a) in message
643        .ipv6_dns_servers
644        .get_mut(..info.ipv6_dns_servers.len())
645        .context("invalid ipv6 dns server count")?
646        .iter_mut()
647        .zip(&info.ipv6_dns_servers)
648    {
649        da.0 = a.octets();
650    }
651    message.ipv6_dns_server_count = info.ipv6_dns_servers.len() as u32;
652
653    Ok(msg2(
654        proto::KvpOperation::SET_IP_ADDRESS_INFO,
655        proto::KvpPool::GUEST,
656        message,
657    ))
658}
659
660#[derive(Debug, Error)]
661#[error("KVP error: {0:x?}")]
662struct RequestError(Status);
663
664fn pool_cvt(pool: hyperv_ic_resources::kvp::KvpPool) -> proto::KvpPool {
665    match pool {
666        hyperv_ic_resources::kvp::KvpPool::External => proto::KvpPool::EXTERNAL,
667        hyperv_ic_resources::kvp::KvpPool::Guest => proto::KvpPool::GUEST,
668        hyperv_ic_resources::kvp::KvpPool::Auto => proto::KvpPool::AUTO,
669        hyperv_ic_resources::kvp::KvpPool::AutoExternal => proto::KvpPool::AUTO_EXTERNAL,
670    }
671}
672
673fn msg<T: IntoBytes + Immutable>(
674    operation: proto::KvpOperation,
675    pool: proto::KvpPool,
676    message: T,
677) -> Box<proto::KvpMessage> {
678    let mut m = Box::new(proto::KvpMessage {
679        header: proto::KvpHeader { operation, pool },
680        data: [0; 2578],
681    });
682    let offset = align_of::<T>().saturating_sub(size_of::<proto::KvpHeader>());
683    message.write_to_prefix(&mut m.data[offset..]).unwrap();
684    m
685}
686
687fn msg2<T: IntoBytes + Immutable>(
688    operation: proto::KvpOperation,
689    pool: proto::KvpPool,
690    message: T,
691) -> Box<proto::KvpMessage2> {
692    let mut m = Box::new(proto::KvpMessage2 {
693        header: proto::KvpHeader { operation, pool },
694        data: [0; 0x1d02],
695    });
696    let offset = align_of::<T>().saturating_sub(size_of::<proto::KvpHeader>());
697    message.write_to_prefix(&mut m.data[offset..]).unwrap();
698    m
699}
700
701fn parse_str(v: &[u16], n: u32) -> anyhow::Result<String> {
702    if n % 2 != 0 {
703        anyhow::bail!("invalid string length");
704    }
705    let v = v.get(..n as usize / 2).context("string length too large")?;
706    if v.last() != Some(&0) {
707        anyhow::bail!("missing null terminator");
708    }
709    String::from_utf16(&v[..v.len() - 1]).context("invalid utf-16")
710}
711
712fn write_str(v: &mut [u16], s: &str) -> anyhow::Result<u32> {
713    let mut i = 0;
714    for (s, d) in s.encode_utf16().zip(&mut *v) {
715        *d = s;
716        i += 1;
717    }
718    *v.get_mut(i).context("string too long")? = 0;
719    Ok((i + 1) as u32 * 2)
720}
721
722fn write_value(buf: &mut [u8], v: &Value) -> anyhow::Result<(proto::ValueType, u32)> {
723    let r = match *v {
724        Value::String(ref s) => (
725            proto::ValueType::STRING,
726            write_str(<[u16]>::mut_from_bytes(buf).unwrap(), s)?,
727        ),
728        Value::U32(v) => {
729            buf[..4].copy_from_slice(&v.to_le_bytes());
730            (proto::ValueType::DWORD, 4)
731        }
732        Value::U64(v) => {
733            buf[..8].copy_from_slice(&v.to_le_bytes());
734            (proto::ValueType::QWORD, 8)
735        }
736    };
737    Ok(r)
738}