1use crate::BufferAccess;
10use crate::Endpoint;
11use crate::MultiQueueSupport;
12use crate::Queue;
13use crate::QueueConfig;
14use crate::RssConfig;
15use crate::RxId;
16use crate::RxMetadata;
17use crate::TxError;
18use crate::TxId;
19use crate::TxSegment;
20use crate::linearize;
21use crate::packet_count;
22use async_trait::async_trait;
23use inspect::InspectMut;
24use std::collections::VecDeque;
25use std::task::Context;
26use std::task::Poll;
27
28#[derive(InspectMut)]
31#[inspect(skip)]
32pub struct LoopbackEndpoint(());
33
34impl LoopbackEndpoint {
35 pub fn new() -> Self {
37 Self(())
38 }
39}
40
41#[async_trait]
42impl Endpoint for LoopbackEndpoint {
43 fn endpoint_type(&self) -> &'static str {
44 "loopback"
45 }
46
47 async fn get_queues(
48 &mut self,
49 config: Vec<QueueConfig<'_>>,
50 _rss: Option<&RssConfig<'_>>,
51 queues: &mut Vec<Box<dyn Queue>>,
52 ) -> anyhow::Result<()> {
53 queues.extend(config.into_iter().map(|config| {
54 Box::new(LoopbackQueue {
55 pool: config.pool,
56 rx_avail: config.initial_rx.to_vec().into(),
57 rx_done: VecDeque::new(),
58 }) as _
59 }));
60 Ok(())
61 }
62
63 async fn stop(&mut self) {}
64
65 fn is_ordered(&self) -> bool {
66 true
67 }
68
69 fn multiqueue_support(&self) -> MultiQueueSupport {
70 MultiQueueSupport {
71 max_queues: u16::MAX,
72 indirection_table_size: 64,
73 }
74 }
75}
76
/// A queue that copies each transmitted packet into one of its own receive
/// buffers and then reports that buffer as received.
#[derive(InspectMut)]
#[inspect(skip)]
pub struct LoopbackQueue {
    /// Buffer pool that transmitted packets are read from and that
    /// looped-back packets are written into.
    pub(crate) pool: Box<dyn BufferAccess>,
    /// Receive buffer IDs that are free to take a looped-back packet.
    pub(crate) rx_avail: VecDeque<RxId>,
    /// Receive buffer IDs that already hold a looped-back packet and are
    /// waiting to be returned by `rx_poll`.
    pub(crate) rx_done: VecDeque<RxId>,
}
84
85impl Queue for LoopbackQueue {
86 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<()> {
87 if self.rx_done.is_empty() {
88 Poll::Pending
89 } else {
90 Poll::Ready(())
91 }
92 }
93
94 fn rx_avail(&mut self, done: &[RxId]) {
95 tracing::debug!(count = done.len(), "rx_avail");
96 self.rx_avail.extend(done);
97 }
98
99 fn rx_poll(&mut self, packets: &mut [RxId]) -> anyhow::Result<usize> {
100 let n = packets.len().min(self.rx_done.len());
101 for (d, s) in packets.iter_mut().zip(self.rx_done.drain(..n)) {
102 *d = s;
103 }
104 Ok(n)
105 }
106
107 fn tx_avail(&mut self, mut segments: &[TxSegment]) -> anyhow::Result<(bool, usize)> {
108 tracing::debug!(count = packet_count(segments), "tx_avail");
109 let mut sent = 0;
110 while !segments.is_empty() && !self.rx_avail.is_empty() {
111 let before = segments.len();
112 let packet = linearize(self.pool.as_ref(), &mut segments)?;
113 sent += before - segments.len();
114 let rx_id = self.rx_avail.pop_front().unwrap();
115 self.pool.write_packet(
116 rx_id,
117 &RxMetadata {
118 offset: 0,
119 len: packet.len(),
120 ..Default::default()
121 },
122 &packet,
123 );
124 self.rx_done.push_back(rx_id);
125 }
126 Ok((true, sent))
127 }
128
129 fn tx_poll(&mut self, _done: &mut [TxId]) -> Result<usize, TxError> {
130 Ok(0)
131 }
132
133 fn buffer_access(&mut self) -> Option<&mut dyn BufferAccess> {
134 Some(self.pool.as_mut())
135 }
136}