1#![expect(missing_docs)]
36#![forbid(unsafe_code)]
37
38pub mod loopback;
39pub mod null;
40pub mod resolve;
41pub mod tests;
42
43use async_trait::async_trait;
44use bitfield_struct::bitfield;
45use futures::FutureExt;
46use futures::StreamExt;
47use futures::TryFutureExt;
48use futures::lock::Mutex;
49use futures_concurrency::future::Race;
50use guestmem::GuestMemory;
51use guestmem::GuestMemoryError;
52use inspect::InspectMut;
53use inspect_counters::Counter;
54use mesh::rpc::Rpc;
55use mesh::rpc::RpcSend;
56use null::NullEndpoint;
57use pal_async::driver::Driver;
58use std::future::pending;
59use std::sync::Arc;
60use std::task::Context;
61use std::task::Poll;
62use thiserror::Error;
63
64pub struct QueueConfig {
69 pub driver: Box<dyn Driver>,
70}
71
72#[async_trait]
83pub trait Endpoint: Send + Sync + InspectMut {
84 fn endpoint_type(&self) -> &'static str;
86
87 async fn get_queues(
89 &mut self,
90 config: Vec<QueueConfig>,
91 rss: Option<&RssConfig<'_>>,
92 queues: &mut Vec<Box<dyn Queue>>,
93 ) -> anyhow::Result<()>;
94
95 async fn stop(&mut self);
99
100 fn is_ordered(&self) -> bool {
102 false
103 }
104
105 fn tx_offload_support(&self) -> TxOffloadSupport {
107 TxOffloadSupport::default()
108 }
109
110 fn multiqueue_support(&self) -> MultiQueueSupport {
112 MultiQueueSupport {
113 max_queues: 1,
114 indirection_table_size: 0,
115 }
116 }
117
118 fn tx_fast_completions(&self) -> bool {
122 false
123 }
124
125 async fn set_data_path_to_guest_vf(&self, _use_vf: bool) -> anyhow::Result<()> {
128 Err(anyhow::Error::msg("Unsupported in current endpoint"))
129 }
130
131 async fn get_data_path_to_guest_vf(&self) -> anyhow::Result<bool> {
132 Err(anyhow::Error::msg("Unsupported in current endpoint"))
133 }
134
135 async fn wait_for_endpoint_action(&mut self) -> EndpointAction {
137 pending().await
138 }
139
140 fn link_speed(&self) -> u64 {
142 10 * 1000 * 1000 * 1000
145 }
146}
147
/// Multiqueue capabilities reported by [`Endpoint::multiqueue_support`].
#[derive(Clone, Copy, Debug)]
pub struct MultiQueueSupport {
    /// The maximum number of queues supported.
    pub max_queues: u16,
    /// The size of the indirection table.
    // NOTE(review): presumably a count of table entries — confirm.
    pub indirection_table_size: u16,
}
156
/// Transmit offloads reported by [`Endpoint::tx_offload_support`].
///
/// The default value has every offload disabled.
// NOTE(review): the per-field meanings below are inferred from the field
// names — confirm against the endpoint implementations.
#[derive(Clone, Copy, Debug, Default)]
pub struct TxOffloadSupport {
    /// IPv4 header offload (presumably header checksum).
    pub ipv4_header: bool,
    /// TCP offload (presumably checksum).
    pub tcp: bool,
    /// UDP offload (presumably checksum).
    pub udp: bool,
    /// Presumably TCP segmentation offload.
    pub tso: bool,
    /// Presumably UDP segmentation offload.
    pub uso: bool,
}
171
/// RSS configuration optionally passed to [`Endpoint::get_queues`].
// NOTE(review): "RSS" presumably stands for receive side scaling — confirm.
#[derive(Clone, Debug)]
pub struct RssConfig<'a> {
    /// The borrowed RSS key bytes.
    pub key: &'a [u8],
    /// The borrowed indirection table.
    // NOTE(review): entries are presumably queue indices — confirm.
    pub indirection_table: &'a [u16],
    /// Flags value; the bit meanings are not defined in this file.
    pub flags: u32,
}
178
179#[derive(Error, Debug)]
180pub enum TxError {
181 #[error("error requiring queue restart. {0}")]
182 TryRestart(#[source] anyhow::Error),
183 #[error("unrecoverable error. {0}")]
184 Fatal(#[source] anyhow::Error),
185}
186pub trait BackendQueueStats {
187 fn rx_errors(&self) -> Counter;
188 fn tx_errors(&self) -> Counter;
189 fn rx_packets(&self) -> Counter;
190 fn tx_packets(&self) -> Counter;
191 fn tx_vlan_packets(&self) -> Counter {
192 Counter::new()
193 }
194 fn rx_vlan_packets(&self) -> Counter {
195 Counter::new()
196 }
197}
198
199#[async_trait]
216pub trait Queue: Send + InspectMut {
217 async fn update_target_vp(&mut self, target_vp: u32) {
219 let _ = target_vp;
220 }
221
222 fn poll_ready(&mut self, cx: &mut Context<'_>, pool: &mut dyn BufferAccess) -> Poll<()>;
224
225 fn rx_avail(&mut self, pool: &mut dyn BufferAccess, done: &[RxId]);
227
228 fn rx_poll(
230 &mut self,
231 pool: &mut dyn BufferAccess,
232 packets: &mut [RxId],
233 ) -> anyhow::Result<usize>;
234
235 fn tx_avail(
239 &mut self,
240 pool: &mut dyn BufferAccess,
241 segments: &[TxSegment],
242 ) -> anyhow::Result<(bool, usize)>;
243
244 fn tx_poll(&mut self, pool: &mut dyn BufferAccess, done: &mut [TxId])
246 -> Result<usize, TxError>;
247
248 fn queue_stats(&self) -> Option<&dyn BackendQueueStats> {
250 None }
252}
253
254pub trait BufferAccess {
265 fn guest_memory(&self) -> &GuestMemory;
267
268 fn write_data(&mut self, id: RxId, data: &[u8]);
270
271 fn push_guest_addresses(&self, id: RxId, buf: &mut Vec<RxBufferSegment>);
276
277 fn capacity(&self, id: RxId) -> u32;
279
280 fn write_header(&mut self, id: RxId, metadata: &RxMetadata);
282
283 fn write_packet(&mut self, id: RxId, metadata: &RxMetadata, data: &[u8]) {
285 self.write_data(id, data);
286 self.write_header(id, metadata);
287 }
288}
289
/// Ethernet header length (14).
// NOTE(review): presumably expressed in bytes, like the other lengths here.
pub const ETHERNET_HEADER_LEN: u32 = 14;
/// Ethernet header length with VLAN (18): four more than
/// [`ETHERNET_HEADER_LEN`].
pub const ETHERNET_VLAN_HEADER_LEN: u32 = ETHERNET_HEADER_LEN + 4;

/// Minimum IPv4 header length (20).
pub const IPV4_MIN_HEADER_LEN: u16 = 20;
/// Minimum IPv6 header length (40).
pub const IPV6_MIN_HEADER_LEN: u16 = 40;
295
296#[bitfield(u16)]
297pub struct VlanMetadata {
298 #[bits(3)]
300 pub priority: u8,
301 pub drop_eligible_indicator: bool,
305 #[bits(12)]
307 pub vlan_id: u16,
308}
309
/// Identifier of a receive buffer, as used by [`BufferAccess`] and [`Queue`].
///
/// A transparent wrapper around a `u32`.
#[derive(Clone, Copy, Debug)]
#[repr(transparent)]
pub struct RxId(pub u32);
314
/// One contiguous piece of a receive buffer, as reported by
/// [`BufferAccess::push_guest_addresses`].
#[derive(Clone, Copy, Debug)]
pub struct RxBufferSegment {
    /// Guest physical address of the segment.
    pub gpa: u64,
    /// Length of the segment.
    // NOTE(review): presumably in bytes — confirm.
    pub len: u32,
}
323
324#[derive(Debug, Copy, Clone)]
326pub struct RxMetadata {
327 pub offset: usize,
329 pub len: usize,
331 pub ip_checksum: RxChecksumState,
333 pub l4_checksum: RxChecksumState,
335 pub l4_protocol: L4Protocol,
337 pub vlan: Option<VlanMetadata>,
341}
342
343impl Default for RxMetadata {
344 fn default() -> Self {
345 Self {
346 offset: 0,
347 len: 0,
348 ip_checksum: RxChecksumState::Unknown,
349 l4_checksum: RxChecksumState::Unknown,
350 l4_protocol: L4Protocol::Unknown,
351 vlan: None,
352 }
353 }
354}
355
/// The layer 3 protocol of a packet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum L3Protocol {
    /// The protocol is not known.
    Unknown,
    /// IPv4.
    Ipv4,
    /// IPv6.
    Ipv6,
}
363
/// The layer 4 protocol of a packet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum L4Protocol {
    /// The protocol is not known.
    Unknown,
    /// TCP.
    Tcp,
    /// UDP.
    Udp,
}
371
/// The checksum state of a received packet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RxChecksumState {
    /// The checksum state is not known.
    Unknown,
    /// The checksum is good.
    Good,
    /// The checksum is bad.
    Bad,
    /// Counted as valid by [`RxChecksumState::is_valid`], alongside `Good`.
    // NOTE(review): the name suggests the checksum was validated although the
    // value carried in the packet is wrong — confirm the exact scenario with
    // the backends that produce this state.
    ValidatedButWrong,
}

impl RxChecksumState {
    /// Returns `true` for [`RxChecksumState::Good`] and
    /// [`RxChecksumState::ValidatedButWrong`], `false` otherwise.
    pub fn is_valid(self) -> bool {
        matches!(self, Self::Good | Self::ValidatedButWrong)
    }
}
395
/// Identifier of a transmit packet, carried in [`TxMetadata`] and reported by
/// [`Queue::tx_poll`].
///
/// A transparent wrapper around a `u32`.
#[derive(Clone, Copy, Debug)]
#[repr(transparent)]
pub struct TxId(pub u32);
400
401#[derive(Debug, Clone)]
402pub enum TxSegmentType {
404 Head(TxMetadata),
406 Tail,
408}
409
410#[derive(Debug, Clone)]
411pub struct TxMetadata {
413 pub id: TxId,
415 pub segment_count: u8,
417 pub flags: TxFlags,
419 pub len: u32,
421 pub l2_len: u8,
424 pub l3_len: u16,
427 pub l4_len: u8,
430 pub transport_header_offset: u16,
433 pub max_segment_size: u16,
437 pub vlan: Option<VlanMetadata>,
441}
442
443#[bitfield(u8)]
445pub struct TxFlags {
446 pub offload_ip_header_checksum: bool,
450 pub offload_tcp_checksum: bool,
454 pub offload_udp_checksum: bool,
458 pub offload_tcp_segmentation: bool,
464 pub is_ipv4: bool,
466 pub is_ipv6: bool,
468 pub offload_udp_segmentation: bool,
471 #[bits(1)]
472 _reserved: u8,
473}
474
475impl Default for TxMetadata {
476 fn default() -> Self {
477 Self {
478 id: TxId(0),
479 segment_count: 0,
480 len: 0,
481 flags: TxFlags::new(),
482 l2_len: 0,
483 l3_len: 0,
484 l4_len: 0,
485 transport_header_offset: 0,
486 max_segment_size: 0,
487 vlan: None,
488 }
489 }
490}
491
492#[derive(Debug, Clone)]
493pub struct TxSegment {
495 pub ty: TxSegmentType,
497 pub gpa: u64,
499 pub len: u32,
501}
502
503pub fn packet_count(mut segments: &[TxSegment]) -> usize {
505 let mut packet_count = 0;
506 while let Some(head) = segments.first() {
507 let TxSegmentType::Head(metadata) = &head.ty else {
508 unreachable!()
509 };
510 segments = &segments[metadata.segment_count as usize..];
511 packet_count += 1;
512 }
513 packet_count
514}
515
516pub fn next_packet(segments: &[TxSegment]) -> (&TxMetadata, &[TxSegment], &[TxSegment]) {
519 let metadata = if let TxSegmentType::Head(metadata) = &segments[0].ty {
520 metadata
521 } else {
522 unreachable!();
523 };
524 let (this, rest) = segments.split_at(metadata.segment_count.into());
525 (metadata, this, rest)
526}
527
528pub fn linearize(
531 pool: &dyn BufferAccess,
532 segments: &mut &[TxSegment],
533) -> Result<Vec<u8>, GuestMemoryError> {
534 let (head, this, rest) = next_packet(segments);
535 let mut v = vec![0; head.len as usize];
536 let mut offset = 0;
537 let mem = pool.guest_memory();
538 for segment in this {
539 let dest = &mut v[offset..offset + segment.len as usize];
540 mem.read_at(segment.gpa, dest)?;
541 offset += segment.len as usize;
542 }
543 assert_eq!(v.len(), offset);
544 *segments = rest;
545 Ok(v)
546}
547
/// An action reported by [`Endpoint::wait_for_endpoint_action`].
#[derive(Debug, PartialEq)]
pub enum EndpointAction {
    /// A restart is required.
    RestartRequired,
    /// A link status notification carrying a boolean.
    // NOTE(review): presumably `true` means the link is up — confirm.
    LinkStatusNotify(bool),
}
553
554enum DisconnectableEndpointUpdate {
555 EndpointConnected(Box<dyn Endpoint>),
556 EndpointDisconnected(Rpc<(), Option<Box<dyn Endpoint>>>),
557}
558
559pub struct DisconnectableEndpointControl {
560 send_update: mesh::Sender<DisconnectableEndpointUpdate>,
561}
562
563impl DisconnectableEndpointControl {
564 pub fn connect(&mut self, endpoint: Box<dyn Endpoint>) -> anyhow::Result<()> {
565 self.send_update
566 .send(DisconnectableEndpointUpdate::EndpointConnected(endpoint));
567 Ok(())
568 }
569
570 pub async fn disconnect(&mut self) -> anyhow::Result<Option<Box<dyn Endpoint>>> {
571 self.send_update
572 .call(DisconnectableEndpointUpdate::EndpointDisconnected, ())
573 .map_err(anyhow::Error::from)
574 .await
575 }
576}
577
578pub struct DisconnectableEndpointCachedState {
579 is_ordered: bool,
580 tx_offload_support: TxOffloadSupport,
581 multiqueue_support: MultiQueueSupport,
582 tx_fast_completions: bool,
583 link_speed: u64,
584}
585
586pub struct DisconnectableEndpoint {
587 endpoint: Option<Box<dyn Endpoint>>,
588 null_endpoint: Box<dyn Endpoint>,
589 cached_state: Option<DisconnectableEndpointCachedState>,
590 receive_update: Arc<Mutex<mesh::Receiver<DisconnectableEndpointUpdate>>>,
591 notify_disconnect_complete: Option<(
592 Rpc<(), Option<Box<dyn Endpoint>>>,
593 Option<Box<dyn Endpoint>>,
594 )>,
595}
596
597impl InspectMut for DisconnectableEndpoint {
598 fn inspect_mut(&mut self, req: inspect::Request<'_>) {
599 self.current_mut().inspect_mut(req)
600 }
601}
602
603impl DisconnectableEndpoint {
604 pub fn new() -> (Self, DisconnectableEndpointControl) {
605 let (endpoint_tx, endpoint_rx) = mesh::channel();
606 let control = DisconnectableEndpointControl {
607 send_update: endpoint_tx,
608 };
609 (
610 Self {
611 endpoint: None,
612 null_endpoint: Box::new(NullEndpoint::new()),
613 cached_state: None,
614 receive_update: Arc::new(Mutex::new(endpoint_rx)),
615 notify_disconnect_complete: None,
616 },
617 control,
618 )
619 }
620
621 fn current(&self) -> &dyn Endpoint {
622 self.endpoint
623 .as_ref()
624 .unwrap_or(&self.null_endpoint)
625 .as_ref()
626 }
627
628 fn current_mut(&mut self) -> &mut dyn Endpoint {
629 self.endpoint
630 .as_mut()
631 .unwrap_or(&mut self.null_endpoint)
632 .as_mut()
633 }
634}
635
636#[async_trait]
637impl Endpoint for DisconnectableEndpoint {
638 fn endpoint_type(&self) -> &'static str {
639 self.current().endpoint_type()
640 }
641
642 async fn get_queues(
643 &mut self,
644 config: Vec<QueueConfig>,
645 rss: Option<&RssConfig<'_>>,
646 queues: &mut Vec<Box<dyn Queue>>,
647 ) -> anyhow::Result<()> {
648 self.current_mut().get_queues(config, rss, queues).await
649 }
650
651 async fn stop(&mut self) {
652 self.current_mut().stop().await
653 }
654
655 fn is_ordered(&self) -> bool {
656 self.cached_state
657 .as_ref()
658 .expect("Endpoint needs connected at least once before use")
659 .is_ordered
660 }
661
662 fn tx_offload_support(&self) -> TxOffloadSupport {
663 self.cached_state
664 .as_ref()
665 .expect("Endpoint needs connected at least once before use")
666 .tx_offload_support
667 }
668
669 fn multiqueue_support(&self) -> MultiQueueSupport {
670 self.cached_state
671 .as_ref()
672 .expect("Endpoint needs connected at least once before use")
673 .multiqueue_support
674 }
675
676 fn tx_fast_completions(&self) -> bool {
677 self.cached_state
678 .as_ref()
679 .expect("Endpoint needs connected at least once before use")
680 .tx_fast_completions
681 }
682
683 async fn set_data_path_to_guest_vf(&self, use_vf: bool) -> anyhow::Result<()> {
684 self.current().set_data_path_to_guest_vf(use_vf).await
685 }
686
687 async fn get_data_path_to_guest_vf(&self) -> anyhow::Result<bool> {
688 self.current().get_data_path_to_guest_vf().await
689 }
690
691 async fn wait_for_endpoint_action(&mut self) -> EndpointAction {
692 if let Some((rpc, old_endpoint)) = self.notify_disconnect_complete.take() {
695 rpc.handle(async |_| old_endpoint).await;
696 }
697
698 enum Message {
699 DisconnectableEndpointUpdate(DisconnectableEndpointUpdate),
700 UpdateFromEndpoint(EndpointAction),
701 }
702 let receiver = self.receive_update.clone();
703 let mut receive_update = receiver.lock().await;
704 let update = async {
705 match receive_update.next().await {
706 Some(m) => Message::DisconnectableEndpointUpdate(m),
707 None => {
708 pending::<()>().await;
709 unreachable!()
710 }
711 }
712 };
713 let ep_update = self
714 .current_mut()
715 .wait_for_endpoint_action()
716 .map(Message::UpdateFromEndpoint);
717 let m = (update, ep_update).race().await;
718 match m {
719 Message::DisconnectableEndpointUpdate(
720 DisconnectableEndpointUpdate::EndpointConnected(endpoint),
721 ) => {
722 let old_endpoint = self.endpoint.take();
723 assert!(old_endpoint.is_none());
724 self.endpoint = Some(endpoint);
725 self.cached_state = Some(DisconnectableEndpointCachedState {
726 is_ordered: self.current().is_ordered(),
727 tx_offload_support: self.current().tx_offload_support(),
728 multiqueue_support: self.current().multiqueue_support(),
729 tx_fast_completions: self.current().tx_fast_completions(),
730 link_speed: self.current().link_speed(),
731 });
732 EndpointAction::RestartRequired
733 }
734 Message::DisconnectableEndpointUpdate(
735 DisconnectableEndpointUpdate::EndpointDisconnected(rpc),
736 ) => {
737 let old_endpoint = self.endpoint.take();
738 self.notify_disconnect_complete = Some((rpc, old_endpoint));
743 EndpointAction::RestartRequired
744 }
745 Message::UpdateFromEndpoint(update) => update,
746 }
747 }
748
749 fn link_speed(&self) -> u64 {
750 self.cached_state
751 .as_ref()
752 .expect("Endpoint needs connected at least once before use")
753 .link_speed
754 }
755}