1#![expect(missing_docs)]
36#![forbid(unsafe_code)]
37
38pub mod loopback;
39pub mod null;
40pub mod resolve;
41pub mod tests;
42
43use async_trait::async_trait;
44use bitfield_struct::bitfield;
45use futures::FutureExt;
46use futures::StreamExt;
47use futures::TryFutureExt;
48use futures::lock::Mutex;
49use futures_concurrency::future::Race;
50use guestmem::GuestMemory;
51use guestmem::GuestMemoryError;
52use inspect::InspectMut;
53use inspect_counters::Counter;
54use mesh::rpc::Rpc;
55use mesh::rpc::RpcSend;
56use null::NullEndpoint;
57use pal_async::driver::Driver;
58use std::future::pending;
59use std::sync::Arc;
60use std::task::Context;
61use std::task::Poll;
62use thiserror::Error;
63
/// Per-queue configuration; [`Endpoint::get_queues`] receives one
/// `QueueConfig` per requested queue (as a `Vec<QueueConfig>`).
pub struct QueueConfig {
    /// Async driver (`pal_async::driver::Driver`) made available to the queue.
    pub driver: Box<dyn Driver>,
}
71
72#[async_trait]
83pub trait Endpoint: Send + Sync + InspectMut {
84 fn endpoint_type(&self) -> &'static str;
86
87 async fn get_queues(
89 &mut self,
90 config: Vec<QueueConfig>,
91 rss: Option<&RssConfig<'_>>,
92 queues: &mut Vec<Box<dyn Queue>>,
93 ) -> anyhow::Result<()>;
94
95 async fn stop(&mut self);
99
100 fn is_ordered(&self) -> bool {
102 false
103 }
104
105 fn tx_offload_support(&self) -> TxOffloadSupport {
107 TxOffloadSupport::default()
108 }
109
110 fn multiqueue_support(&self) -> MultiQueueSupport {
112 MultiQueueSupport {
113 max_queues: 1,
114 indirection_table_size: 0,
115 }
116 }
117
118 fn tx_fast_completions(&self) -> bool {
122 false
123 }
124
125 async fn set_data_path_to_guest_vf(&self, _use_vf: bool) -> anyhow::Result<()> {
128 Err(anyhow::Error::msg("Unsupported in current endpoint"))
129 }
130
131 async fn get_data_path_to_guest_vf(&self) -> anyhow::Result<bool> {
132 Err(anyhow::Error::msg("Unsupported in current endpoint"))
133 }
134
135 async fn wait_for_endpoint_action(&mut self) -> EndpointAction {
137 pending().await
138 }
139
140 fn link_speed(&self) -> u64 {
142 10 * 1000 * 1000 * 1000
145 }
146}
147
/// Multi-queue capabilities reported by [`Endpoint::multiqueue_support`].
#[derive(Clone, Copy, Debug)]
pub struct MultiQueueSupport {
    /// Maximum number of queues the endpoint can provide.
    pub max_queues: u16,
    /// Size of the RSS indirection table (0 when the default single-queue
    /// capability is reported).
    pub indirection_table_size: u16,
}
156
/// Transmit offload capabilities reported by
/// [`Endpoint::tx_offload_support`]. The `Default` value reports no offloads.
#[derive(Clone, Copy, Debug, Default)]
pub struct TxOffloadSupport {
    /// IPv4 header checksum offload.
    pub ipv4_header: bool,
    /// TCP checksum offload.
    pub tcp: bool,
    /// UDP checksum offload.
    pub udp: bool,
    /// TCP segmentation offload.
    pub tso: bool,
}
169
/// Receive-side-scaling configuration passed to [`Endpoint::get_queues`].
/// Borrows its key and indirection table from the caller.
#[derive(Clone, Debug)]
pub struct RssConfig<'a> {
    /// RSS hash key bytes.
    pub key: &'a [u8],
    /// Indirection table entries.
    pub indirection_table: &'a [u16],
    /// RSS flags. NOTE(review): bit meanings are not defined here — confirm
    /// against the producer.
    pub flags: u32,
}
176
177#[derive(Error, Debug)]
178pub enum TxError {
179 #[error("error requiring queue restart. {0}")]
180 TryRestart(#[source] anyhow::Error),
181 #[error("unrecoverable error. {0}")]
182 Fatal(#[source] anyhow::Error),
183}
/// Per-queue statistics a backend queue can expose through
/// [`Queue::queue_stats`].
pub trait BackendQueueStats {
    /// Receive error counter.
    fn rx_errors(&self) -> Counter;
    /// Transmit error counter.
    fn tx_errors(&self) -> Counter;
    /// Received packet counter.
    fn rx_packets(&self) -> Counter;
    /// Transmitted packet counter.
    fn tx_packets(&self) -> Counter;
}
190
/// A single backend queue created by [`Endpoint::get_queues`].
///
/// Receive and transmit buffers are accessed through the [`BufferAccess`]
/// passed to each call.
#[async_trait]
pub trait Queue: Send + InspectMut {
    /// Informs the queue of a new target VP. The default ignores it.
    async fn update_target_vp(&mut self, target_vp: u32) {
        let _ = target_vp;
    }

    /// Polls the queue for readiness, registering `cx` for wakeup.
    fn poll_ready(&mut self, cx: &mut Context<'_>, pool: &mut dyn BufferAccess) -> Poll<()>;

    /// Hands the receive buffers in `done` to the queue.
    /// NOTE(review): presumably makes them available for incoming packets —
    /// confirm.
    fn rx_avail(&mut self, pool: &mut dyn BufferAccess, done: &[RxId]);

    /// Collects received packets into `packets`.
    /// Presumably returns the number of entries written — TODO confirm.
    fn rx_poll(
        &mut self,
        pool: &mut dyn BufferAccess,
        packets: &mut [RxId],
    ) -> anyhow::Result<usize>;

    /// Submits transmit `segments` (head segment first, see [`TxSegmentType`]).
    /// NOTE(review): the meaning of the returned `(bool, usize)` is not
    /// visible here — presumably (completed synchronously, count consumed);
    /// confirm.
    fn tx_avail(
        &mut self,
        pool: &mut dyn BufferAccess,
        segments: &[TxSegment],
    ) -> anyhow::Result<(bool, usize)>;

    /// Collects completed transmit IDs into `done`, presumably returning how
    /// many were written.
    fn tx_poll(
        &mut self,
        pool: &mut dyn BufferAccess,
        done: &mut [TxId],
    ) -> Result<usize, TxError>;

    /// Optional per-queue statistics. The default reports none.
    fn queue_stats(&self) -> Option<&dyn BackendQueueStats> {
        None
    }
}
245
/// Access to receive buffers and guest memory for a [`Queue`].
pub trait BufferAccess {
    /// Guest memory that buffer and segment addresses refer to.
    fn guest_memory(&self) -> &GuestMemory;

    /// Writes packet payload `data` into receive buffer `id`.
    fn write_data(&mut self, id: RxId, data: &[u8]);

    /// Appends the guest address ranges backing receive buffer `id` to `buf`.
    fn push_guest_addresses(&self, id: RxId, buf: &mut Vec<RxBufferSegment>);

    /// Capacity of receive buffer `id` (presumably in bytes — confirm).
    fn capacity(&self, id: RxId) -> u32;

    /// Writes the packet metadata for receive buffer `id`.
    fn write_header(&mut self, id: RxId, metadata: &RxMetadata);

    /// Writes both payload and metadata for `id`.
    ///
    /// The default writes the data first and the header second.
    fn write_packet(&mut self, id: RxId, metadata: &RxMetadata, data: &[u8]) {
        self.write_data(id, data);
        self.write_header(id, metadata);
    }
}
281
/// Identifier of a receive buffer.
#[derive(Clone, Copy, Debug)]
#[repr(transparent)]
pub struct RxId(pub u32);
286
/// One guest address range backing a receive buffer (see
/// [`BufferAccess::push_guest_addresses`]).
#[derive(Clone, Copy, Debug)]
pub struct RxBufferSegment {
    /// Guest physical address of the range.
    pub gpa: u64,
    /// Length of the range in bytes.
    pub len: u32,
}
295
296#[derive(Debug, Copy, Clone)]
298pub struct RxMetadata {
299 pub offset: usize,
301 pub len: usize,
303 pub ip_checksum: RxChecksumState,
305 pub l4_checksum: RxChecksumState,
307 pub l4_protocol: L4Protocol,
309}
310
311impl Default for RxMetadata {
312 fn default() -> Self {
313 Self {
314 offset: 0,
315 len: 0,
316 ip_checksum: RxChecksumState::Unknown,
317 l4_checksum: RxChecksumState::Unknown,
318 l4_protocol: L4Protocol::Unknown,
319 }
320 }
321}
322
/// Network-layer (L3) protocol of a packet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum L3Protocol {
    /// Not determined.
    Unknown,
    /// IPv4.
    Ipv4,
    /// IPv6.
    Ipv6,
}
330
/// Transport-layer (L4) protocol of a packet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum L4Protocol {
    /// Not determined.
    Unknown,
    /// TCP.
    Tcp,
    /// UDP.
    Udp,
}
338
/// Checksum validation state of a received packet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RxChecksumState {
    /// The checksum state is not known.
    Unknown,
    /// The checksum was validated and is good.
    Good,
    /// The checksum was validated and is bad.
    Bad,
    /// Reported as validated-but-wrong; [`RxChecksumState::is_valid`] still
    /// treats it as valid. NOTE(review): exact origin of this state is not
    /// visible here — confirm.
    ValidatedButWrong,
}

impl RxChecksumState {
    /// Returns `true` for [`Self::Good`] and [`Self::ValidatedButWrong`],
    /// `false` otherwise.
    pub fn is_valid(self) -> bool {
        matches!(self, Self::Good | Self::ValidatedButWrong)
    }
}
362
/// Identifier of a transmit packet; reported back through [`Queue::tx_poll`].
#[derive(Clone, Copy, Debug)]
#[repr(transparent)]
pub struct TxId(pub u32);
367
368#[derive(Debug, Clone)]
369pub enum TxSegmentType {
371 Head(TxMetadata),
373 Tail,
375}
376
377#[derive(Debug, Clone)]
378pub struct TxMetadata {
380 pub id: TxId,
382 pub segment_count: u8,
384 pub flags: TxFlags,
386 pub len: u32,
388 pub l2_len: u8,
391 pub l3_len: u16,
394 pub l4_len: u8,
397 pub max_tcp_segment_size: u16,
400}
401
/// Per-packet transmit flags packed into a single `u8`.
///
/// Six flag bits are followed by two reserved bits, for 8 bits in total.
/// NOTE(review): bit positions follow bitfield-struct's default ordering
/// (declaration order, presumably LSB first) — confirm before relying on the
/// raw value.
#[bitfield(u8)]
pub struct TxFlags {
    /// Request IP header checksum offload.
    pub offload_ip_header_checksum: bool,
    /// Request TCP checksum offload.
    pub offload_tcp_checksum: bool,
    /// Request UDP checksum offload.
    pub offload_udp_checksum: bool,
    /// Request TCP segmentation offload.
    pub offload_tcp_segmentation: bool,
    /// The packet is IPv4.
    pub is_ipv4: bool,
    /// The packet is IPv6.
    pub is_ipv6: bool,
    #[bits(2)]
    _reserved: u8,
}
430
431impl Default for TxMetadata {
432 fn default() -> Self {
433 Self {
434 id: TxId(0),
435 segment_count: 0,
436 len: 0,
437 flags: TxFlags::new(),
438 l2_len: 0,
439 l3_len: 0,
440 l4_len: 0,
441 max_tcp_segment_size: 0,
442 }
443 }
444}
445
446#[derive(Debug, Clone)]
447pub struct TxSegment {
449 pub ty: TxSegmentType,
451 pub gpa: u64,
453 pub len: u32,
455}
456
457pub fn packet_count(mut segments: &[TxSegment]) -> usize {
459 let mut packet_count = 0;
460 while let Some(head) = segments.first() {
461 let TxSegmentType::Head(metadata) = &head.ty else {
462 unreachable!()
463 };
464 segments = &segments[metadata.segment_count as usize..];
465 packet_count += 1;
466 }
467 packet_count
468}
469
470pub fn next_packet(segments: &[TxSegment]) -> (&TxMetadata, &[TxSegment], &[TxSegment]) {
473 let metadata = if let TxSegmentType::Head(metadata) = &segments[0].ty {
474 metadata
475 } else {
476 unreachable!();
477 };
478 let (this, rest) = segments.split_at(metadata.segment_count.into());
479 (metadata, this, rest)
480}
481
482pub fn linearize(
485 pool: &dyn BufferAccess,
486 segments: &mut &[TxSegment],
487) -> Result<Vec<u8>, GuestMemoryError> {
488 let (head, this, rest) = next_packet(segments);
489 let mut v = vec![0; head.len as usize];
490 let mut offset = 0;
491 let mem = pool.guest_memory();
492 for segment in this {
493 let dest = &mut v[offset..offset + segment.len as usize];
494 mem.read_at(segment.gpa, dest)?;
495 offset += segment.len as usize;
496 }
497 assert_eq!(v.len(), offset);
498 *segments = rest;
499 Ok(v)
500}
501
/// An action requested by an endpoint through
/// [`Endpoint::wait_for_endpoint_action`].
#[derive(Debug, PartialEq)]
pub enum EndpointAction {
    /// The caller should restart (presumably re-create its queues — confirm).
    /// `DisconnectableEndpoint` returns this after every connect or
    /// disconnect.
    RestartRequired,
    /// The link status changed; presumably `true` means link up — confirm.
    LinkStatusNotify(bool),
}
507
/// Control message sent from [`DisconnectableEndpointControl`] to
/// [`DisconnectableEndpoint`].
enum DisconnectableEndpointUpdate {
    /// Attach a new backend endpoint.
    EndpointConnected(Box<dyn Endpoint>),
    /// Detach the current endpoint. The RPC is answered with the detached
    /// endpoint, or `None` if nothing was attached.
    EndpointDisconnected(Rpc<(), Option<Box<dyn Endpoint>>>),
}
512
/// Handle for connecting and disconnecting the backend of a
/// [`DisconnectableEndpoint`]; returned by [`DisconnectableEndpoint::new`].
pub struct DisconnectableEndpointControl {
    // Sending half of the channel drained by
    // `DisconnectableEndpoint::wait_for_endpoint_action`.
    send_update: mesh::Sender<DisconnectableEndpointUpdate>,
}
516
517impl DisconnectableEndpointControl {
518 pub fn connect(&mut self, endpoint: Box<dyn Endpoint>) -> anyhow::Result<()> {
519 self.send_update
520 .send(DisconnectableEndpointUpdate::EndpointConnected(endpoint));
521 Ok(())
522 }
523
524 pub async fn disconnect(&mut self) -> anyhow::Result<Option<Box<dyn Endpoint>>> {
525 self.send_update
526 .call(DisconnectableEndpointUpdate::EndpointDisconnected, ())
527 .map_err(anyhow::Error::from)
528 .await
529 }
530}
531
/// Capabilities captured from an endpoint when it is connected to a
/// [`DisconnectableEndpoint`].
///
/// The disconnect path does not reset this state, so these values keep being
/// reported after a disconnect.
pub struct DisconnectableEndpointCachedState {
    // Value of `Endpoint::is_ordered` at connect time.
    is_ordered: bool,
    // Value of `Endpoint::tx_offload_support` at connect time.
    tx_offload_support: TxOffloadSupport,
    // Value of `Endpoint::multiqueue_support` at connect time.
    multiqueue_support: MultiQueueSupport,
    // Value of `Endpoint::tx_fast_completions` at connect time.
    tx_fast_completions: bool,
    // Value of `Endpoint::link_speed` at connect time.
    link_speed: u64,
}
539
/// An [`Endpoint`] whose backend can be attached and detached at runtime
/// through a [`DisconnectableEndpointControl`].
///
/// While no backend is attached, calls are forwarded to a null endpoint.
pub struct DisconnectableEndpoint {
    // The attached backend, or `None` while disconnected.
    endpoint: Option<Box<dyn Endpoint>>,
    // Fallback used by `current`/`current_mut` when `endpoint` is `None`.
    null_endpoint: Box<dyn Endpoint>,
    // Capabilities captured at the most recent connect; `None` until the
    // first connect.
    cached_state: Option<DisconnectableEndpointCachedState>,
    // Control-message receiver. It is cloned out of `self` before locking, so
    // the lock guard does not borrow `self`.
    receive_update: Arc<Mutex<mesh::Receiver<DisconnectableEndpointUpdate>>>,
    // A processed disconnect whose RPC reply (carrying the detached endpoint)
    // is deferred until the next `wait_for_endpoint_action` call.
    notify_disconnect_complete: Option<(
        Rpc<(), Option<Box<dyn Endpoint>>>,
        Option<Box<dyn Endpoint>>,
    )>,
}
550
551impl InspectMut for DisconnectableEndpoint {
552 fn inspect_mut(&mut self, req: inspect::Request<'_>) {
553 self.current_mut().inspect_mut(req)
554 }
555}
556
557impl DisconnectableEndpoint {
558 pub fn new() -> (Self, DisconnectableEndpointControl) {
559 let (endpoint_tx, endpoint_rx) = mesh::channel();
560 let control = DisconnectableEndpointControl {
561 send_update: endpoint_tx,
562 };
563 (
564 Self {
565 endpoint: None,
566 null_endpoint: Box::new(NullEndpoint::new()),
567 cached_state: None,
568 receive_update: Arc::new(Mutex::new(endpoint_rx)),
569 notify_disconnect_complete: None,
570 },
571 control,
572 )
573 }
574
575 fn current(&self) -> &dyn Endpoint {
576 self.endpoint
577 .as_ref()
578 .unwrap_or(&self.null_endpoint)
579 .as_ref()
580 }
581
582 fn current_mut(&mut self) -> &mut dyn Endpoint {
583 self.endpoint
584 .as_mut()
585 .unwrap_or(&mut self.null_endpoint)
586 .as_mut()
587 }
588}
589
590#[async_trait]
591impl Endpoint for DisconnectableEndpoint {
592 fn endpoint_type(&self) -> &'static str {
593 self.current().endpoint_type()
594 }
595
596 async fn get_queues(
597 &mut self,
598 config: Vec<QueueConfig>,
599 rss: Option<&RssConfig<'_>>,
600 queues: &mut Vec<Box<dyn Queue>>,
601 ) -> anyhow::Result<()> {
602 self.current_mut().get_queues(config, rss, queues).await
603 }
604
605 async fn stop(&mut self) {
606 self.current_mut().stop().await
607 }
608
609 fn is_ordered(&self) -> bool {
610 self.cached_state
611 .as_ref()
612 .expect("Endpoint needs connected at least once before use")
613 .is_ordered
614 }
615
616 fn tx_offload_support(&self) -> TxOffloadSupport {
617 self.cached_state
618 .as_ref()
619 .expect("Endpoint needs connected at least once before use")
620 .tx_offload_support
621 }
622
623 fn multiqueue_support(&self) -> MultiQueueSupport {
624 self.cached_state
625 .as_ref()
626 .expect("Endpoint needs connected at least once before use")
627 .multiqueue_support
628 }
629
630 fn tx_fast_completions(&self) -> bool {
631 self.cached_state
632 .as_ref()
633 .expect("Endpoint needs connected at least once before use")
634 .tx_fast_completions
635 }
636
637 async fn set_data_path_to_guest_vf(&self, use_vf: bool) -> anyhow::Result<()> {
638 self.current().set_data_path_to_guest_vf(use_vf).await
639 }
640
641 async fn get_data_path_to_guest_vf(&self) -> anyhow::Result<bool> {
642 self.current().get_data_path_to_guest_vf().await
643 }
644
645 async fn wait_for_endpoint_action(&mut self) -> EndpointAction {
646 if let Some((rpc, old_endpoint)) = self.notify_disconnect_complete.take() {
649 rpc.handle(async |_| old_endpoint).await;
650 }
651
652 enum Message {
653 DisconnectableEndpointUpdate(DisconnectableEndpointUpdate),
654 UpdateFromEndpoint(EndpointAction),
655 }
656 let receiver = self.receive_update.clone();
657 let mut receive_update = receiver.lock().await;
658 let update = async {
659 match receive_update.next().await {
660 Some(m) => Message::DisconnectableEndpointUpdate(m),
661 None => {
662 pending::<()>().await;
663 unreachable!()
664 }
665 }
666 };
667 let ep_update = self
668 .current_mut()
669 .wait_for_endpoint_action()
670 .map(Message::UpdateFromEndpoint);
671 let m = (update, ep_update).race().await;
672 match m {
673 Message::DisconnectableEndpointUpdate(
674 DisconnectableEndpointUpdate::EndpointConnected(endpoint),
675 ) => {
676 let old_endpoint = self.endpoint.take();
677 assert!(old_endpoint.is_none());
678 self.endpoint = Some(endpoint);
679 self.cached_state = Some(DisconnectableEndpointCachedState {
680 is_ordered: self.current().is_ordered(),
681 tx_offload_support: self.current().tx_offload_support(),
682 multiqueue_support: self.current().multiqueue_support(),
683 tx_fast_completions: self.current().tx_fast_completions(),
684 link_speed: self.current().link_speed(),
685 });
686 EndpointAction::RestartRequired
687 }
688 Message::DisconnectableEndpointUpdate(
689 DisconnectableEndpointUpdate::EndpointDisconnected(rpc),
690 ) => {
691 let old_endpoint = self.endpoint.take();
692 self.notify_disconnect_complete = Some((rpc, old_endpoint));
697 EndpointAction::RestartRequired
698 }
699 Message::UpdateFromEndpoint(update) => update,
700 }
701 }
702
703 fn link_speed(&self) -> u64 {
704 self.cached_state
705 .as_ref()
706 .expect("Endpoint needs connected at least once before use")
707 .link_speed
708 }
709}