hyperv_ic/
kvp.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! KVP IC device.
5
6use crate::common::IcPipe;
7use crate::common::NegotiateState;
8use crate::common::Versions;
9use anyhow::Context;
10use async_trait::async_trait;
11use futures::FutureExt;
12use futures::StreamExt;
13use futures::stream::FusedStream;
14use futures::stream::once;
15use futures_concurrency::stream::Merge;
16use hyperv_ic_protocol::HeaderFlags;
17use hyperv_ic_protocol::Status;
18use hyperv_ic_protocol::kvp as proto;
19use hyperv_ic_resources::kvp::AddressOrigin;
20use hyperv_ic_resources::kvp::IpInfo;
21use hyperv_ic_resources::kvp::Ipv4AddressInfo;
22use hyperv_ic_resources::kvp::Ipv6AddressInfo;
23use hyperv_ic_resources::kvp::KeyValue;
24use hyperv_ic_resources::kvp::KvpConnectRpc;
25use hyperv_ic_resources::kvp::KvpRpc;
26use hyperv_ic_resources::kvp::Value;
27use inspect::Inspect;
28use inspect::InspectMut;
29use mesh::rpc::FailableRpc;
30use std::pin::pin;
31use task_control::Cancelled;
32use task_control::StopTask;
33use thiserror::Error;
34use vmbus_channel::RawAsyncChannel;
35use vmbus_channel::bus::ChannelType;
36use vmbus_channel::bus::OfferParams;
37use vmbus_channel::channel::ChannelOpenError;
38use vmbus_channel::gpadl_ring::GpadlRingMem;
39use vmbus_channel::simple::SaveRestoreSimpleVmbusDevice;
40use vmbus_channel::simple::SimpleVmbusDevice;
41use vmcore::save_restore::NoSavedState;
42use zerocopy::FromBytes;
43use zerocopy::FromZeros;
44use zerocopy::Immutable;
45use zerocopy::IntoBytes;
46
47const KVP_VERSIONS: &[hyperv_ic_protocol::Version] = &[
48    proto::KVP_VERSION_3,
49    proto::KVP_VERSION_4,
50    proto::KVP_VERSION_5,
51];
52
53/// KVP IC device.
54#[derive(InspectMut)]
55pub struct KvpIc {
56    #[inspect(skip)]
57    recv: mesh::Receiver<KvpConnectRpc>,
58    #[inspect(skip)]
59    wait_ready: Vec<FailableRpc<(), (mesh::Sender<KvpRpc>, mesh::OneshotReceiver<()>)>>,
60}
61
62#[doc(hidden)]
63#[derive(InspectMut)]
64pub struct KvpChannel {
65    #[inspect(mut)]
66    pipe: IcPipe,
67    state: ChannelState,
68}
69
70#[derive(Inspect)]
71#[inspect(external_tag)]
72enum ChannelState {
73    Negotiate(#[inspect(rename = "state")] NegotiateState),
74    Ready {
75        versions: Versions,
76        #[inspect(with = "|x| x.len()")]
77        clients: Vec<mesh::OneshotSender<()>>,
78        #[inspect(skip)]
79        rpc_recv: mesh::Receiver<KvpRpc>,
80        state: ReadyState,
81    },
82    Failed,
83}
84
85#[derive(Inspect)]
86#[inspect(external_tag)]
87enum ReadyState {
88    Ready,
89    SendingRequest(#[inspect(skip)] KvpRpc),
90    WaitingResponse(#[inspect(skip)] KvpRpc),
91}
92
93#[async_trait]
94impl SimpleVmbusDevice for KvpIc {
95    type SavedState = NoSavedState;
96    type Runner = KvpChannel;
97
98    fn offer(&self) -> OfferParams {
99        OfferParams {
100            interface_name: "kvp_ic".to_owned(),
101            instance_id: proto::INSTANCE_ID,
102            interface_id: proto::INTERFACE_ID,
103            channel_type: ChannelType::Pipe { message_mode: true },
104            ..Default::default()
105        }
106    }
107
108    fn inspect(&mut self, req: inspect::Request<'_>, runner: Option<&mut Self::Runner>) {
109        req.respond().merge(self).merge(runner);
110    }
111
112    fn open(
113        &mut self,
114        channel: RawAsyncChannel<GpadlRingMem>,
115        _guest_memory: guestmem::GuestMemory,
116    ) -> Result<Self::Runner, ChannelOpenError> {
117        KvpChannel::new(channel, None)
118    }
119
120    async fn run(
121        &mut self,
122        stop: &mut StopTask<'_>,
123        runner: &mut Self::Runner,
124    ) -> Result<(), Cancelled> {
125        stop.until_stopped(async { runner.process(self).await })
126            .await
127    }
128
129    fn supports_save_restore(
130        &mut self,
131    ) -> Option<
132        &mut dyn SaveRestoreSimpleVmbusDevice<SavedState = Self::SavedState, Runner = Self::Runner>,
133    > {
134        None
135    }
136}
137
138impl KvpIc {
139    /// Create a new KVP IC device.
140    pub fn new(recv: mesh::Receiver<KvpConnectRpc>) -> Self {
141        Self {
142            recv,
143            wait_ready: Vec::new(),
144        }
145    }
146}
147
148impl KvpChannel {
149    fn new(
150        channel: RawAsyncChannel<GpadlRingMem>,
151        restore_state: Option<ChannelState>,
152    ) -> Result<Self, ChannelOpenError> {
153        let pipe = IcPipe::new(channel)?;
154        Ok(Self {
155            pipe,
156            state: restore_state.unwrap_or(ChannelState::Negotiate(NegotiateState::default())),
157        })
158    }
159
160    async fn process(&mut self, ic: &mut KvpIc) -> ! {
161        enum Event {
162            StateMachine(anyhow::Result<()>),
163            Request(KvpConnectRpc),
164        }
165
166        loop {
167            let event = pin!(
168                (
169                    once(
170                        self.process_state_machine(&mut ic.wait_ready)
171                            .map(Event::StateMachine)
172                    ),
173                    (&mut ic.recv).map(Event::Request),
174                )
175                    .merge()
176            )
177            .next()
178            .await
179            .unwrap();
180            match event {
181                Event::StateMachine(r) => {
182                    if let Err(err) = r {
183                        tracing::error!(
184                            error = err.as_ref() as &dyn std::error::Error,
185                            "kvp ic error"
186                        );
187                        self.state = ChannelState::Failed;
188                        for rpc in ic.wait_ready.drain(..) {
189                            rpc.fail(anyhow::anyhow!("kvp channel failed"));
190                        }
191                    }
192                }
193                Event::Request(req) => match req {
194                    KvpConnectRpc::WaitForGuest(rpc) => match &mut self.state {
195                        ChannelState::Negotiate(_) => ic.wait_ready.push(rpc),
196                        ChannelState::Ready {
197                            clients, rpc_recv, ..
198                        } => {
199                            let rpc_send = rpc_recv.sender();
200                            let (send, recv) = mesh::oneshot();
201                            clients.retain(|c| !c.is_closed());
202                            clients.push(send);
203                            rpc.complete(Ok((rpc_send, recv)));
204                        }
205                        ChannelState::Failed => {
206                            rpc.fail(anyhow::anyhow!("kvp channel failed"));
207                        }
208                    },
209                },
210            }
211        }
212    }
213
214    async fn process_state_machine(
215        &mut self,
216        wait_ready: &mut Vec<FailableRpc<(), (mesh::Sender<KvpRpc>, mesh::OneshotReceiver<()>)>>,
217    ) -> anyhow::Result<()> {
218        match self.state {
219            ChannelState::Negotiate(ref mut state) => {
220                if let Some(versions) = self.pipe.negotiate(state, KVP_VERSIONS).await? {
221                    let mut rpc_recv = mesh::Receiver::new();
222                    let clients = wait_ready
223                        .drain(..)
224                        .map(|rpc| {
225                            let rpc_send = rpc_recv.sender();
226                            let (send, recv) = mesh::oneshot();
227                            rpc.complete(Ok((rpc_send, recv)));
228                            send
229                        })
230                        .collect();
231
232                    self.state = ChannelState::Ready {
233                        versions,
234                        clients,
235                        rpc_recv,
236                        state: ReadyState::Ready,
237                    };
238                }
239            }
240            ChannelState::Ready {
241                ref versions,
242                rpc_recv: ref mut recv,
243                ref mut state,
244                clients: _,
245            } => match state {
246                ReadyState::Ready => {
247                    if recv.is_terminated() {
248                        return std::future::pending().await;
249                    }
250                    if let Some(rpc) = recv.next().await {
251                        *state = ReadyState::SendingRequest(rpc);
252                    }
253                }
254                ReadyState::SendingRequest(rpc) => {
255                    match rpc {
256                        KvpRpc::Set(rpc) => {
257                            let params = rpc.input();
258                            let mut message = proto::MessageGetSet::new_zeroed();
259                            message.value.key_size =
260                                write_str(&mut message.value.key, &params.key)?;
261
262                            (message.value.value_type, message.value.value_size) =
263                                write_value(&mut message.value.value, &params.value)?;
264
265                            self.pipe
266                                .write_message(
267                                    versions,
268                                    hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
269                                    HeaderFlags::new().with_request(true).with_transaction(true),
270                                    msg(proto::KvpOperation::SET, pool_cvt(params.pool), message)
271                                        .as_bytes(),
272                                )
273                                .await?;
274                        }
275                        KvpRpc::Delete(rpc) => {
276                            let params = rpc.input();
277                            let mut message = proto::MessageDelete::new_zeroed();
278                            message.key_size = write_str(&mut message.key, &params.key)?;
279                            self.pipe
280                                .write_message(
281                                    versions,
282                                    hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
283                                    HeaderFlags::new().with_request(true).with_transaction(true),
284                                    msg(
285                                        proto::KvpOperation::DELETE,
286                                        pool_cvt(params.pool),
287                                        message,
288                                    )
289                                    .as_bytes(),
290                                )
291                                .await?;
292                        }
293                        KvpRpc::Enumerate(rpc) => {
294                            let params = rpc.input();
295                            let message = proto::MessageEnumerate {
296                                index: params.index,
297                                value: proto::Value::new_zeroed(),
298                            };
299
300                            self.pipe
301                                .write_message(
302                                    versions,
303                                    hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
304                                    HeaderFlags::new().with_request(true).with_transaction(true),
305                                    msg(
306                                        proto::KvpOperation::ENUMERATE,
307                                        pool_cvt(params.pool),
308                                        message,
309                                    )
310                                    .as_bytes(),
311                                )
312                                .await?;
313                        }
314                        KvpRpc::GetIpInfo(rpc) => {
315                            let params = rpc.input();
316                            let message = match prepare_get_ip_info(versions, &params.adapter_id) {
317                                Ok(message) => message,
318                                Err(err) => {
319                                    let ReadyState::SendingRequest(KvpRpc::GetIpInfo(rpc)) =
320                                        std::mem::replace(state, ReadyState::Ready)
321                                    else {
322                                        unreachable!()
323                                    };
324                                    rpc.fail(err);
325                                    return Ok(());
326                                }
327                            };
328                            self.pipe
329                                .write_message(
330                                    versions,
331                                    hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
332                                    HeaderFlags::new().with_request(true).with_transaction(true),
333                                    message.as_bytes(),
334                                )
335                                .await?;
336                        }
337                        KvpRpc::SetIpInfo(rpc) => {
338                            let params = rpc.input();
339                            let message = match prepare_set_ip_info(
340                                versions,
341                                &params.adapter_id,
342                                &params.info,
343                            ) {
344                                Ok(message) => message,
345                                Err(err) => {
346                                    let ReadyState::SendingRequest(KvpRpc::SetIpInfo(rpc)) =
347                                        std::mem::replace(state, ReadyState::Ready)
348                                    else {
349                                        unreachable!()
350                                    };
351                                    rpc.fail(err);
352                                    return Ok(());
353                                }
354                            };
355                            self.pipe
356                                .write_message(
357                                    versions,
358                                    hyperv_ic_protocol::MessageType::KVP_EXCHANGE,
359                                    HeaderFlags::new().with_request(true).with_transaction(true),
360                                    message.as_bytes(),
361                                )
362                                .await?;
363                        }
364                    }
365                    let ReadyState::SendingRequest(rpc) =
366                        std::mem::replace(state, ReadyState::Ready)
367                    else {
368                        unreachable!()
369                    };
370                    *state = ReadyState::WaitingResponse(rpc);
371                }
372                ReadyState::WaitingResponse(_) => {
373                    let (status, response) = self.pipe.read_response().await?;
374                    let r = if status == Status::SUCCESS {
375                        Ok(())
376                    } else {
377                        Err(RequestError(status))
378                    };
379                    let ReadyState::WaitingResponse(rpc) =
380                        std::mem::replace(state, ReadyState::Ready)
381                    else {
382                        unreachable!()
383                    };
384                    match rpc {
385                        KvpRpc::Set(rpc) => rpc.handle_failable_sync(|_| r),
386                        KvpRpc::Delete(rpc) => rpc.handle_failable_sync(|_| r),
387                        KvpRpc::Enumerate(rpc) => match r {
388                            Ok(()) => {
389                                let v = parse_enumerate_response(response)?;
390                                rpc.complete(Ok(Some(v)));
391                            }
392                            Err(RequestError(Status::NO_MORE_ITEMS)) => {
393                                rpc.complete(Ok(None));
394                            }
395                            Err(err) => rpc.fail(err),
396                        },
397                        KvpRpc::GetIpInfo(rpc) => match r {
398                            Ok(()) => {
399                                let v = parse_ip_info_response(response)?;
400                                rpc.complete(Ok(v))
401                            }
402                            Err(err) => rpc.fail(err),
403                        },
404                        KvpRpc::SetIpInfo(rpc) => rpc.handle_failable_sync(|_| r),
405                    }
406                }
407            },
408            ChannelState::Failed => std::future::pending().await,
409        }
410        Ok(())
411    }
412}
413
414fn parse_response<T: FromBytes + Immutable>(response: &[u8]) -> Result<T, anyhow::Error> {
415    let response = response
416        .get(align_of::<T>().max(size_of::<proto::KvpHeader>())..)
417        .context("missing header")?;
418    let (response, _) = T::read_from_prefix(response)
419        .ok()
420        .context("missing response")?;
421    Ok(response)
422}
423
424fn parse_enumerate_response(response: &[u8]) -> Result<KeyValue, anyhow::Error> {
425    let response = parse_response::<proto::MessageEnumerate>(response)?;
426    let key = parse_str(&response.value.key, response.value.key_size)?;
427    let value = match response.value.value_type {
428        proto::ValueType::DWORD => {
429            if response.value.value_size != 4 {
430                anyhow::bail!("invalid dword value size");
431            }
432            let value = u32::from_le_bytes(response.value.value[..4].try_into().unwrap());
433            Value::U32(value)
434        }
435        proto::ValueType::QWORD => {
436            if response.value.value_size != 8 {
437                anyhow::bail!("invalid qword value size");
438            }
439            let value = u64::from_le_bytes(response.value.value[..8].try_into().unwrap());
440            Value::U64(value)
441        }
442        proto::ValueType::STRING | proto::ValueType::EXPAND_STRING => {
443            let value = parse_str(
444                <[u16]>::ref_from_bytes(&response.value.value).unwrap(),
445                response.value.value_size,
446            )?;
447            Value::String(value)
448        }
449        proto::ValueType(v) => {
450            anyhow::bail!("invalid value type {v:#x}")
451        }
452    };
453
454    Ok(KeyValue { key, value })
455}
456
457fn parse_ip_info_response(response: &[u8]) -> Result<IpInfo, anyhow::Error> {
458    fn parse_ipv4(
459        addresses: &[proto::IpAddressV4],
460        count: u32,
461    ) -> anyhow::Result<impl Iterator<Item = std::net::Ipv4Addr> + '_> {
462        anyhow::Ok(
463            addresses
464                .get(..count as usize)
465                .context("invalid ipv4 address count")?
466                .iter()
467                .map(|x| x.0.into()),
468        )
469    }
470    fn parse_ipv6(
471        addresses: &[proto::IpAddressV6],
472        count: u32,
473    ) -> anyhow::Result<impl Iterator<Item = std::net::Ipv6Addr> + '_> {
474        anyhow::Ok(
475            addresses
476                .get(..count as usize)
477                .context("invalid ipv6 address count")?
478                .iter()
479                .map(|x| x.0.into()),
480        )
481    }
482    fn parse_origin(origin: proto::IpAddressOrigin) -> AddressOrigin {
483        match origin {
484            proto::IpAddressOrigin::UNKNOWN => AddressOrigin::Unknown,
485            proto::IpAddressOrigin::OTHER => AddressOrigin::Other,
486            proto::IpAddressOrigin::STATIC => AddressOrigin::Static,
487            _ => AddressOrigin::Unknown,
488        }
489    }
490
491    let response = parse_response::<proto::MessageIpAddressInfoBinary>(response)?;
492    let (ipv4_origins, rest) = response
493        .ip_address_origins
494        .split_at_checked(response.ipv4_address_count as usize)
495        .context("invalid ipv4 address count")?;
496
497    let ipv6_origins = rest
498        .get(..response.ipv6_address_count as usize)
499        .context("invalid ipv6 address count")?;
500
501    let info = IpInfo {
502        ipv4: matches!(
503            response.address_family,
504            proto::AddressFamily::IPV4 | proto::AddressFamily::IPV4V6
505        ),
506        ipv6: matches!(
507            response.address_family,
508            proto::AddressFamily::IPV6 | proto::AddressFamily::IPV4V6
509        ),
510        dhcp_enabled: response.dhcp_enabled != 0,
511        ipv4_addresses: parse_ipv4(&response.ipv4_addresses, response.ipv4_address_count)?
512            .zip(parse_ipv4(
513                &response.ipv4_subnets,
514                response.ipv4_address_count,
515            )?)
516            .zip(ipv4_origins)
517            .map(|((address, subnet), &origin)| Ipv4AddressInfo {
518                address,
519                subnet,
520                origin: parse_origin(origin),
521            })
522            .collect(),
523        ipv6_addresses: parse_ipv6(&response.ipv6_addresses, response.ipv6_address_count)?
524            .zip(
525                response
526                    .ipv6_subnets
527                    .get(..response.ipv6_address_count as usize)
528                    .context("invalid ipv6 address count")?,
529            )
530            .zip(ipv6_origins)
531            .map(|((address, &subnet), &origin)| Ipv6AddressInfo {
532                address,
533                subnet,
534                origin: parse_origin(origin),
535            })
536            .collect(),
537        ipv4_dns_servers: parse_ipv4(&response.ipv4_dns_servers, response.ipv4_dns_server_count)?
538            .collect(),
539        ipv6_dns_servers: parse_ipv6(&response.ipv6_dns_servers, response.ipv6_dns_server_count)?
540            .collect(),
541        ipv4_gateways: parse_ipv4(&response.ipv4_gateways, response.ipv4_gateway_count)?.collect(),
542        ipv6_gateways: parse_ipv6(&response.ipv6_gateways, response.ipv6_gateway_count)?.collect(),
543    };
544
545    Ok(info)
546}
547
548fn prepare_get_ip_info(
549    versions: &Versions,
550    adapter_id: &str,
551) -> anyhow::Result<Box<proto::KvpMessage2>> {
552    if versions.message_version < proto::KVP_VERSION_5 {
553        anyhow::bail!("non-binary protocol not supported");
554    }
555    let mut message = proto::MessageIpAddressInfoBinary::new_zeroed();
556    write_str(&mut message.adapter_id, adapter_id)?;
557    Ok(msg2(
558        proto::KvpOperation::GET_IP_ADDRESS_INFO,
559        proto::KvpPool::GUEST,
560        message,
561    ))
562}
563
564fn prepare_set_ip_info(
565    versions: &Versions,
566    adapter_id: &str,
567    info: &IpInfo,
568) -> anyhow::Result<Box<proto::KvpMessage2>> {
569    if versions.message_version < proto::KVP_VERSION_5 {
570        anyhow::bail!("non-binary protocol not supported");
571    }
572    let mut message = proto::MessageIpAddressInfoBinary::new_zeroed();
573    write_str(&mut message.adapter_id, adapter_id)?;
574    message.dhcp_enabled = info.dhcp_enabled as u8;
575    message.address_family = match (info.ipv4, info.ipv6) {
576        (true, true) => proto::AddressFamily::IPV4V6,
577        (true, false) => proto::AddressFamily::IPV4,
578        (false, true) => proto::AddressFamily::IPV6,
579        (false, false) => proto::AddressFamily::NONE,
580    };
581
582    for ((da, ds), a) in message
583        .ipv4_addresses
584        .get_mut(..info.ipv4_addresses.len())
585        .context("invalid ipv4 address count")?
586        .iter_mut()
587        .zip(&mut message.ipv4_subnets)
588        .zip(&info.ipv4_addresses)
589    {
590        da.0 = a.address.octets();
591        ds.0 = a.subnet.octets();
592    }
593    message.ipv4_address_count = info.ipv4_addresses.len() as u32;
594
595    for ((da, ds), a) in message
596        .ipv6_addresses
597        .get_mut(..info.ipv6_addresses.len())
598        .context("invalid ipv6 address count")?
599        .iter_mut()
600        .zip(&mut message.ipv6_subnets)
601        .zip(&info.ipv6_addresses)
602    {
603        da.0 = a.address.octets();
604        *ds = a.subnet;
605    }
606    message.ipv6_address_count = info.ipv6_addresses.len() as u32;
607
608    for (da, a) in message
609        .ipv4_gateways
610        .get_mut(..info.ipv4_gateways.len())
611        .context("invalid ipv4 gateway count")?
612        .iter_mut()
613        .zip(&info.ipv4_gateways)
614    {
615        da.0 = a.octets();
616    }
617    message.ipv4_gateway_count = info.ipv4_gateways.len() as u32;
618
619    for (da, a) in message
620        .ipv6_gateways
621        .get_mut(..info.ipv6_gateways.len())
622        .context("invalid ipv6 gateway count")?
623        .iter_mut()
624        .zip(&info.ipv6_gateways)
625    {
626        da.0 = a.octets();
627    }
628    message.ipv6_gateway_count = info.ipv6_gateways.len() as u32;
629
630    for (da, a) in message
631        .ipv4_dns_servers
632        .get_mut(..info.ipv4_dns_servers.len())
633        .context("invalid ipv4 dns server count")?
634        .iter_mut()
635        .zip(&info.ipv4_dns_servers)
636    {
637        da.0 = a.octets();
638    }
639    message.ipv4_dns_server_count = info.ipv4_dns_servers.len() as u32;
640
641    for (da, a) in message
642        .ipv6_dns_servers
643        .get_mut(..info.ipv6_dns_servers.len())
644        .context("invalid ipv6 dns server count")?
645        .iter_mut()
646        .zip(&info.ipv6_dns_servers)
647    {
648        da.0 = a.octets();
649    }
650    message.ipv6_dns_server_count = info.ipv6_dns_servers.len() as u32;
651
652    Ok(msg2(
653        proto::KvpOperation::SET_IP_ADDRESS_INFO,
654        proto::KvpPool::GUEST,
655        message,
656    ))
657}
658
659#[derive(Debug, Error)]
660#[error("KVP error: {0:x?}")]
661struct RequestError(Status);
662
663fn pool_cvt(pool: hyperv_ic_resources::kvp::KvpPool) -> proto::KvpPool {
664    match pool {
665        hyperv_ic_resources::kvp::KvpPool::External => proto::KvpPool::EXTERNAL,
666        hyperv_ic_resources::kvp::KvpPool::Guest => proto::KvpPool::GUEST,
667        hyperv_ic_resources::kvp::KvpPool::Auto => proto::KvpPool::AUTO,
668        hyperv_ic_resources::kvp::KvpPool::AutoExternal => proto::KvpPool::AUTO_EXTERNAL,
669    }
670}
671
672fn msg<T: IntoBytes + Immutable>(
673    operation: proto::KvpOperation,
674    pool: proto::KvpPool,
675    message: T,
676) -> Box<proto::KvpMessage> {
677    let mut m = Box::new(proto::KvpMessage {
678        header: proto::KvpHeader { operation, pool },
679        data: [0; 2578],
680    });
681    let offset = align_of::<T>().saturating_sub(size_of::<proto::KvpHeader>());
682    message.write_to_prefix(&mut m.data[offset..]).unwrap();
683    m
684}
685
686fn msg2<T: IntoBytes + Immutable>(
687    operation: proto::KvpOperation,
688    pool: proto::KvpPool,
689    message: T,
690) -> Box<proto::KvpMessage2> {
691    let mut m = Box::new(proto::KvpMessage2 {
692        header: proto::KvpHeader { operation, pool },
693        data: [0; 0x1d02],
694    });
695    let offset = align_of::<T>().saturating_sub(size_of::<proto::KvpHeader>());
696    message.write_to_prefix(&mut m.data[offset..]).unwrap();
697    m
698}
699
700fn parse_str(v: &[u16], n: u32) -> anyhow::Result<String> {
701    if n % 2 != 0 {
702        anyhow::bail!("invalid string length");
703    }
704    let v = v.get(..n as usize / 2).context("string length too large")?;
705    if v.last() != Some(&0) {
706        anyhow::bail!("missing null terminator");
707    }
708    String::from_utf16(&v[..v.len() - 1]).context("invalid utf-16")
709}
710
711fn write_str(v: &mut [u16], s: &str) -> anyhow::Result<u32> {
712    let mut i = 0;
713    for (s, d) in s.encode_utf16().zip(&mut *v) {
714        *d = s;
715        i += 1;
716    }
717    *v.get_mut(i).context("string too long")? = 0;
718    Ok((i + 1) as u32 * 2)
719}
720
721fn write_value(buf: &mut [u8], v: &Value) -> anyhow::Result<(proto::ValueType, u32)> {
722    let r = match *v {
723        Value::String(ref s) => (
724            proto::ValueType::STRING,
725            write_str(<[u16]>::mut_from_bytes(buf).unwrap(), s)?,
726        ),
727        Value::U32(v) => {
728            buf[..4].copy_from_slice(&v.to_le_bytes());
729            (proto::ValueType::DWORD, 4)
730        }
731        Value::U64(v) => {
732            buf[..8].copy_from_slice(&v.to_le_bytes());
733            (proto::ValueType::QWORD, 8)
734        }
735    };
736    Ok(r)
737}