1#![expect(missing_docs)]
8#![forbid(unsafe_code)]
9
10pub mod loopback;
11pub mod null;
12pub mod resolve;
13pub mod tests;
14
15use async_trait::async_trait;
16use bitfield_struct::bitfield;
17use futures::FutureExt;
18use futures::StreamExt;
19use futures::TryFutureExt;
20use futures::lock::Mutex;
21use futures_concurrency::future::Race;
22use guestmem::GuestMemory;
23use guestmem::GuestMemoryError;
24use inspect::InspectMut;
25use inspect_counters::Counter;
26use mesh::rpc::Rpc;
27use mesh::rpc::RpcSend;
28use null::NullEndpoint;
29use pal_async::driver::Driver;
30use std::future::pending;
31use std::sync::Arc;
32use std::task::Context;
33use std::task::Poll;
34use thiserror::Error;
35
36pub struct QueueConfig<'a> {
38 pub pool: Box<dyn BufferAccess>,
39 pub initial_rx: &'a [RxId],
40 pub driver: Box<dyn Driver>,
41}
42
43#[async_trait]
45pub trait Endpoint: Send + Sync + InspectMut {
46 fn endpoint_type(&self) -> &'static str;
48
49 async fn get_queues(
54 &mut self,
55 config: Vec<QueueConfig<'_>>,
56 rss: Option<&RssConfig<'_>>,
57 queues: &mut Vec<Box<dyn Queue>>,
58 ) -> anyhow::Result<()>;
59
60 async fn stop(&mut self);
64
65 fn is_ordered(&self) -> bool {
67 false
68 }
69
70 fn tx_offload_support(&self) -> TxOffloadSupport {
72 TxOffloadSupport::default()
73 }
74
75 fn multiqueue_support(&self) -> MultiQueueSupport {
77 MultiQueueSupport {
78 max_queues: 1,
79 indirection_table_size: 0,
80 }
81 }
82
83 fn tx_fast_completions(&self) -> bool {
87 false
88 }
89
90 async fn set_data_path_to_guest_vf(&self, _use_vf: bool) -> anyhow::Result<()> {
93 Err(anyhow::Error::msg("Unsupported in current endpoint"))
94 }
95
96 async fn get_data_path_to_guest_vf(&self) -> anyhow::Result<bool> {
97 Err(anyhow::Error::msg("Unsupported in current endpoint"))
98 }
99
100 async fn wait_for_endpoint_action(&mut self) -> EndpointAction {
102 pending().await
103 }
104
105 fn link_speed(&self) -> u64 {
107 10 * 1000 * 1000 * 1000
110 }
111}
112
/// Multi-queue parameters reported by [`Endpoint::multiqueue_support`].
#[derive(Clone, Copy, Debug)]
pub struct MultiQueueSupport {
    /// Maximum number of queues.
    pub max_queues: u16,
    /// Size of the indirection table.
    // NOTE(review): presumably the RSS indirection table, counted in
    // entries -- confirm.
    pub indirection_table_size: u16,
}
121
/// Set of transmit offloads reported by [`Endpoint::tx_offload_support`].
///
/// The derived default has every offload disabled.
#[derive(Clone, Copy, Debug, Default)]
pub struct TxOffloadSupport {
    /// IPv4 header offload.
    // NOTE(review): presumably header checksum offload -- confirm.
    pub ipv4_header: bool,
    /// TCP offload.
    // NOTE(review): presumably TCP checksum offload -- confirm.
    pub tcp: bool,
    /// UDP offload.
    // NOTE(review): presumably UDP checksum offload -- confirm.
    pub udp: bool,
    /// TSO offload.
    // NOTE(review): presumably TCP segmentation offload -- confirm.
    pub tso: bool,
}
134
/// Borrowed RSS configuration passed to [`Endpoint::get_queues`].
#[derive(Clone, Debug)]
pub struct RssConfig<'a> {
    /// RSS key bytes.
    pub key: &'a [u8],
    /// Indirection table entries.
    // NOTE(review): presumably each entry is a queue index -- confirm.
    pub indirection_table: &'a [u16],
    /// Flags word; its bit layout is not defined in this file.
    pub flags: u32,
}
141
142#[derive(Error, Debug)]
143pub enum TxError {
144 #[error("error requiring queue restart. {0}")]
145 TryRestart(#[source] anyhow::Error),
146 #[error("unrecoverable error. {0}")]
147 Fatal(#[source] anyhow::Error),
148}
149pub trait BackendQueueStats {
150 fn rx_errors(&self) -> Counter;
151 fn tx_errors(&self) -> Counter;
152 fn rx_packets(&self) -> Counter;
153 fn tx_packets(&self) -> Counter;
154}
155
156#[async_trait]
158pub trait Queue: Send + InspectMut {
159 async fn update_target_vp(&mut self, target_vp: u32) {
161 let _ = target_vp;
162 }
163
164 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()>;
166
167 fn rx_avail(&mut self, done: &[RxId]);
169
170 fn rx_poll(&mut self, packets: &mut [RxId]) -> anyhow::Result<usize>;
172
173 fn tx_avail(&mut self, segments: &[TxSegment]) -> anyhow::Result<(bool, usize)>;
177
178 fn tx_poll(&mut self, done: &mut [TxId]) -> Result<usize, TxError>;
180
181 fn buffer_access(&mut self) -> Option<&mut dyn BufferAccess>;
183
184 fn queue_stats(&self) -> Option<&dyn BackendQueueStats> {
186 None }
188}
189
190pub trait BufferAccess: 'static + Send {
192 fn guest_memory(&self) -> &GuestMemory;
194
195 fn write_data(&mut self, id: RxId, data: &[u8]);
197
198 fn guest_addresses(&mut self, id: RxId) -> &[RxBufferSegment];
200
201 fn capacity(&self, id: RxId) -> u32;
203
204 fn write_header(&mut self, id: RxId, metadata: &RxMetadata);
206
207 fn write_packet(&mut self, id: RxId, metadata: &RxMetadata, data: &[u8]) {
209 self.write_data(id, data);
210 self.write_header(id, metadata);
211 }
212}
213
/// Identifier of a receive buffer; a transparent wrapper around `u32`.
#[repr(transparent)]
#[derive(Clone, Copy, Debug)]
pub struct RxId(pub u32);
218
/// One contiguous piece of a receive buffer, as returned by
/// [`BufferAccess::guest_addresses`].
#[derive(Clone, Copy, Debug)]
pub struct RxBufferSegment {
    /// Address of the segment.
    // NOTE(review): presumably a guest physical address -- confirm.
    pub gpa: u64,
    /// Length of the segment.
    // NOTE(review): presumably in bytes -- confirm.
    pub len: u32,
}
227
228#[derive(Debug, Copy, Clone)]
230pub struct RxMetadata {
231 pub offset: usize,
233 pub len: usize,
235 pub ip_checksum: RxChecksumState,
237 pub l4_checksum: RxChecksumState,
239 pub l4_protocol: L4Protocol,
241}
242
243impl Default for RxMetadata {
244 fn default() -> Self {
245 Self {
246 offset: 0,
247 len: 0,
248 ip_checksum: RxChecksumState::Unknown,
249 l4_checksum: RxChecksumState::Unknown,
250 l4_protocol: L4Protocol::Unknown,
251 }
252 }
253}
254
/// Layer 3 protocol of a packet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum L3Protocol {
    /// The protocol is not known.
    Unknown,
    /// IPv4.
    Ipv4,
    /// IPv6.
    Ipv6,
}
262
/// Layer 4 protocol of a packet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum L4Protocol {
    /// The protocol is not known.
    Unknown,
    /// TCP.
    Tcp,
    /// UDP.
    Udp,
}
270
/// Checksum state of a received packet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RxChecksumState {
    /// The checksum state is not known.
    Unknown,
    /// The checksum is good.
    Good,
    /// The checksum is bad.
    Bad,
    /// The checksum was validated, but the value is wrong.
    // NOTE(review): presumably the value carried in the packet header no
    // longer matches (e.g. after receive coalescing) -- confirm.
    ValidatedButWrong,
}

impl RxChecksumState {
    /// Returns `true` for [`Good`](Self::Good) and
    /// [`ValidatedButWrong`](Self::ValidatedButWrong), `false` otherwise.
    pub fn is_valid(self) -> bool {
        matches!(self, Self::Good | Self::ValidatedButWrong)
    }
}
294
/// Identifier of a transmit packet; a transparent wrapper around `u32`.
#[repr(transparent)]
#[derive(Clone, Copy, Debug)]
pub struct TxId(pub u32);
299
300#[derive(Debug, Clone)]
301pub enum TxSegmentType {
303 Head(TxMetadata),
305 Tail,
307}
308
309#[derive(Debug, Clone)]
310pub struct TxMetadata {
312 pub id: TxId,
314 pub segment_count: u8,
316 pub flags: TxFlags,
318 pub len: u32,
320 pub l2_len: u8,
323 pub l3_len: u16,
326 pub l4_len: u8,
329 pub max_tcp_segment_size: u16,
332}
333
334#[bitfield(u8)]
336pub struct TxFlags {
337 pub offload_ip_header_checksum: bool,
341 pub offload_tcp_checksum: bool,
345 pub offload_udp_checksum: bool,
349 pub offload_tcp_segmentation: bool,
355 pub is_ipv4: bool,
357 pub is_ipv6: bool,
359 #[bits(2)]
360 _reserved: u8,
361}
362
363impl Default for TxMetadata {
364 fn default() -> Self {
365 Self {
366 id: TxId(0),
367 segment_count: 0,
368 len: 0,
369 flags: TxFlags::new(),
370 l2_len: 0,
371 l3_len: 0,
372 l4_len: 0,
373 max_tcp_segment_size: 0,
374 }
375 }
376}
377
378#[derive(Debug, Clone)]
379pub struct TxSegment {
381 pub ty: TxSegmentType,
383 pub gpa: u64,
385 pub len: u32,
387}
388
389pub fn packet_count(mut segments: &[TxSegment]) -> usize {
391 let mut packet_count = 0;
392 while let Some(head) = segments.first() {
393 let TxSegmentType::Head(metadata) = &head.ty else {
394 unreachable!()
395 };
396 segments = &segments[metadata.segment_count as usize..];
397 packet_count += 1;
398 }
399 packet_count
400}
401
402pub fn next_packet(segments: &[TxSegment]) -> (&TxMetadata, &[TxSegment], &[TxSegment]) {
405 let metadata = if let TxSegmentType::Head(metadata) = &segments[0].ty {
406 metadata
407 } else {
408 unreachable!();
409 };
410 let (this, rest) = segments.split_at(metadata.segment_count.into());
411 (metadata, this, rest)
412}
413
414pub fn linearize(
417 pool: &dyn BufferAccess,
418 segments: &mut &[TxSegment],
419) -> Result<Vec<u8>, GuestMemoryError> {
420 let (head, this, rest) = next_packet(segments);
421 let mut v = vec![0; head.len as usize];
422 let mut offset = 0;
423 let mem = pool.guest_memory();
424 for segment in this {
425 let dest = &mut v[offset..offset + segment.len as usize];
426 mem.read_at(segment.gpa, dest)?;
427 offset += segment.len as usize;
428 }
429 assert_eq!(v.len(), offset);
430 *segments = rest;
431 Ok(v)
432}
433
/// Action reported by [`Endpoint::wait_for_endpoint_action`].
#[derive(Debug, PartialEq)]
pub enum EndpointAction {
    /// A restart is required.
    RestartRequired,
    /// Notification of a link status change, carrying a boolean status.
    // NOTE(review): presumably `true` means link up -- confirm.
    LinkStatusNotify(bool),
}
439
440enum DisconnectableEndpointUpdate {
441 EndpointConnected(Box<dyn Endpoint>),
442 EndpointDisconnected(Rpc<(), Option<Box<dyn Endpoint>>>),
443}
444
445pub struct DisconnectableEndpointControl {
446 send_update: mesh::Sender<DisconnectableEndpointUpdate>,
447}
448
449impl DisconnectableEndpointControl {
450 pub fn connect(&mut self, endpoint: Box<dyn Endpoint>) -> anyhow::Result<()> {
451 self.send_update
452 .send(DisconnectableEndpointUpdate::EndpointConnected(endpoint));
453 Ok(())
454 }
455
456 pub async fn disconnect(&mut self) -> anyhow::Result<Option<Box<dyn Endpoint>>> {
457 self.send_update
458 .call(DisconnectableEndpointUpdate::EndpointDisconnected, ())
459 .map_err(anyhow::Error::from)
460 .await
461 }
462}
463
464pub struct DisconnectableEndpointCachedState {
465 is_ordered: bool,
466 tx_offload_support: TxOffloadSupport,
467 multiqueue_support: MultiQueueSupport,
468 tx_fast_completions: bool,
469 link_speed: u64,
470}
471
472pub struct DisconnectableEndpoint {
473 endpoint: Option<Box<dyn Endpoint>>,
474 null_endpoint: Box<dyn Endpoint>,
475 cached_state: Option<DisconnectableEndpointCachedState>,
476 receive_update: Arc<Mutex<mesh::Receiver<DisconnectableEndpointUpdate>>>,
477 notify_disconnect_complete: Option<(
478 Rpc<(), Option<Box<dyn Endpoint>>>,
479 Option<Box<dyn Endpoint>>,
480 )>,
481}
482
483impl InspectMut for DisconnectableEndpoint {
484 fn inspect_mut(&mut self, req: inspect::Request<'_>) {
485 self.current_mut().inspect_mut(req)
486 }
487}
488
489impl DisconnectableEndpoint {
490 pub fn new() -> (Self, DisconnectableEndpointControl) {
491 let (endpoint_tx, endpoint_rx) = mesh::channel();
492 let control = DisconnectableEndpointControl {
493 send_update: endpoint_tx,
494 };
495 (
496 Self {
497 endpoint: None,
498 null_endpoint: Box::new(NullEndpoint::new()),
499 cached_state: None,
500 receive_update: Arc::new(Mutex::new(endpoint_rx)),
501 notify_disconnect_complete: None,
502 },
503 control,
504 )
505 }
506
507 fn current(&self) -> &dyn Endpoint {
508 self.endpoint
509 .as_ref()
510 .unwrap_or(&self.null_endpoint)
511 .as_ref()
512 }
513
514 fn current_mut(&mut self) -> &mut dyn Endpoint {
515 self.endpoint
516 .as_mut()
517 .unwrap_or(&mut self.null_endpoint)
518 .as_mut()
519 }
520}
521
522#[async_trait]
523impl Endpoint for DisconnectableEndpoint {
524 fn endpoint_type(&self) -> &'static str {
525 self.current().endpoint_type()
526 }
527
528 async fn get_queues(
529 &mut self,
530 config: Vec<QueueConfig<'_>>,
531 rss: Option<&RssConfig<'_>>,
532 queues: &mut Vec<Box<dyn Queue>>,
533 ) -> anyhow::Result<()> {
534 self.current_mut().get_queues(config, rss, queues).await
535 }
536
537 async fn stop(&mut self) {
538 self.current_mut().stop().await
539 }
540
541 fn is_ordered(&self) -> bool {
542 self.cached_state
543 .as_ref()
544 .expect("Endpoint needs connected at least once before use")
545 .is_ordered
546 }
547
548 fn tx_offload_support(&self) -> TxOffloadSupport {
549 self.cached_state
550 .as_ref()
551 .expect("Endpoint needs connected at least once before use")
552 .tx_offload_support
553 }
554
555 fn multiqueue_support(&self) -> MultiQueueSupport {
556 self.cached_state
557 .as_ref()
558 .expect("Endpoint needs connected at least once before use")
559 .multiqueue_support
560 }
561
562 fn tx_fast_completions(&self) -> bool {
563 self.cached_state
564 .as_ref()
565 .expect("Endpoint needs connected at least once before use")
566 .tx_fast_completions
567 }
568
569 async fn set_data_path_to_guest_vf(&self, use_vf: bool) -> anyhow::Result<()> {
570 self.current().set_data_path_to_guest_vf(use_vf).await
571 }
572
573 async fn get_data_path_to_guest_vf(&self) -> anyhow::Result<bool> {
574 self.current().get_data_path_to_guest_vf().await
575 }
576
577 async fn wait_for_endpoint_action(&mut self) -> EndpointAction {
578 if let Some((rpc, old_endpoint)) = self.notify_disconnect_complete.take() {
581 rpc.handle(async |_| old_endpoint).await;
582 }
583
584 enum Message {
585 DisconnectableEndpointUpdate(DisconnectableEndpointUpdate),
586 UpdateFromEndpoint(EndpointAction),
587 }
588 let receiver = self.receive_update.clone();
589 let mut receive_update = receiver.lock().await;
590 let update = async {
591 match receive_update.next().await {
592 Some(m) => Message::DisconnectableEndpointUpdate(m),
593 None => {
594 pending::<()>().await;
595 unreachable!()
596 }
597 }
598 };
599 let ep_update = self
600 .current_mut()
601 .wait_for_endpoint_action()
602 .map(Message::UpdateFromEndpoint);
603 let m = (update, ep_update).race().await;
604 match m {
605 Message::DisconnectableEndpointUpdate(
606 DisconnectableEndpointUpdate::EndpointConnected(endpoint),
607 ) => {
608 let old_endpoint = self.endpoint.take();
609 assert!(old_endpoint.is_none());
610 self.endpoint = Some(endpoint);
611 self.cached_state = Some(DisconnectableEndpointCachedState {
612 is_ordered: self.current().is_ordered(),
613 tx_offload_support: self.current().tx_offload_support(),
614 multiqueue_support: self.current().multiqueue_support(),
615 tx_fast_completions: self.current().tx_fast_completions(),
616 link_speed: self.current().link_speed(),
617 });
618 EndpointAction::RestartRequired
619 }
620 Message::DisconnectableEndpointUpdate(
621 DisconnectableEndpointUpdate::EndpointDisconnected(rpc),
622 ) => {
623 let old_endpoint = self.endpoint.take();
624 self.notify_disconnect_complete = Some((rpc, old_endpoint));
629 EndpointAction::RestartRequired
630 }
631 Message::UpdateFromEndpoint(update) => update,
632 }
633 }
634
635 fn link_speed(&self) -> u64 {
636 self.cached_state
637 .as_ref()
638 .expect("Endpoint needs connected at least once before use")
639 .link_speed
640 }
641}