virtio/queue.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Core virtio queue implementation, without any notification mechanisms, async
//! support, or other transport-specific details.
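//!
//! A split virtqueue consists of three regions of guest memory described by
//! the virtio 1.x specification: a descriptor table (16 bytes per
//! descriptor), an available ring written by the driver, and a used ring
//! written by the device. [`QueueCore`] wraps validated views of these
//! regions and implements the device side of the protocol.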

use crate::spec::queue as spec;
use crate::spec::u16_le;
use guestmem::GuestMemory;
use guestmem::GuestMemoryError;
use std::sync::atomic;
use thiserror::Error;

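/// The core state of a single virtio queue: the negotiated queue size and
/// feature-dependent behavior, plus subranges of guest memory covering the
/// descriptor table, available ring, and used ring.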
#[derive(Debug, Clone)]
pub(crate) struct QueueCore {
    queue_size: u16,
    queue_desc: GuestMemory,
    queue_avail: GuestMemory,
    queue_used: GuestMemory,
    use_ring_event_index: bool,
    mem: GuestMemory,
}

#[derive(Debug, Error)]
pub enum QueueError {
    #[error("error accessing queue memory")]
    Memory(#[source] GuestMemoryError),
    #[error("an indirect descriptor had the indirect flag set")]
    DoubleIndirect,
    #[error("a descriptor chain is too long or has a cycle")]
    TooLong,
}

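/// Queue configuration programmed by the driver via the transport: the queue
/// size and the guest physical addresses of the descriptor table, available
/// ring, and used ring.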
#[derive(Debug, Copy, Clone, Default)]
pub struct QueueParams {
    pub size: u16,
    pub enable: bool,
    pub desc_addr: u64,
    pub avail_addr: u64,
    pub used_addr: u64,
}

impl QueueCore {
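    /// Creates a new queue from the negotiated `features` and the
    /// driver-supplied `params`, validating that each ring fits within guest
    /// memory.
    ///
    /// The subrange sizes follow the virtio 1.x split-ring layout: the
    /// descriptor table holds `size` 16-byte descriptors, the available ring
    /// is 4 bytes of header plus a 2-byte entry per descriptor, and the used
    /// ring is 4 bytes of header plus an 8-byte entry per descriptor. Each
    /// ring is sized with its trailing `u16` event field, even though that
    /// field is only used when `VIRTIO_F_RING_EVENT_IDX` is negotiated.
    ///
    /// A minimal usage sketch (the addresses and feature bits are
    /// illustrative, and `mem` is assumed to be a [`GuestMemory`] spanning
    /// the guest address space):
    ///
    /// ```ignore
    /// let params = QueueParams {
    ///     size: 256,
    ///     enable: true,
    ///     desc_addr: 0x1000,
    ///     avail_addr: 0x2000,
    ///     used_addr: 0x3000,
    /// };
    /// let queue = QueueCore::new(features, mem, params)?;
    /// ```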
    pub fn new(features: u64, mem: GuestMemory, params: QueueParams) -> Result<Self, QueueError> {
        let use_ring_event_index = (features & crate::spec::VIRTIO_F_RING_EVENT_IDX as u64) != 0;

        let queue_avail = mem
            .subrange(
                params.avail_addr,
                spec::AVAIL_OFFSET_RING
                    + spec::AVAIL_ELEMENT_SIZE * params.size as u64
                    + size_of::<u16>() as u64,
                true,
            )
            .map_err(QueueError::Memory)?;

        let queue_used = mem
            .subrange(
                params.used_addr,
                spec::USED_OFFSET_RING
                    + spec::USED_ELEMENT_SIZE * params.size as u64
                    + size_of::<u16>() as u64,
                true,
            )
            .map_err(QueueError::Memory)?;

        let queue_desc = mem
            .subrange(
                params.desc_addr,
                size_of::<spec::Descriptor>() as u64 * params.size as u64,
                true,
            )
            .map_err(QueueError::Memory)?;

        Ok(Self {
            queue_size: params.size,
            queue_desc,
            queue_avail,
            queue_used,
            use_ring_event_index,
            mem,
        })
    }

    fn set_used_flags(&self, flags: spec::UsedFlags) -> Result<(), QueueError> {
        self.queue_used
            .write_plain::<u16_le>(0, &u16::from(flags).into())
            .map_err(QueueError::Memory)
    }

    fn get_available_index(&self) -> Result<u16, QueueError> {
        Ok(self
            .queue_avail
            .read_plain::<u16_le>(spec::AVAIL_OFFSET_IDX)
            .map_err(QueueError::Memory)?
            .get())
    }

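    /// Returns whether the driver has made a descriptor chain available
    /// beyond `queue_last_avail_index`.
    ///
    /// This implements the usual virtio notification dance: if the ring looks
    /// empty, re-enable driver notifications (by publishing the available
    /// event or clearing the no-notify flag), recheck the available index to
    /// close the race with a concurrent driver update, and suppress
    /// notifications again once work is found.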
    fn is_available(&self, queue_last_avail_index: u16) -> Result<bool, QueueError> {
        let mut avail_index = self.get_available_index()?;
        if avail_index == queue_last_avail_index {
            if self.use_ring_event_index {
                self.set_available_event(avail_index)?;
            } else {
                self.set_used_flags(spec::UsedFlags::new())?;
            }
            // Ensure the available event/used flags are visible before checking
            // the available index again.
            atomic::fence(atomic::Ordering::SeqCst);
            avail_index = self.get_available_index()?;
            if avail_index == queue_last_avail_index {
                return Ok(false);
            }
        }
        if self.use_ring_event_index {
            self.set_available_event(avail_index.wrapping_sub(1))?;
        } else {
            self.set_used_flags(spec::UsedFlags::new().with_no_notify(true))?;
        }
        // Ensure available index read is ordered before subsequent descriptor
        // reads.
        atomic::fence(atomic::Ordering::Acquire);
        Ok(true)
    }

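    /// Returns the head descriptor index of the next available chain, or
    /// `None` if the driver has published nothing past `avail_index` (in
    /// which case driver notifications have been re-enabled).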
    pub fn descriptor_index(&self, avail_index: u16) -> Result<Option<u16>, QueueError> {
        if self.is_available(avail_index)? {
            Ok(Some(self.get_available_descriptor_index(avail_index)?))
        } else {
            Ok(None)
        }
    }

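    /// Returns an iterator over the buffers of the chain starting at
    /// `descriptor_index`, following `next` links (and any indirect table)
    /// until the chain ends.
    ///
    /// A minimal sketch of how a device might consume a chain, assuming
    /// `last_avail` tracks the last available index it has processed:
    ///
    /// ```ignore
    /// if let Some(head) = queue.descriptor_index(last_avail)? {
    ///     for payload in queue.reader(head) {
    ///         let payload = payload?;
    ///         // Read or write guest memory at payload.address for
    ///         // payload.length bytes, honoring payload.writeable.
    ///     }
    /// }
    /// ```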
    pub fn reader(&mut self, descriptor_index: u16) -> DescriptorReader<'_> {
        DescriptorReader {
            queue: self,
            indirect_queue: None,
            descriptor_index: Some(descriptor_index),
            num_read: 0,
        }
    }

    fn get_available_descriptor_index(&self, avail_index: u16) -> Result<u16, QueueError> {
        let wrapped_index = (avail_index % self.queue_size) as u64;
        Ok(self
            .queue_avail
            .read_plain::<u16_le>(
                spec::AVAIL_OFFSET_RING + spec::AVAIL_ELEMENT_SIZE * wrapped_index,
            )
            .map_err(QueueError::Memory)?
            .get())
    }

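    /// Writes the `avail_event` field, the trailing `u16` immediately after
    /// the used ring. When `VIRTIO_F_RING_EVENT_IDX` is negotiated, the
    /// driver skips notifying the device until the available index passes
    /// this value.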
    fn set_available_event(&self, index: u16) -> Result<(), QueueError> {
        let addr = spec::USED_OFFSET_RING + spec::USED_ELEMENT_SIZE * (self.queue_size as u64);
        self.queue_used
            .write_plain::<u16_le>(addr, &index.into())
            .map_err(QueueError::Memory)
    }

    fn read_descriptor(
        &self,
        descriptor_queue: &GuestMemory,
        index: u16,
    ) -> Result<spec::Descriptor, QueueError> {
        descriptor_queue
            .read_plain(index as u64 * size_of::<spec::Descriptor>() as u64)
            .map_err(QueueError::Memory)
    }

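    /// Marks the chain headed by `descriptor_index` as used, recording
    /// `bytes_written` (the number of bytes the device wrote to the chain's
    /// writeable buffers), and advances `queue_last_used_index`.
    ///
    /// Returns `true` if the driver should be signaled: with
    /// `VIRTIO_F_RING_EVENT_IDX`, when the used index has just passed the
    /// driver's `used_event` value; otherwise, when the driver has not set
    /// the no-interrupt flag in the available ring.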
    pub fn complete_descriptor(
        &mut self,
        queue_last_used_index: &mut u16,
        descriptor_index: u16,
        bytes_written: u32,
    ) -> Result<bool, QueueError> {
        self.set_used_descriptor(*queue_last_used_index, descriptor_index, bytes_written)?;
        let last_used_index = *queue_last_used_index;
        *queue_last_used_index = queue_last_used_index.wrapping_add(1);

        // Ensure used element writes are ordered before used index write.
        atomic::fence(atomic::Ordering::Release);
        self.set_used_index(*queue_last_used_index)?;

        // Ensure the used index write is visible before reading the field that
        // determines whether to signal.
        atomic::fence(atomic::Ordering::SeqCst);
        let send_signal = if self.use_ring_event_index {
            last_used_index == self.get_used_event()?
        } else {
            !self.get_available_flags()?.no_interrupt()
        };

        Ok(send_signal)
    }

    fn get_available_flags(&self) -> Result<spec::AvailableFlags, QueueError> {
        Ok(self
            .queue_avail
            .read_plain::<u16_le>(spec::AVAIL_OFFSET_FLAGS)
            .map_err(QueueError::Memory)?
            .get()
            .into())
    }

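    /// Reads the `used_event` field, the trailing `u16` immediately after the
    /// available ring. When `VIRTIO_F_RING_EVENT_IDX` is negotiated, the
    /// device delays interrupting the driver until the used index passes this
    /// value.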
    fn get_used_event(&self) -> Result<u16, QueueError> {
        let addr = spec::AVAIL_OFFSET_RING + spec::AVAIL_ELEMENT_SIZE * self.queue_size as u64;
        Ok(self
            .queue_avail
            .read_plain::<u16_le>(addr)
            .map_err(QueueError::Memory)?
            .get())
    }

    fn set_used_descriptor(
        &self,
        queue_last_used_index: u16,
        descriptor_index: u16,
        bytes_written: u32,
    ) -> Result<(), QueueError> {
        let wrapped_index = (queue_last_used_index % self.queue_size) as u64;
        let addr = spec::USED_OFFSET_RING + spec::USED_ELEMENT_SIZE * wrapped_index;
        self.queue_used
            .write_plain(
                addr,
                &spec::UsedElement {
                    id: (descriptor_index as u32).into(),
                    len: bytes_written.into(),
                },
            )
            .map_err(QueueError::Memory)
    }

    fn set_used_index(&self, index: u16) -> Result<(), QueueError> {
        self.queue_used
            .write_plain::<u16_le>(spec::USED_OFFSET_IDX, &index.into())
            .map_err(QueueError::Memory)
    }
}

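/// An iterator over the buffers of a single descriptor chain, following
/// `next` links and switching to an indirect descriptor table when a
/// descriptor with the indirect flag is encountered.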
pub struct DescriptorReader<'a> {
    queue: &'a mut QueueCore,
    indirect_queue: Option<GuestMemory>,
    descriptor_index: Option<u16>,
    num_read: u8,
}

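/// A single buffer within a descriptor chain: its guest physical address, its
/// length in bytes, and whether the device may write to it (per the
/// descriptor's write flag).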
pub struct VirtioQueuePayload {
    pub writeable: bool,
    pub address: u64,
    pub length: u32,
}

impl DescriptorReader<'_> {
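    /// Reads the next buffer in the chain, switching to the indirect
    /// descriptor table the first time a descriptor with the indirect flag is
    /// seen.
    ///
    /// Returns `Ok(None)` once the chain ends, `DoubleIndirect` if a
    /// descriptor inside an indirect table is itself indirect, or `TooLong`
    /// if the chain exceeds the 128-descriptor limit used to guard against
    /// cycles.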
    fn next_descriptor(&mut self) -> Result<Option<VirtioQueuePayload>, QueueError> {
        let Some(descriptor_index) = self.descriptor_index else {
            return Ok(None);
        };
        let descriptor = self.queue.read_descriptor(
            self.indirect_queue
                .as_ref()
                .unwrap_or(&self.queue.queue_desc),
            descriptor_index,
        )?;
        let descriptor = if !descriptor.flags().indirect() {
            descriptor
        } else {
            if self.indirect_queue.is_some() {
                return Err(QueueError::DoubleIndirect);
            }
            // TODO: should we really create a subrange for this, or is it
            // rare enough for the HCS case that we can just read it
            // directly?
            let indirect_queue = self.indirect_queue.insert(
                self.queue
                    .mem
                    .subrange(
                        descriptor.address.get(),
                        descriptor.length.get() as u64,
                        true,
                    )
                    .map_err(QueueError::Memory)?,
            );
            self.descriptor_index = Some(0);
            self.queue.read_descriptor(indirect_queue, 0)?
        };

        self.num_read += 1;
        if descriptor.flags().next() {
            let next = descriptor.next.get();
            // Limit the descriptor chain length to avoid running out of
            // memory; an overly long chain may indicate a cycle.
            if self.num_read == 128 {
                return Err(QueueError::TooLong);
            }
            self.descriptor_index = Some(next);
        } else {
            self.descriptor_index = None;
        }

        Ok(Some(VirtioQueuePayload {
            writeable: descriptor.flags().write(),
            address: descriptor.address.get(),
            length: descriptor.length.get(),
        }))
    }
}

impl Iterator for DescriptorReader<'_> {
    type Item = Result<VirtioQueuePayload, QueueError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_descriptor().transpose()
    }
}