net_backend/
loopback.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Loopback endpoint implementation, which reflects all transmits back as
//! receives.
//!
//! This is useful for testing.

use crate::BufferAccess;
use crate::Endpoint;
use crate::MultiQueueSupport;
use crate::Queue;
use crate::QueueConfig;
use crate::RssConfig;
use crate::RxId;
use crate::RxMetadata;
use crate::TxId;
use crate::TxSegment;
use crate::linearize;
use crate::packet_count;
use async_trait::async_trait;
use inspect::InspectMut;
use std::collections::VecDeque;
use std::task::Context;
use std::task::Poll;

/// A networking backend that reflects all transmitted packets back as received
/// packets.
///
/// The endpoint carries no state of its own: its only field is a private unit
/// value. Per-queue state lives in the [`LoopbackQueue`] objects created by
/// `get_queues`.
#[derive(InspectMut)]
#[inspect(skip)]
pub struct LoopbackEndpoint(());

impl LoopbackEndpoint {
    /// Returns a new endpoint.
    ///
    /// The endpoint holds no state, so every instance is interchangeable.
    pub fn new() -> Self {
        Self(())
    }
}

impl Default for LoopbackEndpoint {
    /// Equivalent to [`LoopbackEndpoint::new`].
    ///
    /// Provided so the type works with `Default`-based construction
    /// (`..Default::default()`, `#[derive(Default)]` on containing types, etc.).
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl Endpoint for LoopbackEndpoint {
    /// Reports the fixed type name of this backend.
    fn endpoint_type(&self) -> &'static str {
        "loopback"
    }

    /// Appends one [`LoopbackQueue`] to `queues` for every entry in `config`.
    ///
    /// Each queue takes ownership of its configuration's buffer pool and
    /// starts with a copy of the configuration's initial receive buffers as
    /// its available list. The RSS configuration is ignored, and this
    /// implementation always returns `Ok(())`.
    async fn get_queues(
        &mut self,
        config: Vec<QueueConfig<'_>>,
        _rss: Option<&RssConfig<'_>>,
        queues: &mut Vec<Box<dyn Queue>>,
    ) -> anyhow::Result<()> {
        // Grow the output vector once up front instead of per push.
        queues.reserve(config.len());
        for queue_config in config {
            let queue = LoopbackQueue {
                pool: queue_config.pool,
                rx_avail: VecDeque::from(queue_config.initial_rx.to_vec()),
                rx_done: VecDeque::new(),
            };
            queues.push(Box::new(queue));
        }
        Ok(())
    }

    /// Stopping is a no-op for this endpoint.
    async fn stop(&mut self) {}

    /// Always reports `true`.
    fn is_ordered(&self) -> bool {
        true
    }

    /// Advertises support for up to `u16::MAX` queues with a 64-entry
    /// indirection table.
    fn multiqueue_support(&self) -> MultiQueueSupport {
        let max_queues = u16::MAX;
        let indirection_table_size = 64;
        MultiQueueSupport {
            max_queues,
            indirection_table_size,
        }
    }
}

/// A queue that loops transmitted packets back by copying each one into an
/// available receive buffer.
#[derive(InspectMut)]
#[inspect(skip)]
pub struct LoopbackQueue {
    /// Buffer pool; passed to `linearize` when transmitting and used to write
    /// the looped-back packet into a receive buffer.
    pub(crate) pool: Box<dyn BufferAccess>,
    /// Receive buffer IDs that are free to be filled, consumed front-first.
    pub(crate) rx_avail: VecDeque<RxId>,
    /// Receive buffer IDs that have been filled and not yet handed out by
    /// `rx_poll`.
    pub(crate) rx_done: VecDeque<RxId>,
}

impl Queue for LoopbackQueue {
    /// Reports readiness as soon as at least one looped-back packet is waiting
    /// in `rx_done`.
    ///
    /// The context is unused, so no waker is registered when this returns
    /// `Poll::Pending`.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<()> {
        if !self.rx_done.is_empty() {
            return Poll::Ready(());
        }
        Poll::Pending
    }

    /// Appends the receive buffers in `done` to the available list.
    fn rx_avail(&mut self, done: &[RxId]) {
        tracing::debug!(count = done.len(), "rx_avail");
        self.rx_avail.extend(done.iter().copied());
    }

    /// Moves completed receive IDs into the front of `packets`, oldest first,
    /// and returns how many entries were written.
    ///
    /// At most `packets.len()` IDs are taken; any remainder stays queued.
    fn rx_poll(&mut self, packets: &mut [RxId]) -> anyhow::Result<usize> {
        let count = self.rx_done.len().min(packets.len());
        let completed = self.rx_done.drain(..count);
        for (slot, rx_id) in packets[..count].iter_mut().zip(completed) {
            *slot = rx_id;
        }
        Ok(count)
    }

    /// Loops transmitted packets back into available receive buffers.
    ///
    /// Packets are processed one at a time until either `segments` is
    /// exhausted or no receive buffer is available. The second element of the
    /// returned tuple is the number of segments consumed; the first is always
    /// `true` (its meaning is defined by the `Queue` trait, which is not
    /// visible here).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `linearize`. Packets looped back
    /// before the failure remain in `rx_done`.
    fn tx_avail(&mut self, mut segments: &[TxSegment]) -> anyhow::Result<(bool, usize)> {
        tracing::debug!(count = packet_count(segments), "tx_avail");
        let mut sent = 0;
        // Peek rather than pop: the receive buffer must stay available if
        // linearizing the packet fails.
        while let Some(&rx_id) = self.rx_avail.front() {
            if segments.is_empty() {
                break;
            }
            let remaining = segments.len();
            let data = linearize(self.pool.as_ref(), &mut segments)?;
            // `linearize` advances `segments` past the packet it consumed.
            sent += remaining - segments.len();
            self.rx_avail.pop_front();
            let metadata = RxMetadata {
                offset: 0,
                len: data.len(),
                ..Default::default()
            };
            self.pool.write_packet(rx_id, &metadata, &data);
            self.rx_done.push_back(rx_id);
        }
        Ok((true, sent))
    }

    /// Always reports zero transmit completions and leaves `_done` untouched.
    fn tx_poll(&mut self, _done: &mut [TxId]) -> anyhow::Result<usize> {
        Ok(0)
    }

    /// Gives mutable access to the queue's buffer pool.
    fn buffer_access(&mut self) -> Option<&mut dyn BufferAccess> {
        Some(self.pool.as_mut())
    }
}