1use crate::BufferAccess;
10use crate::Endpoint;
11use crate::MultiQueueSupport;
12use crate::Queue;
13use crate::QueueConfig;
14use crate::RssConfig;
15use crate::RxId;
16use crate::RxMetadata;
17use crate::TxError;
18use crate::TxId;
19use crate::TxSegment;
20use crate::linearize;
21use crate::packet_count;
22use async_trait::async_trait;
23use inspect::InspectMut;
24use std::collections::VecDeque;
25use std::task::Context;
26use std::task::Poll;
27
28#[derive(InspectMut)]
31#[inspect(skip)]
32pub struct LoopbackEndpoint(());
33
34impl LoopbackEndpoint {
35 pub fn new() -> Self {
37 Self(())
38 }
39}
40
41#[async_trait]
42impl Endpoint for LoopbackEndpoint {
43 fn endpoint_type(&self) -> &'static str {
44 "loopback"
45 }
46
47 async fn get_queues(
48 &mut self,
49 config: Vec<QueueConfig>,
50 _rss: Option<&RssConfig<'_>>,
51 queues: &mut Vec<Box<dyn Queue>>,
52 ) -> anyhow::Result<()> {
53 queues.extend(config.into_iter().map(|_config| {
54 Box::new(LoopbackQueue {
55 rx_avail: VecDeque::new(),
56 rx_done: VecDeque::new(),
57 }) as _
58 }));
59 Ok(())
60 }
61
62 async fn stop(&mut self) {}
63
64 fn is_ordered(&self) -> bool {
65 true
66 }
67
68 fn multiqueue_support(&self) -> MultiQueueSupport {
69 MultiQueueSupport {
70 max_queues: u16::MAX,
71 indirection_table_size: 64,
72 }
73 }
74}
75
76#[derive(InspectMut)]
77#[inspect(skip)]
78pub struct LoopbackQueue {
79 pub(crate) rx_avail: VecDeque<RxId>,
80 pub(crate) rx_done: VecDeque<RxId>,
81}
82
83impl Queue for LoopbackQueue {
84 fn poll_ready(&mut self, _cx: &mut Context<'_>, _pool: &mut dyn BufferAccess) -> Poll<()> {
85 if self.rx_done.is_empty() {
86 Poll::Pending
87 } else {
88 Poll::Ready(())
89 }
90 }
91
92 fn rx_avail(&mut self, _pool: &mut dyn BufferAccess, done: &[RxId]) {
93 tracing::debug!(count = done.len(), "rx_avail");
94 self.rx_avail.extend(done);
95 }
96
97 fn rx_poll(
98 &mut self,
99 _pool: &mut dyn BufferAccess,
100 packets: &mut [RxId],
101 ) -> anyhow::Result<usize> {
102 let n = packets.len().min(self.rx_done.len());
103 for (d, s) in packets.iter_mut().zip(self.rx_done.drain(..n)) {
104 *d = s;
105 }
106 Ok(n)
107 }
108
109 fn tx_avail(
110 &mut self,
111 pool: &mut dyn BufferAccess,
112 mut segments: &[TxSegment],
113 ) -> anyhow::Result<(bool, usize)> {
114 tracing::debug!(count = packet_count(segments), "tx_avail");
115 let mut sent = 0;
116 while !segments.is_empty() && !self.rx_avail.is_empty() {
117 let before = segments.len();
118 let packet = linearize(pool, &mut segments)?;
119 sent += before - segments.len();
120 let rx_id = self.rx_avail.pop_front().unwrap();
121 pool.write_packet(
122 rx_id,
123 &RxMetadata {
124 offset: 0,
125 len: packet.len(),
126 ..Default::default()
127 },
128 &packet,
129 );
130 self.rx_done.push_back(rx_id);
131 }
132 Ok((true, sent))
133 }
134
135 fn tx_poll(
136 &mut self,
137 _pool: &mut dyn BufferAccess,
138 _done: &mut [TxId],
139 ) -> Result<usize, TxError> {
140 Ok(0)
141 }
142}