Skip to main content

net_backend/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Network backend traits and infrastructure.
5//!
6//! This crate defines the abstraction boundary between network
7//! **frontends** (guest-facing devices) and network **backends**
8//! (host-side packet I/O). The key types are:
9//!
10//! * [`Endpoint`] — a backend factory. One per NIC, responsible for
11//!   creating [`Queue`] objects when the frontend activates the device.
12//!
13//! * [`Queue`] — a single TX/RX data path. Backends implement this to
14//!   send and receive packets. A device may have multiple queues (RSS).
15//!
16//! * [`BufferAccess`] — owned by the frontend, provides access to
17//!   guest memory receive buffers. Passed by `&mut` reference to every
18//!   [`Queue`] method that needs it, so the frontend retains exclusive
19//!   ownership and no internal locking is required.
20//!
21//! ## Lifecycle
22//!
23//! 1. The frontend creates a [`BufferAccess`] implementation and one
24//!    [`QueueConfig`] per desired queue (containing just a driver).
25//! 2. It calls [`Endpoint::get_queues`], which returns boxed [`Queue`]
26//!    objects.
27//! 3. The frontend posts initial receive buffers by calling
28//!    [`Queue::rx_avail`] with its [`BufferAccess`].
29//! 4. The main loop polls [`Queue::poll_ready`] for backend events,
30//!    then calls [`Queue::rx_poll`] / [`Queue::tx_avail`] /
31//!    [`Queue::tx_poll`] to exchange packets—always passing
32//!    `&mut dyn BufferAccess`.
33//! 5. On shutdown, queues are dropped and [`Endpoint::stop`] is called.
34
35#![expect(missing_docs)]
36#![forbid(unsafe_code)]
37
38pub mod loopback;
39pub mod null;
40pub mod resolve;
41pub mod tests;
42
43use async_trait::async_trait;
44use bitfield_struct::bitfield;
45use futures::FutureExt;
46use futures::StreamExt;
47use futures::TryFutureExt;
48use futures::lock::Mutex;
49use futures_concurrency::future::Race;
50use guestmem::GuestMemory;
51use guestmem::GuestMemoryError;
52use inspect::InspectMut;
53use inspect_counters::Counter;
54use mesh::rpc::Rpc;
55use mesh::rpc::RpcSend;
56use null::NullEndpoint;
57use pal_async::driver::Driver;
58use std::future::pending;
59use std::sync::Arc;
60use std::task::Context;
61use std::task::Poll;
62use thiserror::Error;
63
/// Per-queue configuration passed to [`Endpoint::get_queues`].
///
/// Contains only an async driver handle. Receive buffers are posted
/// separately via [`Queue::rx_avail`] after queue creation.
pub struct QueueConfig {
    /// The async driver the backend uses to poll I/O for this queue.
    pub driver: Box<dyn Driver>,
}
71
/// A network endpoint — the backend side of a NIC.
///
/// An endpoint is a factory for [`Queue`] objects. It represents a
/// connection to some packet transport (TAP device, hardware NIC,
/// user-space network stack, etc.) and can create one or more queues
/// for parallel TX/RX processing.
///
/// Frontends (e.g. `virtio_net`, `netvsp`, `gdma`) own the endpoint
/// and call [`get_queues`](Endpoint::get_queues) when the guest
/// activates the NIC.
#[async_trait]
pub trait Endpoint: Send + Sync + InspectMut {
    /// Returns an informational endpoint type.
    fn endpoint_type(&self) -> &'static str;

    /// Initializes the queues associated with the endpoint.
    ///
    /// One queue is expected per entry in `config`; the resulting boxed
    /// queues are appended to `queues`. `rss` supplies the RSS key and
    /// indirection table when the frontend enables receive-side scaling.
    async fn get_queues(
        &mut self,
        config: Vec<QueueConfig>,
        rss: Option<&RssConfig<'_>>,
        queues: &mut Vec<Box<dyn Queue>>,
    ) -> anyhow::Result<()>;

    /// Stops the endpoint.
    ///
    /// All queues returned via `get_queues` must have been dropped.
    async fn stop(&mut self);

    /// Specifies whether packets are always completed in order.
    ///
    /// Defaults to `false`.
    fn is_ordered(&self) -> bool {
        false
    }

    /// Specifies the supported set of transmit offloads.
    ///
    /// Defaults to no offloads supported.
    fn tx_offload_support(&self) -> TxOffloadSupport {
        TxOffloadSupport::default()
    }

    /// Specifies parameters related to supporting multiple queues.
    ///
    /// Defaults to a single queue with no RSS indirection table.
    fn multiqueue_support(&self) -> MultiQueueSupport {
        MultiQueueSupport {
            max_queues: 1,
            indirection_table_size: 0,
        }
    }

    /// If true, transmits are guaranteed to complete quickly. This is used to
    /// allow eliding tx notifications from the guest when there are already
    /// some tx packets in flight.
    fn tx_fast_completions(&self) -> bool {
        false
    }

    /// Sets the current data path for packet flow (e.g. via vmbus synthnic or through virtual function).
    /// This is only supported for endpoints that pair with an accelerated device.
    async fn set_data_path_to_guest_vf(&self, _use_vf: bool) -> anyhow::Result<()> {
        Err(anyhow::Error::msg("Unsupported in current endpoint"))
    }

    /// Queries whether the data path currently flows through the guest VF.
    ///
    /// Fails by default; only endpoints that pair with an accelerated
    /// device override this.
    async fn get_data_path_to_guest_vf(&self) -> anyhow::Result<bool> {
        Err(anyhow::Error::msg("Unsupported in current endpoint"))
    }

    /// On completion, the return value indicates the specific endpoint action to take.
    ///
    /// The default implementation never completes.
    async fn wait_for_endpoint_action(&mut self) -> EndpointAction {
        pending().await
    }

    /// Link speed in bps.
    fn link_speed(&self) -> u64 {
        // Reporting a reasonable default value (10Gbps) here that the individual endpoints
        // can overwrite.
        10 * 1000 * 1000 * 1000
    }
}
147
/// Multi-queue related support.
#[derive(Debug, Copy, Clone)]
pub struct MultiQueueSupport {
    /// The number of supported queues.
    pub max_queues: u16,
    /// The size of the RSS indirection table.
    ///
    /// The [`Endpoint`] default reports 0 — presumably meaning no RSS
    /// support; confirm with the consuming frontends.
    pub indirection_table_size: u16,
}
156
/// The set of supported transmit offloads.
///
/// `Default` reports no offload support (all flags `false`).
#[derive(Debug, Copy, Clone, Default)]
pub struct TxOffloadSupport {
    /// IPv4 header checksum offload.
    pub ipv4_header: bool,
    /// TCP checksum offload.
    pub tcp: bool,
    /// UDP checksum offload.
    pub udp: bool,
    /// TCP segmentation offload.
    pub tso: bool,
    /// UDP segmentation offload (USO).
    pub uso: bool,
}
171
/// Receive-side scaling (RSS) configuration passed to
/// [`Endpoint::get_queues`].
#[derive(Debug, Clone)]
pub struct RssConfig<'a> {
    /// The RSS hash key.
    pub key: &'a [u8],
    /// The indirection table mapping hash values to queue indices.
    pub indirection_table: &'a [u16],
    /// RSS flags. Not yet defined/consumed.
    pub flags: u32, // TODO
}
178
/// Errors returned by [`Queue::tx_poll`].
#[derive(Error, Debug)]
pub enum TxError {
    /// A transient failure; restarting the queue may recover.
    #[error("error requiring queue restart. {0}")]
    TryRestart(#[source] anyhow::Error),
    /// An unrecoverable failure.
    #[error("unrecoverable error. {0}")]
    Fatal(#[source] anyhow::Error),
}
/// Per-queue packet and error counters reported by a backend.
pub trait BackendQueueStats {
    /// Count of receive errors.
    fn rx_errors(&self) -> Counter;
    /// Count of transmit errors.
    fn tx_errors(&self) -> Counter;
    /// Count of received packets.
    fn rx_packets(&self) -> Counter;
    /// Count of transmitted packets.
    fn tx_packets(&self) -> Counter;
    /// Count of transmitted VLAN-tagged packets.
    ///
    /// Defaults to a newly-created (empty) counter for backends that do
    /// not track VLAN traffic.
    fn tx_vlan_packets(&self) -> Counter {
        Counter::new()
    }
    /// Count of received VLAN-tagged packets.
    ///
    /// Defaults to a newly-created (empty) counter for backends that do
    /// not track VLAN traffic.
    fn rx_vlan_packets(&self) -> Counter {
        Counter::new()
    }
}
198
/// A single TX/RX data path for sending and receiving network packets.
///
/// Created by [`Endpoint::get_queues`] and driven by the frontend in
/// a poll loop. Every method that touches receive buffers takes
/// `pool: &mut dyn BufferAccess` so the frontend retains ownership
/// of guest memory state.
///
/// Typical poll loop:
/// ```text
/// loop {
///     poll_ready(cx, pool)  // wait for backend events
///     rx_poll(pool, ..)     // drain completed receives
///     tx_avail(pool, ..)    // post guest TX packets
///     tx_poll(pool, ..)     // drain TX completions
/// }
/// ```
#[async_trait]
pub trait Queue: Send + InspectMut {
    /// Updates the queue's target VP.
    ///
    /// The default implementation ignores the hint.
    async fn update_target_vp(&mut self, target_vp: u32) {
        let _ = target_vp;
    }

    /// Polls the queue for readiness.
    ///
    /// Completes when there may be RX/TX work to process via
    /// `rx_poll`/`tx_poll`.
    fn poll_ready(&mut self, cx: &mut Context<'_>, pool: &mut dyn BufferAccess) -> Poll<()>;

    /// Makes receive buffers available for use by the device.
    fn rx_avail(&mut self, pool: &mut dyn BufferAccess, done: &[RxId]);

    /// Polls the device for receives.
    ///
    /// Completed receive IDs are written into `packets`; the return value
    /// is the number of entries filled.
    fn rx_poll(
        &mut self,
        pool: &mut dyn BufferAccess,
        packets: &mut [RxId],
    ) -> anyhow::Result<usize>;

    /// Posts transmits to the device.
    ///
    /// Returns `Ok(false)` if the segments will complete asynchronously.
    /// NOTE(review): the `usize` is presumably the number of segments
    /// consumed/completed synchronously — confirm against implementations.
    fn tx_avail(
        &mut self,
        pool: &mut dyn BufferAccess,
        segments: &[TxSegment],
    ) -> anyhow::Result<(bool, usize)>;

    /// Polls the device for transmit completions.
    ///
    /// Completed transmit IDs are written into `done`; the return value
    /// is the number of entries filled.
    fn tx_poll(&mut self, pool: &mut dyn BufferAccess, done: &mut [TxId])
    -> Result<usize, TxError>;

    /// Get queue statistics
    fn queue_stats(&self) -> Option<&dyn BackendQueueStats> {
        None // Default implementation - not all queues implement stats
    }
}
253
/// Frontend-owned access to guest receive buffers.
///
/// Each frontend implements this trait to map [`RxId`] values to
/// guest memory regions. The backend writes received packet data
/// and metadata through these methods.
///
/// The frontend owns the `BufferAccess` and passes `&mut` references
/// to [`Queue`] methods. This means no `Arc`/`Mutex` is needed
/// between the frontend and backend for buffer access—the borrow
/// checker enforces exclusive access statically.
pub trait BufferAccess {
    /// The associated guest memory accessor.
    fn guest_memory(&self) -> &GuestMemory;

    /// Writes data to the specified buffer.
    fn write_data(&mut self, id: RxId, data: &[u8]);

    /// Appends the guest address segments for the specified buffer to `buf`.
    ///
    /// Callers must clear `buf` before calling if they do not want segments
    /// from a previous call to be retained.
    fn push_guest_addresses(&self, id: RxId, buf: &mut Vec<RxBufferSegment>);

    /// The capacity of the specified buffer in bytes.
    fn capacity(&self, id: RxId) -> u32;

    /// Sets the packet metadata for the receive.
    fn write_header(&mut self, id: RxId, metadata: &RxMetadata);

    /// Writes the packet header and data in a single call.
    ///
    /// The default implementation writes the data first, then the header.
    fn write_packet(&mut self, id: RxId, metadata: &RxMetadata, data: &[u8]) {
        self.write_data(id, data);
        self.write_header(id, metadata);
    }
}
289
/// Length in bytes of an Ethernet frame header without a VLAN tag.
pub const ETHERNET_HEADER_LEN: u32 = 14;
/// Length in bytes of an Ethernet frame header with an 802.1Q VLAN tag.
pub const ETHERNET_VLAN_HEADER_LEN: u32 = 18;

/// Minimum IPv4 header length in bytes (header with no options).
pub const IPV4_MIN_HEADER_LEN: u16 = 20;
/// Fixed IPv6 header length in bytes.
pub const IPV6_MIN_HEADER_LEN: u16 = 40;
295
/// 802.1Q VLAN tag metadata, packed into 16 bits.
#[bitfield(u16)]
pub struct VlanMetadata {
    /// Priority for 802.1Q.
    #[bits(3)]
    pub priority: u8,
    /// In pretty much every circumstance this is false. When
    /// it is used, setting DEI will inform switches/routing infra
    /// that this can be dropped before higher priority traffic.
    pub drop_eligible_indicator: bool,
    /// The 802.1Q ID for this transmission.
    #[bits(12)]
    pub vlan_id: u16,
}
309
/// A receive buffer ID.
///
/// Identifies a frontend-provided receive buffer; interpreted by the
/// frontend's [`BufferAccess`] implementation.
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
pub struct RxId(pub u32);
314
/// An individual segment in guest memory of a receive buffer.
#[derive(Debug, Copy, Clone)]
pub struct RxBufferSegment {
    /// Guest physical address.
    pub gpa: u64,
    /// The number of bytes in this range.
    pub len: u32,
}
323
/// Receive packet metadata.
///
/// Written by the backend via [`BufferAccess::write_header`].
#[derive(Debug, Copy, Clone)]
pub struct RxMetadata {
    /// The offset of the packet data from the beginning of the receive buffer.
    pub offset: usize,
    /// The length of the packet in bytes.
    pub len: usize,
    /// The IP checksum validation state.
    pub ip_checksum: RxChecksumState,
    /// The L4 checksum validation state.
    pub l4_checksum: RxChecksumState,
    /// The L4 protocol.
    pub l4_protocol: L4Protocol,
    /// Information about 802.1Q VLAN tagging. When a vlan is in use, this structure
    /// is populated. Only applies when traffic is being received over an L2 connection,
    /// so L3-only or above traffic will not use this option.
    pub vlan: Option<VlanMetadata>,
}
342
343impl Default for RxMetadata {
344    fn default() -> Self {
345        Self {
346            offset: 0,
347            len: 0,
348            ip_checksum: RxChecksumState::Unknown,
349            l4_checksum: RxChecksumState::Unknown,
350            l4_protocol: L4Protocol::Unknown,
351            vlan: None,
352        }
353    }
354}
355
/// The "L3" protocol: the IP layer.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum L3Protocol {
    /// Not known or not an IP packet.
    Unknown,
    /// IPv4.
    Ipv4,
    /// IPv6.
    Ipv6,
}
363
/// The "L4" protocol: the TCP/UDP layer.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum L4Protocol {
    /// Not known or neither TCP nor UDP.
    Unknown,
    /// TCP.
    Tcp,
    /// UDP.
    Udp,
}
371
/// The receive checksum state for a packet.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum RxChecksumState {
    /// The checksum was not evaluated.
    Unknown,
    /// The checksum value is correct.
    Good,
    /// The checksum value is incorrect.
    Bad,
    /// The checksum has been validated, but the value in the header is wrong.
    ///
    /// This occurs when LRO/RSC offload has been performed--multiple packet
    /// payloads are glommed together without updating the checksum in the first
    /// packet's header.
    ValidatedButWrong,
}
388
389impl RxChecksumState {
390    /// Returns true if the checksum has been validated.
391    pub fn is_valid(self) -> bool {
392        self == Self::Good || self == Self::ValidatedButWrong
393    }
394}
395
/// A transmit ID. This may be used by multiple segments at the same time.
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
pub struct TxId(pub u32);
400
#[derive(Debug, Clone)]
/// The segment type.
pub enum TxSegmentType {
    /// The start of a packet. Carries the packet's metadata.
    Head(TxMetadata),
    /// A packet continuation.
    Tail,
}
409
#[derive(Debug, Clone)]
/// Transmit packet metadata.
///
/// Carried by the [`TxSegmentType::Head`] segment of each packet.
pub struct TxMetadata {
    /// The transmit ID.
    pub id: TxId,
    /// The number of segments, including this one.
    pub segment_count: u8,
    /// Flags.
    pub flags: TxFlags,
    /// The total length of the packet in bytes.
    pub len: u32,
    /// The length of the Ethernet frame header. Only guaranteed to be set if
    /// various offload flags are set.
    pub l2_len: u8,
    /// The length of the IP header. Only guaranteed to be set if various
    /// offload flags are set.
    pub l3_len: u16,
    /// The length of the TCP header. Only guaranteed to be set if various
    /// offload flags are set.
    pub l4_len: u8,
    /// The offset into the buffer where the L4 header begins (TCP or UDP). Only
    /// expected to be set if offload (checksum and/or segmentation) flags are set.
    pub transport_header_offset: u16,
    /// The maximum segment size, used for segmentation offload (TSO or USO).
    /// Only guaranteed to be set if [`TxFlags::offload_tcp_segmentation`] or
    /// [`TxFlags::offload_udp_segmentation`] is set.
    pub max_segment_size: u16,
    /// Information about 802.1Q VLAN tagging. When a vlan is in use, this structure
    /// is populated. Only applies when traffic is being sent over an L2 connection,
    /// so L3-only or above traffic will not use this option.
    pub vlan: Option<VlanMetadata>,
}
442
/// Flags affecting transmit behavior.
///
/// NOTE(review): the comments below refer to an `l3_protocol` field, but
/// [`TxMetadata`] has no such field — presumably this means the
/// `is_ipv4`/`is_ipv6` flags below. Confirm and update the wording.
#[bitfield(u8)]
pub struct TxFlags {
    /// Offload IPv4 header checksum calculation.
    ///
    /// `l3_protocol`, `l2_len`, and `l3_len` must be set.
    pub offload_ip_header_checksum: bool,
    /// Offload the TCP checksum calculation.
    ///
    /// `l3_protocol`, `l2_len`, and `l3_len` must be set.
    pub offload_tcp_checksum: bool,
    /// Offload the UDP checksum calculation.
    ///
    /// `l3_protocol`, `l2_len`, and `l3_len` must be set.
    pub offload_udp_checksum: bool,
    /// Offload the TCP segmentation, allowing packets to be larger than the
    /// MTU.
    ///
    /// `l3_protocol`, `l2_len`, `l3_len`, `l4_len`, and `tcp_segment_size` must
    /// be set.
    pub offload_tcp_segmentation: bool,
    /// If true, the packet is IPv4.
    pub is_ipv4: bool,
    /// If true, the packet is IPv6. Mutually exclusive with `is_ipv4`.
    pub is_ipv6: bool,
    /// Offload UDP segmentation (USO), allowing UDP packets larger than the
    /// MTU. `l2_len`, `l3_len`, and `max_segment_size` must be set.
    pub offload_udp_segmentation: bool,
    #[bits(1)]
    _reserved: u8,
}
474
475impl Default for TxMetadata {
476    fn default() -> Self {
477        Self {
478            id: TxId(0),
479            segment_count: 0,
480            len: 0,
481            flags: TxFlags::new(),
482            l2_len: 0,
483            l3_len: 0,
484            l4_len: 0,
485            transport_header_offset: 0,
486            max_segment_size: 0,
487            vlan: None,
488        }
489    }
490}
491
#[derive(Debug, Clone)]
/// A transmit packet segment.
///
/// A packet is a head segment followed by `segment_count - 1` tails.
pub struct TxSegment {
    /// The segment type (head or tail).
    pub ty: TxSegmentType,
    /// The guest address of this segment.
    pub gpa: u64,
    /// The length of this segment.
    pub len: u32,
}
502
503/// Computes the number of packets in `segments`.
504pub fn packet_count(mut segments: &[TxSegment]) -> usize {
505    let mut packet_count = 0;
506    while let Some(head) = segments.first() {
507        let TxSegmentType::Head(metadata) = &head.ty else {
508            unreachable!()
509        };
510        segments = &segments[metadata.segment_count as usize..];
511        packet_count += 1;
512    }
513    packet_count
514}
515
516/// Gets the next packet from a list of segments, returning the packet metadata,
517/// the segments in the packet, and the remaining segments.
518pub fn next_packet(segments: &[TxSegment]) -> (&TxMetadata, &[TxSegment], &[TxSegment]) {
519    let metadata = if let TxSegmentType::Head(metadata) = &segments[0].ty {
520        metadata
521    } else {
522        unreachable!();
523    };
524    let (this, rest) = segments.split_at(metadata.segment_count.into());
525    (metadata, this, rest)
526}
527
528/// Linearizes the next packet in a list of segments, returning the buffer data
529/// and advancing the segment list.
530pub fn linearize(
531    pool: &dyn BufferAccess,
532    segments: &mut &[TxSegment],
533) -> Result<Vec<u8>, GuestMemoryError> {
534    let (head, this, rest) = next_packet(segments);
535    let mut v = vec![0; head.len as usize];
536    let mut offset = 0;
537    let mem = pool.guest_memory();
538    for segment in this {
539        let dest = &mut v[offset..offset + segment.len as usize];
540        mem.read_at(segment.gpa, dest)?;
541        offset += segment.len as usize;
542    }
543    assert_eq!(v.len(), offset);
544    *segments = rest;
545    Ok(v)
546}
547
/// Action requested of the frontend by [`Endpoint::wait_for_endpoint_action`].
#[derive(PartialEq, Debug)]
pub enum EndpointAction {
    /// The endpoint must be stopped and restarted (queues recreated).
    RestartRequired,
    /// The link status changed; the payload is the new status
    /// (presumably `true` = link up — confirm with frontends).
    LinkStatusNotify(bool),
}
553
/// Messages sent from [`DisconnectableEndpointControl`] to its paired
/// [`DisconnectableEndpoint`].
enum DisconnectableEndpointUpdate {
    /// Attach the provided endpoint as the active one.
    EndpointConnected(Box<dyn Endpoint>),
    /// Detach the active endpoint. The RPC completes with the detached
    /// endpoint (if any) once it is safe to reuse.
    EndpointDisconnected(Rpc<(), Option<Box<dyn Endpoint>>>),
}
558
/// Control handle used to connect and disconnect the inner endpoint of a
/// [`DisconnectableEndpoint`].
pub struct DisconnectableEndpointControl {
    // Channel to the paired endpoint, drained in
    // `DisconnectableEndpoint::wait_for_endpoint_action`.
    send_update: mesh::Sender<DisconnectableEndpointUpdate>,
}
562
563impl DisconnectableEndpointControl {
564    pub fn connect(&mut self, endpoint: Box<dyn Endpoint>) -> anyhow::Result<()> {
565        self.send_update
566            .send(DisconnectableEndpointUpdate::EndpointConnected(endpoint));
567        Ok(())
568    }
569
570    pub async fn disconnect(&mut self) -> anyhow::Result<Option<Box<dyn Endpoint>>> {
571        self.send_update
572            .call(DisconnectableEndpointUpdate::EndpointDisconnected, ())
573            .map_err(anyhow::Error::from)
574            .await
575    }
576}
577
/// Endpoint properties captured at connect time so that they can still be
/// reported while the endpoint is disconnected (when the null endpoint would
/// otherwise answer).
pub struct DisconnectableEndpointCachedState {
    // Cached `Endpoint::is_ordered`.
    is_ordered: bool,
    // Cached `Endpoint::tx_offload_support`.
    tx_offload_support: TxOffloadSupport,
    // Cached `Endpoint::multiqueue_support`.
    multiqueue_support: MultiQueueSupport,
    // Cached `Endpoint::tx_fast_completions`.
    tx_fast_completions: bool,
    // Cached `Endpoint::link_speed`.
    link_speed: u64,
}
585
/// An [`Endpoint`] wrapper whose inner endpoint can be connected and
/// disconnected at runtime via [`DisconnectableEndpointControl`].
///
/// While no endpoint is connected, calls are delegated to a null endpoint.
pub struct DisconnectableEndpoint {
    // The currently connected endpoint, if any.
    endpoint: Option<Box<dyn Endpoint>>,
    // Fallback used whenever `endpoint` is `None`.
    null_endpoint: Box<dyn Endpoint>,
    // Capability values captured from the most recently connected endpoint.
    // `None` until the first connect.
    cached_state: Option<DisconnectableEndpointCachedState>,
    // Receives connect/disconnect requests from the control handle.
    receive_update: Arc<Mutex<mesh::Receiver<DisconnectableEndpointUpdate>>>,
    // A disconnect RPC to complete (with the detached endpoint) on the next
    // call to `wait_for_endpoint_action`.
    notify_disconnect_complete: Option<(
        Rpc<(), Option<Box<dyn Endpoint>>>,
        Option<Box<dyn Endpoint>>,
    )>,
}
596
597impl InspectMut for DisconnectableEndpoint {
598    fn inspect_mut(&mut self, req: inspect::Request<'_>) {
599        self.current_mut().inspect_mut(req)
600    }
601}
602
603impl DisconnectableEndpoint {
604    pub fn new() -> (Self, DisconnectableEndpointControl) {
605        let (endpoint_tx, endpoint_rx) = mesh::channel();
606        let control = DisconnectableEndpointControl {
607            send_update: endpoint_tx,
608        };
609        (
610            Self {
611                endpoint: None,
612                null_endpoint: Box::new(NullEndpoint::new()),
613                cached_state: None,
614                receive_update: Arc::new(Mutex::new(endpoint_rx)),
615                notify_disconnect_complete: None,
616            },
617            control,
618        )
619    }
620
621    fn current(&self) -> &dyn Endpoint {
622        self.endpoint
623            .as_ref()
624            .unwrap_or(&self.null_endpoint)
625            .as_ref()
626    }
627
628    fn current_mut(&mut self) -> &mut dyn Endpoint {
629        self.endpoint
630            .as_mut()
631            .unwrap_or(&mut self.null_endpoint)
632            .as_mut()
633    }
634}
635
#[async_trait]
impl Endpoint for DisconnectableEndpoint {
    fn endpoint_type(&self) -> &'static str {
        self.current().endpoint_type()
    }

    async fn get_queues(
        &mut self,
        config: Vec<QueueConfig>,
        rss: Option<&RssConfig<'_>>,
        queues: &mut Vec<Box<dyn Queue>>,
    ) -> anyhow::Result<()> {
        self.current_mut().get_queues(config, rss, queues).await
    }

    async fn stop(&mut self) {
        self.current_mut().stop().await
    }

    // The capability getters below report values cached at connect time
    // rather than delegating to `current()`, so they stay stable while the
    // endpoint is temporarily disconnected (the null fallback could report
    // different capabilities). They panic if no endpoint was ever connected.
    fn is_ordered(&self) -> bool {
        self.cached_state
            .as_ref()
            .expect("Endpoint needs connected at least once before use")
            .is_ordered
    }

    fn tx_offload_support(&self) -> TxOffloadSupport {
        self.cached_state
            .as_ref()
            .expect("Endpoint needs connected at least once before use")
            .tx_offload_support
    }

    fn multiqueue_support(&self) -> MultiQueueSupport {
        self.cached_state
            .as_ref()
            .expect("Endpoint needs connected at least once before use")
            .multiqueue_support
    }

    fn tx_fast_completions(&self) -> bool {
        self.cached_state
            .as_ref()
            .expect("Endpoint needs connected at least once before use")
            .tx_fast_completions
    }

    async fn set_data_path_to_guest_vf(&self, use_vf: bool) -> anyhow::Result<()> {
        self.current().set_data_path_to_guest_vf(use_vf).await
    }

    async fn get_data_path_to_guest_vf(&self) -> anyhow::Result<bool> {
        self.current().get_data_path_to_guest_vf().await
    }

    /// Waits for either a connect/disconnect request from the control handle
    /// or an action from the inner endpoint, whichever arrives first. Both
    /// connect and disconnect surface as `RestartRequired` to the frontend.
    async fn wait_for_endpoint_action(&mut self) -> EndpointAction {
        // If the previous message disconnected the endpoint, notify the caller
        // that the operation has completed, returning the old endpoint.
        if let Some((rpc, old_endpoint)) = self.notify_disconnect_complete.take() {
            rpc.handle(async |_| old_endpoint).await;
        }

        enum Message {
            DisconnectableEndpointUpdate(DisconnectableEndpointUpdate),
            UpdateFromEndpoint(EndpointAction),
        }
        let receiver = self.receive_update.clone();
        let mut receive_update = receiver.lock().await;
        // Future that resolves on the next control-channel message. If the
        // channel has ended, wait forever so only endpoint-originated
        // actions are surfaced.
        let update = async {
            match receive_update.next().await {
                Some(m) => Message::DisconnectableEndpointUpdate(m),
                None => {
                    pending::<()>().await;
                    unreachable!()
                }
            }
        };
        let ep_update = self
            .current_mut()
            .wait_for_endpoint_action()
            .map(Message::UpdateFromEndpoint);
        // Race the control channel against the inner endpoint's own action.
        let m = (update, ep_update).race().await;
        match m {
            Message::DisconnectableEndpointUpdate(
                DisconnectableEndpointUpdate::EndpointConnected(endpoint),
            ) => {
                // Connecting while already connected is a caller bug.
                let old_endpoint = self.endpoint.take();
                assert!(old_endpoint.is_none());
                self.endpoint = Some(endpoint);
                // Snapshot capabilities now so the getters above keep
                // answering after a later disconnect.
                self.cached_state = Some(DisconnectableEndpointCachedState {
                    is_ordered: self.current().is_ordered(),
                    tx_offload_support: self.current().tx_offload_support(),
                    multiqueue_support: self.current().multiqueue_support(),
                    tx_fast_completions: self.current().tx_fast_completions(),
                    link_speed: self.current().link_speed(),
                });
                EndpointAction::RestartRequired
            }
            Message::DisconnectableEndpointUpdate(
                DisconnectableEndpointUpdate::EndpointDisconnected(rpc),
            ) => {
                let old_endpoint = self.endpoint.take();
                // Wait until the next call into this function to notify the
                // caller that the operation has completed. This makes it more
                // likely that the endpoint is no longer referenced (old queues
                // have been disposed, etc.).
                self.notify_disconnect_complete = Some((rpc, old_endpoint));
                EndpointAction::RestartRequired
            }
            Message::UpdateFromEndpoint(update) => update,
        }
    }

    fn link_speed(&self) -> u64 {
        self.cached_state
            .as_ref()
            .expect("Endpoint needs connected at least once before use")
            .link_speed
    }
}
755}