consomme/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! The Consomme user-mode TCP stack.
5//!
6//! This crate implements a user-mode TCP stack designed for use with
7//! virtualization. The guest operating system sends Ethernet frames, and this
8//! crate parses them and distributes the data streams to individual TCP and UDP
9//! sockets.
10//!
11//! The current implementation supports OS-backed TCP and UDP sockets,
12//! essentially causing this stack to act as a NAT implementation, providing
13//! guest OS networking by leveraging the host's network stack.
14//!
15//! This implementation includes a small DHCP server for address assignment.
16
17mod arp;
18mod dhcp;
19#[cfg_attr(unix, path = "dns_unix.rs")]
20#[cfg_attr(windows, path = "dns_windows.rs")]
21mod dns;
22mod tcp;
23mod udp;
24mod windows;
25
26use inspect::InspectMut;
27use mesh::rpc::Rpc;
28use mesh::rpc::RpcError;
29use mesh::rpc::RpcSend;
30use pal_async::driver::Driver;
31use smoltcp::phy::Checksum;
32use smoltcp::phy::ChecksumCapabilities;
33use smoltcp::wire::DhcpMessageType;
34use smoltcp::wire::EthernetAddress;
35use smoltcp::wire::EthernetFrame;
36use smoltcp::wire::EthernetProtocol;
37use smoltcp::wire::EthernetRepr;
38use smoltcp::wire::IPV4_HEADER_LEN;
39use smoltcp::wire::IpProtocol;
40use smoltcp::wire::Ipv4Address;
41use smoltcp::wire::Ipv4Packet;
42use std::net::Ipv4Addr;
43use std::net::SocketAddrV4;
44use std::task::Context;
45use std::task::Poll;
46use thiserror::Error;
47
48/// Error type returned from some dynamic update functions like bind_port.
49#[derive(Debug, Error)]
50pub enum ConsommeMessageError {
51    /// Communication error with running instance.
52    #[error("communication error")]
53    Mesh(RpcError),
54    /// Error executing request on current network instance.
55    #[error("network err")]
56    Network(DropReason),
57}
58
59/// Callback to modify network state dynamically.
60pub type ConsommeStateUpdateFn = Box<dyn Fn(&mut ConsommeState) + Send>;
61
62struct MessageBindPort {
63    protocol: IpProtocol,
64    address: Option<Ipv4Addr>,
65    port: u16,
66}
67
68enum ConsommeMessage {
69    BindPort(Rpc<MessageBindPort, Result<(), DropReason>>),
70    UnbindPort(Rpc<MessageBindPort, Result<(), DropReason>>),
71    UpdateState(Rpc<ConsommeStateUpdateFn, ()>),
72}
73
74/// Provide dynamic updates during runtime.
75pub struct ConsommeControl {
76    send: mesh::Sender<ConsommeMessage>,
77}
78
79impl ConsommeControl {
80    /// Binds a port to receive incoming packets.
81    pub async fn bind_port(
82        &self,
83        protocol: IpProtocol,
84        ip_addr: Option<Ipv4Addr>,
85        port: u16,
86    ) -> Result<(), ConsommeMessageError> {
87        self.send
88            .call(
89                ConsommeMessage::BindPort,
90                MessageBindPort {
91                    protocol,
92                    address: ip_addr,
93                    port,
94                },
95            )
96            .await
97            .map_err(ConsommeMessageError::Mesh)?
98            .map_err(ConsommeMessageError::Network)
99    }
100
101    /// Unbinds a port previously reserved with bind_port()
102    pub async fn unbind_port(
103        &self,
104        protocol: IpProtocol,
105        port: u16,
106    ) -> Result<(), ConsommeMessageError> {
107        self.send
108            .call(
109                ConsommeMessage::UnbindPort,
110                MessageBindPort {
111                    protocol,
112                    address: None,
113                    port,
114                },
115            )
116            .await
117            .map_err(ConsommeMessageError::Mesh)?
118            .map_err(ConsommeMessageError::Network)
119    }
120
121    /// Updates dynamic network state
122    pub async fn update_state(&self, f: ConsommeStateUpdateFn) -> Result<(), ConsommeMessageError> {
123        self.send
124            .call(ConsommeMessage::UpdateState, f)
125            .await
126            .map_err(ConsommeMessageError::Mesh)
127    }
128}
129
130/// A consomme instance.
131pub struct Consomme {
132    state: ConsommeState,
133    recv: Option<mesh::Receiver<ConsommeMessage>>,
134    tcp: tcp::Tcp,
135    udp: udp::Udp,
136}
137
138impl InspectMut for Consomme {
139    fn inspect_mut(&mut self, req: inspect::Request<'_>) {
140        req.respond()
141            .field("tcp", &self.tcp)
142            .field_mut("udp", &mut self.udp);
143    }
144}
145
146/// Dynamic networking properties of a consomme endpoint.
147pub struct ConsommeState {
148    /// Current IPv4 network mask.
149    pub net_mask: Ipv4Address,
150    /// Current Ipv4 gateway address.
151    pub gateway_ip: Ipv4Address,
152    /// Current Ipv4 gateway MAC address.
153    pub gateway_mac: EthernetAddress,
154    /// Current Ipv4 address assigned to endpoint.
155    pub client_ip: Ipv4Address,
156    /// Current client MAC address.
157    pub client_mac: EthernetAddress,
158    /// Current list of DNS resolvers.
159    pub nameservers: Vec<Ipv4Address>,
160    /// Buffer for packet processing
161    buffer: Box<[u8]>,
162}
163
164/// An error indicating that the CIDR is invalid.
165#[derive(Debug, Error)]
166#[error("invalid CIDR")]
167pub struct InvalidCidr;
168
169impl ConsommeState {
170    /// Create default dynamic network state. The default state is
171    ///     IP address: 10.0.0.2 / 24
172    ///     gateway: 10.0.0.1 with MAC address 52-55-10-0-0-1
173    ///     no DNS resolvers
174    pub fn new() -> Result<Self, Error> {
175        let nameservers = dns::nameservers()?;
176        Ok(Self {
177            gateway_ip: Ipv4Address::new(10, 0, 0, 1),
178            gateway_mac: EthernetAddress([0x52, 0x55, 10, 0, 0, 1]),
179            client_ip: Ipv4Address::new(10, 0, 0, 2),
180            client_mac: EthernetAddress([0x0, 0x0, 0x0, 0x0, 0x1, 0x0]),
181            net_mask: Ipv4Address::new(255, 255, 255, 0),
182            nameservers,
183            buffer: Box::new([0; 65535]),
184        })
185    }
186
187    /// Sets the cidr for the network.
188    ///
189    /// Setting, for example, 192.168.0.0/24 will set the gateway to
190    /// 192.168.0.1 and the client IP to 192.168.0.2.
191    pub fn set_cidr(&mut self, cidr: &str) -> Result<(), InvalidCidr> {
192        let cidr: smoltcp::wire::Ipv4Cidr = cidr.parse().map_err(|()| InvalidCidr)?;
193        let base_address = cidr.network().address();
194        self.gateway_ip = base_address;
195        self.gateway_ip.0[3] += 1;
196        self.client_ip = base_address;
197        self.client_ip.0[3] += 2;
198        self.net_mask = cidr.netmask();
199        Ok(())
200    }
201}
202
203/// An accessor for consomme.
204pub struct Access<'a, T> {
205    inner: &'a mut Consomme,
206    client: &'a mut T,
207}
208
209/// A consomme client.
210pub trait Client {
211    /// Gets the driver to use for handling new connections.
212    ///
213    /// TODO: generalize connection creation to allow pluggable model (not just
214    /// OS sockets) and remove this.
215    fn driver(&self) -> &dyn Driver;
216
217    /// Transmits a packet to the client.
218    ///
219    /// If `checksum.ipv4`, `checksum.tcp`, or `checksum.udp` are set, then the
220    /// packet contains an IPv4 header, TCP header, and/or UDP header with a
221    /// valid checksum.
222    ///
223    /// TODO:
224    ///
225    /// 1. support >MTU sized packets (RSC/LRO/GRO)
226    /// 2. allow discontiguous data to eliminate the extra copy from the TCP
227    ///    window.
228    fn recv(&mut self, data: &[u8], checksum: &ChecksumState);
229
230    /// Specifies the maximum size for the next call to `recv`.
231    ///
232    /// This is the MTU including the Ethernet frame header. This must be at
233    /// least [`MIN_MTU`].
234    ///
235    /// Return 0 to indicate that there are no buffers available for receiving
236    /// data.
237    fn rx_mtu(&mut self) -> usize;
238}
239
240/// Specifies the checksum state for a packet being transmitted.
241#[derive(Debug, Copy, Clone)]
242pub struct ChecksumState {
243    /// On receive, the data has a valid IPv4 header checksum. On send, the
244    /// checksum should be ignored.
245    pub ipv4: bool,
246    /// On receive, the data has a valid TCP checksum. On send, the checksum
247    /// should be ignored.
248    pub tcp: bool,
249    /// On receive, the data has a valid UDP checksum. On send, the checksum
250    /// should be ignored.
251    pub udp: bool,
252    /// The data consists of multiple TCP segments, each with the provided
253    /// segment size.
254    ///
255    /// The IP header's length field may be invalid and should be ignored.
256    pub tso: Option<u16>,
257}
258
259impl ChecksumState {
260    const NONE: Self = Self {
261        ipv4: false,
262        tcp: false,
263        udp: false,
264        tso: None,
265    };
266    const IPV4_ONLY: Self = Self {
267        ipv4: true,
268        tcp: false,
269        udp: false,
270        tso: None,
271    };
272    const TCP4: Self = Self {
273        ipv4: true,
274        tcp: true,
275        udp: false,
276        tso: None,
277    };
278    const UDP4: Self = Self {
279        ipv4: true,
280        tcp: false,
281        udp: true,
282        tso: None,
283    };
284
285    fn caps(&self) -> ChecksumCapabilities {
286        let mut caps = ChecksumCapabilities::default();
287        if self.ipv4 {
288            caps.ipv4 = Checksum::None;
289        }
290        if self.tcp {
291            caps.tcp = Checksum::None;
292        }
293        if self.udp {
294            caps.udp = Checksum::None;
295        }
296        caps
297    }
298}
299
/// The smallest receive MTU Consomme supports, counting the Ethernet frame
/// header: a 1500-byte IP MTU plus the 14-byte Ethernet header.
pub const MIN_MTU: usize = 1500 + 14;
303
304#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
305struct SocketAddress {
306    ip: Ipv4Address,
307    port: u16,
308}
309
310impl From<SocketAddress> for SocketAddrV4 {
311    fn from(addr: SocketAddress) -> Self {
312        Self::new(addr.ip.into(), addr.port)
313    }
314}
315
316impl From<SocketAddress> for socket2::SockAddr {
317    fn from(addr: SocketAddress) -> Self {
318        socket2::SockAddr::from(SocketAddrV4::from(addr))
319    }
320}
321
322#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
323struct FourTuple {
324    dst: SocketAddress,
325    src: SocketAddress,
326}
327
328/// The reason a packet was dropped without being handled.
329#[derive(Debug, Error)]
330pub enum DropReason {
331    /// The packet could not be parsed.
332    #[error("packet parsing error")]
333    Packet(#[from] smoltcp::Error),
334    /// The ethertype is unknown.
335    #[error("unsupported ethertype {0}")]
336    UnsupportedEthertype(EthernetProtocol),
337    /// The ethertype is unknown.
338    #[error("unsupported ip protocol {0}")]
339    UnsupportedIpProtocol(IpProtocol),
340    /// The ARP type is unsupported.
341    #[error("unsupported dhcp message type {0:?}")]
342    UnsupportedDhcp(DhcpMessageType),
343    /// The ARP type is unsupported.
344    #[error("unsupported arp type")]
345    UnsupportedArp,
346    /// The IPv4 checksum was invalid.
347    #[error("ipv4 checksum failure")]
348    Ipv4Checksum,
349    /// The send buffer is invalid.
350    #[error("send buffer full")]
351    SendBufferFull,
352    /// There was an IO error.
353    #[error("io error")]
354    Io(#[source] std::io::Error),
355    /// The TCP state is invalid.
356    #[error("bad tcp state")]
357    BadTcpState(#[from] tcp::TcpError),
358    /// Specified port is not bound.
359    #[error("port is not bound")]
360    PortNotBound,
361}
362
363/// An error to create a consomme instance.
364#[derive(Debug, Error)]
365pub enum Error {
366    /// Could not get DNS nameserver information.
367    #[error("failed to initialize nameservers")]
368    Dns(#[from] dns::Error),
369}
370
371#[derive(Debug)]
372struct Ipv4Addresses {
373    src_addr: Ipv4Address,
374    dst_addr: Ipv4Address,
375}
376
377impl Consomme {
378    /// Creates a new consomme instance.
379    pub fn new() -> Result<Self, Error> {
380        let state = ConsommeState::new()?;
381        Ok(Self::new_with_state(state))
382    }
383
384    /// Creates a new consomme instance with specified state.
385    pub fn new_with_state(state: ConsommeState) -> Self {
386        Self {
387            state,
388            recv: None,
389            tcp: tcp::Tcp::new(),
390            udp: udp::Udp::new(),
391        }
392    }
393
394    /// Creates a new consomme instance with dynamic state.
395    pub fn new_dynamic(state: ConsommeState) -> (Self, ConsommeControl) {
396        let (send, recv) = mesh::channel();
397        let this = Self {
398            state,
399            recv: Some(recv),
400            tcp: tcp::Tcp::new(),
401            udp: udp::Udp::new(),
402        };
403        let control = ConsommeControl { send };
404        (this, control)
405    }
406
407    /// Pairs the client with this instance to operate on the consomme instance.
408    pub fn access<'a, T: Client>(&'a mut self, client: &'a mut T) -> Access<'a, T> {
409        Access {
410            inner: self,
411            client,
412        }
413    }
414}
415
416impl<T: Client> Access<'_, T> {
417    fn process_message(&mut self, message: ConsommeMessage) {
418        match message {
419            ConsommeMessage::BindPort(rpc) => {
420                rpc.handle_sync(|bind_message| match bind_message.protocol {
421                    IpProtocol::Tcp => self.bind_tcp_port(bind_message.address, bind_message.port),
422                    IpProtocol::Udp => self.bind_udp_port(bind_message.address, bind_message.port),
423                    p => unimplemented!("Listen not supported for protocol {}", p),
424                });
425            }
426            ConsommeMessage::UnbindPort(rpc) => {
427                rpc.handle_sync(|bind_message| match bind_message.protocol {
428                    IpProtocol::Tcp => self.unbind_tcp_port(bind_message.port),
429                    IpProtocol::Udp => self.unbind_udp_port(bind_message.port),
430                    p => unimplemented!("Listen not supported for protocol {}", p),
431                });
432            }
433            ConsommeMessage::UpdateState(rpc) => {
434                rpc.handle_sync(|f| f(&mut self.inner.state));
435            }
436        }
437    }
438
439    fn poll_message(&mut self, cx: &mut Context<'_>) {
440        // process all pending messages
441        while let Some(recv) = self.inner.recv.as_mut() {
442            match recv.poll_recv(cx) {
443                Poll::Ready(Err(err)) => {
444                    tracing::warn!(
445                        err = &err as &dyn std::error::Error,
446                        "Consomme dynamic update channel failure"
447                    );
448                    self.inner.recv = None;
449                    return;
450                }
451                Poll::Ready(Ok(message)) => self.process_message(message),
452                Poll::Pending => return,
453            }
454        }
455    }
456
457    /// Polls for work, transmitting any ready packets to the client.
458    pub fn poll(&mut self, cx: &mut Context<'_>) {
459        self.poll_udp(cx);
460        self.poll_tcp(cx);
461        self.poll_message(cx);
462    }
463
464    /// Update all sockets to use the new client's IO driver. This must be
465    /// called if the previous driver is no longer usable or if the client
466    /// otherwise wants existing connections to be polled on a new IO driver.
467    pub fn refresh_driver(&mut self) {
468        self.refresh_tcp_driver();
469        self.refresh_udp_driver();
470    }
471
472    /// Sends an Ethernet frame to the network.
473    ///
474    /// If `checksum.ipv4`, `checksum.tcp`, or `checksum.udp` are set, then
475    /// skips validating the IPv4, TCP, and UDP checksums. Otherwise, these
476    /// checksums are validated as normal and packets with invalid checksums are
477    /// dropped.
478    ///
479    /// If `checksum.tso.is_some()`, then perform TCP segmentation offset on the
480    /// frame. Practically speaking, this means that the frame contains a TCP
481    /// packet with these caveats:
482    ///
483    ///   * The IP header length may be invalid and will be ignored. The TCP
484    ///     packet payload is assumed to end at the end of `data`.
485    ///   * The TCP segment's payload size may be larger than the advertized TCP
486    ///     MSS value.
487    ///
488    /// This allows for sending TCP data that is much larger than the MSS size
489    /// via a single call.
490    ///
491    /// TODO:
492    ///
493    ///   1. allow for discontiguous packets
494    ///   2. allow for packets in guest memory (including lifetime model, if
495    ///      necessary--currently TCP transmits only happen in `poll`, but this
496    ///      may not be necessary. If the underlying socket implementation
497    ///      performs a copy (as the standard kernel socket APIs do), then no
498    ///      lifetime model is necessary, but if an implementation wants
499    ///      zerocopy support then some mechanism to allow the guest memory to
500    ///      be released later will be necessary.
501    pub fn send(&mut self, data: &[u8], checksum: &ChecksumState) -> Result<(), DropReason> {
502        let frame_packet = EthernetFrame::new_unchecked(data);
503        let frame = EthernetRepr::parse(&frame_packet)?;
504        match frame.ethertype {
505            EthernetProtocol::Ipv4 => self.handle_ipv4(&frame, frame_packet.payload(), checksum)?,
506            EthernetProtocol::Arp => self.handle_arp(&frame, frame_packet.payload())?,
507            _ => return Err(DropReason::UnsupportedEthertype(frame.ethertype)),
508        }
509        Ok(())
510    }
511
512    fn handle_ipv4(
513        &mut self,
514        frame: &EthernetRepr,
515        payload: &[u8],
516        checksum: &ChecksumState,
517    ) -> Result<(), DropReason> {
518        let ipv4 = Ipv4Packet::new_unchecked(payload);
519        if payload.len() < IPV4_HEADER_LEN
520            || ipv4.version() != 4
521            || payload.len() < ipv4.header_len().into()
522            || payload.len() < ipv4.total_len().into()
523        {
524            return Err(DropReason::Packet(smoltcp::Error::Malformed));
525        }
526
527        let total_len = if checksum.tso.is_some() {
528            payload.len()
529        } else {
530            ipv4.total_len().into()
531        };
532        if total_len < ipv4.header_len().into() {
533            return Err(DropReason::Packet(smoltcp::Error::Malformed));
534        }
535
536        if ipv4.more_frags() || ipv4.frag_offset() != 0 {
537            return Err(DropReason::Packet(smoltcp::Error::Fragmented));
538        }
539
540        if !checksum.ipv4 && !ipv4.verify_checksum() {
541            return Err(DropReason::Ipv4Checksum);
542        }
543
544        let addresses = Ipv4Addresses {
545            src_addr: ipv4.src_addr(),
546            dst_addr: ipv4.dst_addr(),
547        };
548
549        let inner = &payload[ipv4.header_len().into()..total_len];
550
551        match ipv4.protocol() {
552            IpProtocol::Tcp => self.handle_tcp(&addresses, inner, checksum)?,
553            IpProtocol::Udp => self.handle_udp(frame, &addresses, inner, checksum)?,
554            p => return Err(DropReason::UnsupportedIpProtocol(p)),
555        };
556        Ok(())
557    }
558}