1use crate::BufferAccess;
10use crate::Endpoint;
11use crate::MultiQueueSupport;
12use crate::Queue;
13use crate::QueueConfig;
14use crate::RssConfig;
15use crate::RxId;
16use crate::RxMetadata;
17use crate::TxError;
18use crate::TxId;
19use crate::TxSegment;
20use crate::linearize;
21use crate::next_packet;
22use crate::packet_count;
23use async_trait::async_trait;
24use inspect::InspectMut;
25use std::collections::VecDeque;
26use std::task::Context;
27use std::task::Poll;
28
29#[derive(InspectMut)]
32#[inspect(skip)]
33pub struct LoopbackEndpoint(());
34
35impl LoopbackEndpoint {
36 pub fn new() -> Self {
38 Self(())
39 }
40}
41
42#[async_trait]
43impl Endpoint for LoopbackEndpoint {
44 fn endpoint_type(&self) -> &'static str {
45 "loopback"
46 }
47
48 async fn get_queues(
49 &mut self,
50 config: Vec<QueueConfig>,
51 _rss: Option<&RssConfig<'_>>,
52 queues: &mut Vec<Box<dyn Queue>>,
53 ) -> anyhow::Result<()> {
54 queues.extend(config.into_iter().map(|_config| {
55 Box::new(LoopbackQueue {
56 rx_avail: VecDeque::new(),
57 rx_done: VecDeque::new(),
58 }) as _
59 }));
60 Ok(())
61 }
62
63 async fn stop(&mut self) {}
64
65 fn is_ordered(&self) -> bool {
66 true
67 }
68
69 fn multiqueue_support(&self) -> MultiQueueSupport {
70 MultiQueueSupport {
71 max_queues: u16::MAX,
72 indirection_table_size: 64,
73 }
74 }
75}
76
77#[derive(InspectMut)]
78#[inspect(skip)]
79pub struct LoopbackQueue {
80 pub(crate) rx_avail: VecDeque<RxId>,
81 pub(crate) rx_done: VecDeque<RxId>,
82}
83
84impl Queue for LoopbackQueue {
85 fn poll_ready(&mut self, _cx: &mut Context<'_>, _pool: &mut dyn BufferAccess) -> Poll<()> {
86 if self.rx_done.is_empty() {
87 Poll::Pending
88 } else {
89 Poll::Ready(())
90 }
91 }
92
93 fn rx_avail(&mut self, _pool: &mut dyn BufferAccess, done: &[RxId]) {
94 tracing::debug!(count = done.len(), "rx_avail");
95 self.rx_avail.extend(done);
96 }
97
98 fn rx_poll(
99 &mut self,
100 _pool: &mut dyn BufferAccess,
101 packets: &mut [RxId],
102 ) -> anyhow::Result<usize> {
103 let n = packets.len().min(self.rx_done.len());
104 for (d, s) in packets.iter_mut().zip(self.rx_done.drain(..n)) {
105 *d = s;
106 }
107 Ok(n)
108 }
109
110 fn tx_avail(
111 &mut self,
112 pool: &mut dyn BufferAccess,
113 mut segments: &[TxSegment],
114 ) -> anyhow::Result<(bool, usize)> {
115 tracing::debug!(count = packet_count(segments), "tx_avail");
116 let mut sent = 0;
117 while !segments.is_empty() && !self.rx_avail.is_empty() {
118 let (meta, _, _) = next_packet(segments);
119 let vlan = meta.vlan;
120 let before = segments.len();
121 let packet = linearize(pool, &mut segments)?;
122 sent += before - segments.len();
123 let rx_id = self.rx_avail.pop_front().unwrap();
124 pool.write_packet(
125 rx_id,
126 &RxMetadata {
127 offset: 0,
128 len: packet.len(),
129 vlan,
130 ..Default::default()
131 },
132 &packet,
133 );
134 self.rx_done.push_back(rx_id);
135 }
136 Ok((true, sent))
137 }
138
139 fn tx_poll(
140 &mut self,
141 _pool: &mut dyn BufferAccess,
142 _done: &mut [TxId],
143 ) -> Result<usize, TxError> {
144 Ok(0)
145 }
146}