virtio/
queue.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Core virtio queue implementation, without any notification mechanisms, async
5//! support, or other transport-specific details.
6
7use crate::spec::VirtioDeviceFeatures;
8use crate::spec::queue as spec;
9use crate::spec::u16_le;
10use guestmem::GuestMemory;
11use guestmem::GuestMemoryError;
12use inspect::Inspect;
13use spec::DescriptorFlags;
14use spec::SplitDescriptor;
15use std::sync::atomic;
16use thiserror::Error;
17use zerocopy::FromBytes;
18use zerocopy::Immutable;
19use zerocopy::IntoBytes;
20use zerocopy::KnownLayout;
21
22pub(crate) fn descriptor_offset(index: u16) -> u64 {
23    index as u64 * size_of::<SplitDescriptor>() as u64
24}
25
26pub(crate) fn read_descriptor<T: IntoBytes + FromBytes + Immutable + KnownLayout>(
27    queue_desc: &GuestMemory,
28    index: u16,
29) -> Result<T, QueueError> {
30    queue_desc
31        .read_plain::<T>(descriptor_offset(index))
32        .map_err(QueueError::Memory)
33}
34
35#[derive(Debug, Error)]
36pub enum QueueError {
37    #[error("error accessing queue memory")]
38    Memory(#[source] GuestMemoryError),
39    #[error("an indirect descriptor had the indirect flag set")]
40    DoubleIndirect,
41    #[error("a descriptor chain is too long or has a cycle")]
42    TooLong,
43    #[error("Invalid queue size {0}. Must be a power of 2.")]
44    InvalidQueueSize(u16),
45}
46
/// A transport-independent view of a single queue descriptor.
pub struct QueueDescriptor {
    /// Guest physical address of the buffer.
    address: u64,
    /// Buffer length in bytes.
    length: u32,
    /// Raw descriptor flags.
    flags: DescriptorFlags,
    /// Index of the next descriptor in the chain, if the NEXT flag was set.
    next: Option<u16>,
}
53
/// Per-work-item context needed later to complete the work back to the
/// guest.
pub enum QueueCompletionContext {
    /// Context for a split-format virtqueue.
    Split(SplitQueueCompletionContext),
}
57
/// A unit of work pulled from the queue: one descriptor chain.
pub struct QueueWork {
    /// Completion context identifying the chain for the used ring.
    context: QueueCompletionContext,
    /// The guest buffers described by the chain, in chain order.
    pub payload: Vec<VirtioQueuePayload>,
}
62
63impl QueueWork {
64    pub fn descriptor_index(&self) -> u16 {
65        match &self.context {
66            QueueCompletionContext::Split(context) => context.descriptor_index,
67        }
68    }
69}
70
/// Ring-format-specific state for the "get work" half of a queue.
#[derive(Debug, Inspect)]
#[inspect(tag = "type")]
enum QueueGetWorkInner {
    /// Split-format virtqueue.
    Split(#[inspect(flatten)] SplitQueueGetWork),
}
76
/// Ring-format-specific state for the "complete work" half of a queue.
#[derive(Debug)]
enum QueueCompleteWorkInner {
    /// Split-format virtqueue.
    Split(SplitQueueCompleteWork),
}
81
/// Guest-configured queue parameters, as provided by the transport.
#[derive(Debug, Copy, Clone, Default)]
pub struct QueueParams {
    /// Queue size in descriptors.
    pub size: u16,
    /// Whether the queue is enabled. (Not read by this module; presumably
    /// consulted by the transport — confirm against callers.)
    pub enable: bool,
    /// Guest physical address of the descriptor table.
    pub desc_addr: u64,
    /// Guest physical address of the available ring.
    pub avail_addr: u64,
    /// Guest physical address of the used ring.
    pub used_addr: u64,
}
90
/// The device-side "get work" half of a virtio queue: reads available
/// descriptor chains from guest memory.
#[derive(Debug, Inspect)]
pub(crate) struct QueueCoreGetWork {
    /// Subrange of guest memory covering the descriptor table.
    queue_desc: GuestMemory,
    /// Number of descriptors in the queue (a power of two).
    queue_size: u16,
    /// Negotiated device features.
    #[inspect(skip)]
    features: VirtioDeviceFeatures,
    /// Full guest memory, used to map indirect descriptor tables.
    mem: GuestMemory,
    /// Ring-format-specific state (currently only split rings).
    #[inspect(flatten)]
    inner: QueueGetWorkInner,
}
101
impl QueueCoreGetWork {
    /// Creates the "get work" half of a queue.
    ///
    /// Validates that `params.size` is a power of two and maps a guest
    /// memory subrange covering exactly that many descriptors.
    ///
    /// # Errors
    /// Returns [`QueueError::InvalidQueueSize`] for a non-power-of-two size
    /// and [`QueueError::Memory`] if the descriptor table or rings cannot be
    /// mapped.
    pub fn new(
        features: VirtioDeviceFeatures,
        mem: GuestMemory,
        params: QueueParams,
    ) -> Result<Self, QueueError> {
        // Queue size must be a power of 2
        if !params.size.is_power_of_two() {
            return Err(QueueError::InvalidQueueSize(params.size));
        }
        // Subrange covering exactly `size` descriptors at the descriptor
        // table address.
        let queue_desc = mem
            .subrange(params.desc_addr, descriptor_offset(params.size), true)
            .map_err(QueueError::Memory)?;
        let inner = QueueGetWorkInner::Split(SplitQueueGetWork::new(
            features.clone(),
            mem.clone(),
            params,
        )?);
        Ok(Self {
            queue_desc,
            queue_size: params.size,
            features,
            mem,
            inner,
        })
    }

    /// Returns the next available work item, or `None` if the queue is
    /// currently empty.
    ///
    /// Reads the entire descriptor chain for the next available entry and
    /// collects its buffers into the work item's payload.
    pub fn try_next_work(&mut self) -> Result<Option<QueueWork>, QueueError> {
        let index = match &mut self.inner {
            QueueGetWorkInner::Split(split) => split.is_available()?,
        };
        let Some(index) = index else {
            return Ok(None);
        };
        let QueueGetWorkInner::Split(split) = &mut self.inner;
        // Fetch descriptor index from given available index.
        let descriptor_index = split.get_available_descriptor_index(index)?;
        let payload = self
            .reader(descriptor_index)
            .collect::<Result<Vec<_>, _>>()?;
        Ok(Some(QueueWork {
            context: QueueCompletionContext::Split(SplitQueueCompletionContext {
                descriptor_index,
            }),
            payload,
        }))
    }

    /// Returns an iterator over the payload buffers of the descriptor chain
    /// starting at `descriptor_index`.
    fn reader(&mut self, descriptor_index: u16) -> DescriptorReader<'_> {
        DescriptorReader {
            chain: DescriptorChain::new(
                self,
                // Indirect descriptors are only honored if the
                // ring_indirect_desc feature bit was negotiated.
                self.features.bank0().ring_indirect_desc(),
                descriptor_index,
            ),
        }
    }

    /// Reads the descriptor at `index` from `desc_queue` (either the main
    /// descriptor table or an indirect table) into the transport-independent
    /// [`QueueDescriptor`] form.
    fn descriptor(
        &self,
        desc_queue: &GuestMemory,
        index: u16,
    ) -> Result<QueueDescriptor, QueueError> {
        let descriptor = match self.inner {
            QueueGetWorkInner::Split(_) => {
                let descriptor: SplitDescriptor = read_descriptor(desc_queue, index)?;
                QueueDescriptor {
                    address: descriptor.address.get(),
                    length: descriptor.length.get(),
                    flags: descriptor.flags(),
                    // `next` is only meaningful when the NEXT flag is set.
                    next: if descriptor.flags().next() {
                        Some(descriptor.next.get())
                    } else {
                        None
                    },
                }
            }
        };
        Ok(descriptor)
    }

    /// Returns the queue size (number of descriptors).
    fn size(&self) -> u16 {
        self.queue_size
    }
}
187
/// The device-side "complete work" half of a virtio queue: writes used
/// entries back to guest memory.
#[derive(Debug)]
pub struct QueueCoreCompleteWork {
    /// Ring-format-specific state (currently only split rings).
    inner: QueueCompleteWorkInner,
}
192
193impl QueueCoreCompleteWork {
194    pub fn new(
195        features: VirtioDeviceFeatures,
196        mem: GuestMemory,
197        params: QueueParams,
198    ) -> Result<Self, QueueError> {
199        let inner = QueueCompleteWorkInner::Split(SplitQueueCompleteWork::new(
200            features.clone(),
201            mem.clone(),
202            params,
203        )?);
204        Ok(Self { inner })
205    }
206
207    pub fn complete_descriptor(
208        &mut self,
209        work: &QueueWork,
210        bytes_written: u32,
211    ) -> Result<bool, QueueError> {
212        let QueueCompleteWorkInner::Split(inner) = &mut self.inner;
213        let QueueCompletionContext::Split(context) = &work.context;
214        inner.complete_descriptor(context, bytes_written)
215    }
216}
217
218pub(crate) fn new_queue(
219    features: VirtioDeviceFeatures,
220    mem: GuestMemory,
221    params: QueueParams,
222) -> Result<(QueueCoreGetWork, QueueCoreCompleteWork), QueueError> {
223    let get_work = QueueCoreGetWork::new(features.clone(), mem.clone(), params)?;
224    let complete_work = QueueCoreCompleteWork::new(features.clone(), mem.clone(), params)?;
225    Ok((get_work, complete_work))
226}
227
/// Identifies a completed descriptor chain on a split virtqueue.
pub struct SplitQueueCompletionContext {
    /// Index of the chain's head descriptor, written to the used ring entry
    /// on completion.
    pub descriptor_index: u16,
}
231
/// State for reading work from the available ring of a split virtqueue.
#[derive(Debug, Inspect)]
#[inspect(extra = "Self::inspect_extra")]
pub(crate) struct SplitQueueGetWork {
    /// Guest memory subrange covering the available ring.
    queue_avail: GuestMemory,
    /// Guest memory subrange covering the used ring; written here to manage
    /// notification suppression (flags / avail_event).
    queue_used: GuestMemory,
    /// Number of entries in the ring.
    queue_size: u16,
    /// Free-running index of the next available entry to process.
    last_avail_index: u16,
    /// Whether the event-index (ring_event_idx) feature was negotiated.
    use_ring_event_index: bool,
}
241
impl SplitQueueGetWork {
    /// Adds the live available index (read from guest memory) to the inspect
    /// output.
    fn inspect_extra(&self, resp: &mut inspect::Response<'_>) {
        resp.field("available_index", self.get_available_index().ok());
    }

    /// Creates the available-ring reader for a split virtqueue.
    ///
    /// Maps guest memory subranges covering the available and used rings.
    /// Each subrange is sized as the ring header plus `size` elements plus a
    /// trailing `u16` — the used_event/avail_event word used when the event
    /// index feature is negotiated.
    pub fn new(
        features: VirtioDeviceFeatures,
        mem: GuestMemory,
        params: QueueParams,
    ) -> Result<Self, QueueError> {
        let queue_avail = mem
            .subrange(
                params.avail_addr,
                spec::AVAIL_OFFSET_RING
                    + spec::AVAIL_ELEMENT_SIZE * params.size as u64
                    + size_of::<u16>() as u64,
                true,
            )
            .map_err(QueueError::Memory)?;

        let queue_used = mem
            .subrange(
                params.used_addr,
                spec::USED_OFFSET_RING
                    + spec::USED_ELEMENT_SIZE * params.size as u64
                    + size_of::<u16>() as u64,
                true,
            )
            .map_err(QueueError::Memory)?;
        Ok(Self {
            queue_avail,
            queue_used,
            queue_size: params.size,
            last_avail_index: 0,
            use_ring_event_index: features.bank0().ring_event_idx(),
        })
    }

    /// Writes the used ring's flags word (at offset 0 of the used ring).
    fn set_used_flags(&self, flags: spec::UsedFlags) -> Result<(), QueueError> {
        self.queue_used
            .write_plain::<u16_le>(0, &u16::from(flags).into())
            .map_err(QueueError::Memory)
    }

    /// Reads the guest-written available index from the available ring
    /// header.
    fn get_available_index(&self) -> Result<u16, QueueError> {
        Ok(self
            .queue_avail
            .read_plain::<u16_le>(spec::AVAIL_OFFSET_IDX)
            .map_err(QueueError::Memory)?
            .get())
    }

    /// Checks whether the guest has published a new available entry.
    ///
    /// Returns the wrapped ring index of the next entry to process (and
    /// advances the internal cursor), or `None` if the queue is empty. Also
    /// manages guest notification suppression: notifications are re-enabled
    /// when the queue is found empty, and suppressed while there is work.
    pub fn is_available(&mut self) -> Result<Option<u16>, QueueError> {
        let mut avail_index = Self::get_available_index(self)?;
        if avail_index == self.last_avail_index {
            // Queue looks empty: re-enable guest notifications, either via
            // the avail_event word or by clearing the no-notify flag.
            if self.use_ring_event_index {
                self.set_available_event(avail_index)?;
            } else {
                self.set_used_flags(spec::UsedFlags::new())?;
            }
            // Ensure the available event/used flags are visible before checking
            // the available index again.
            atomic::fence(atomic::Ordering::SeqCst);
            avail_index = Self::get_available_index(self)?;
            if avail_index == self.last_avail_index {
                return Ok(None);
            }
        }

        // There is work: suppress further notifications while processing.
        if self.use_ring_event_index {
            self.set_available_event(self.last_avail_index)?;
        } else {
            self.set_used_flags(spec::UsedFlags::new().with_no_notify(true))?;
        }
        let next_avail_index = self.last_avail_index;
        self.last_avail_index = self.last_avail_index.wrapping_add(1);
        // Ensure available index read is ordered before subsequent descriptor
        // reads.
        atomic::fence(atomic::Ordering::Acquire);
        // Wrap the free-running index into the ring.
        Ok(Some(next_avail_index % self.queue_size))
    }

    /// Reads the head descriptor index stored in the available ring entry at
    /// `wrapped_index`.
    pub fn get_available_descriptor_index(&self, wrapped_index: u16) -> Result<u16, QueueError> {
        Ok(self
            .queue_avail
            .read_plain::<u16_le>(
                spec::AVAIL_OFFSET_RING + spec::AVAIL_ELEMENT_SIZE * wrapped_index as u64,
            )
            .map_err(QueueError::Memory)?
            .get())
    }

    /// Writes `index` to the avail_event word, located immediately after the
    /// used ring's elements.
    fn set_available_event(&self, index: u16) -> Result<(), QueueError> {
        let addr = spec::USED_OFFSET_RING + spec::USED_ELEMENT_SIZE * (self.queue_size as u64);
        self.queue_used
            .write_plain::<u16_le>(addr, &index.into())
            .map_err(QueueError::Memory)
    }
}
341
/// State for writing completions to the used ring of a split virtqueue.
#[derive(Debug)]
pub(crate) struct SplitQueueCompleteWork {
    /// Guest memory subrange covering the available ring; read here for the
    /// notification flags and used_event word.
    queue_avail: GuestMemory,
    /// Guest memory subrange covering the used ring.
    queue_used: GuestMemory,
    /// Number of entries in the ring.
    queue_size: u16,
    /// Free-running index of the next used entry to write.
    last_used_index: u16,
    /// Whether the event-index (ring_event_idx) feature was negotiated.
    use_ring_event_index: bool,
}
350
impl SplitQueueCompleteWork {
    /// Creates the used-ring writer for a split virtqueue.
    ///
    /// Maps guest memory subranges covering the available and used rings,
    /// sized as in [`SplitQueueGetWork::new`].
    pub fn new(
        features: VirtioDeviceFeatures,
        mem: GuestMemory,
        params: QueueParams,
    ) -> Result<Self, QueueError> {
        let queue_avail = mem
            .subrange(
                params.avail_addr,
                spec::AVAIL_OFFSET_RING
                    + spec::AVAIL_ELEMENT_SIZE * params.size as u64
                    + size_of::<u16>() as u64,
                true,
            )
            .map_err(QueueError::Memory)?;
        let queue_used = mem
            .subrange(
                params.used_addr,
                spec::USED_OFFSET_RING
                    + spec::USED_ELEMENT_SIZE * params.size as u64
                    + size_of::<u16>() as u64,
                true,
            )
            .map_err(QueueError::Memory)?;
        Ok(Self {
            queue_avail,
            queue_used,
            queue_size: params.size,
            last_used_index: 0,
            use_ring_event_index: features.bank0().ring_event_idx(),
        })
    }

    /// Writes a used-ring entry for a completed descriptor chain and
    /// publishes the new used index.
    ///
    /// Returns `true` if the guest should be signaled, per the negotiated
    /// suppression mechanism: with the event index feature, signal when the
    /// entry just written matches the guest's used_event; otherwise, signal
    /// unless the guest set the no-interrupt flag.
    pub fn complete_descriptor(
        &mut self,
        context: &SplitQueueCompletionContext,
        bytes_written: u32,
    ) -> Result<bool, QueueError> {
        self.set_used_descriptor(
            self.last_used_index,
            context.descriptor_index,
            bytes_written,
        )?;
        let last_used_index = self.last_used_index;
        self.last_used_index = self.last_used_index.wrapping_add(1);

        // Ensure used element writes are ordered before used index write.
        atomic::fence(atomic::Ordering::Release);
        self.set_used_index(self.last_used_index)?;

        // Ensure the used index write is visible before reading the field that
        // determines whether to signal.
        atomic::fence(atomic::Ordering::SeqCst);
        let send_signal = if self.use_ring_event_index {
            last_used_index == self.get_used_event()?
        } else {
            !self.get_available_flags()?.no_interrupt()
        };

        Ok(send_signal)
    }

    /// Reads the available ring's flags word.
    fn get_available_flags(&self) -> Result<spec::AvailableFlags, QueueError> {
        Ok(self
            .queue_avail
            .read_plain::<u16_le>(spec::AVAIL_OFFSET_FLAGS)
            .map_err(QueueError::Memory)?
            .get()
            .into())
    }

    /// Reads the used_event word, located immediately after the available
    /// ring's elements.
    fn get_used_event(&self) -> Result<u16, QueueError> {
        let addr = spec::AVAIL_OFFSET_RING + spec::AVAIL_ELEMENT_SIZE * self.queue_size as u64;
        Ok(self
            .queue_avail
            .read_plain::<u16_le>(addr)
            .map_err(QueueError::Memory)?
            .get())
    }

    /// Writes the used-ring element for the completed chain: the head
    /// descriptor index and the number of bytes the device wrote.
    fn set_used_descriptor(
        &self,
        queue_last_used_index: u16,
        descriptor_index: u16,
        bytes_written: u32,
    ) -> Result<(), QueueError> {
        let wrapped_index = (queue_last_used_index % self.queue_size) as u64;
        let addr = spec::USED_OFFSET_RING + spec::USED_ELEMENT_SIZE * wrapped_index;
        self.queue_used
            .write_plain(
                addr,
                &spec::UsedElement {
                    id: (descriptor_index as u32).into(),
                    len: bytes_written.into(),
                },
            )
            .map_err(QueueError::Memory)
    }

    /// Publishes `index` as the new used index in the used ring header.
    fn set_used_index(&self, index: u16) -> Result<(), QueueError> {
        self.queue_used
            .write_plain::<u16_le>(spec::USED_OFFSET_IDX, &index.into())
            .map_err(QueueError::Memory)
    }
}
456
/// Iterator adapter that converts a descriptor chain into payload buffers.
pub struct DescriptorReader<'a> {
    /// The underlying chain walker.
    chain: DescriptorChain<'a>,
}
460
/// A single guest buffer within a descriptor chain.
pub struct VirtioQueuePayload {
    /// Whether the device may write to this buffer (per the descriptor's
    /// write flag); otherwise the buffer is device-readable only.
    pub writeable: bool,
    /// Guest physical address of the buffer.
    pub address: u64,
    /// Buffer length in bytes.
    pub length: u32,
}
466
467impl Iterator for DescriptorReader<'_> {
468    type Item = Result<VirtioQueuePayload, QueueError>;
469
470    fn next(&mut self) -> Option<Self::Item> {
471        self.chain.next().map(|descriptor| {
472            descriptor.map(|descriptor| VirtioQueuePayload {
473                writeable: descriptor.flags.write(),
474                address: descriptor.address,
475                length: descriptor.length,
476            })
477        })
478    }
479}
480
/// Walks a descriptor chain, following `next` links and (optionally) one
/// level of indirect descriptor tables.
pub struct DescriptorChain<'a> {
    /// Queue the chain belongs to, used to read descriptors.
    queue: &'a QueueCoreGetWork,
    /// Effective table size: the queue size, or the indirect table's size
    /// once an indirect descriptor has been followed.
    queue_size: u16,
    /// Whether indirect descriptors are honored (feature negotiated).
    indirect_support: bool,
    /// Subrange covering the indirect table, once one has been entered.
    indirect_queue: Option<GuestMemory>,
    /// Next descriptor index to read, or `None` when the chain has ended.
    descriptor_index: Option<u16>,
    /// Number of descriptors read so far, for chain-length limiting.
    num_read: u16,
    /// Maximum chain length before `TooLong` is returned.
    max_desc_chain: u16,
}
490
491impl<'a> DescriptorChain<'a> {
492    const MAX_DESC_CHAIN: u16 = 128;
493
494    fn new(queue: &'a QueueCoreGetWork, indirect_support: bool, descriptor_index: u16) -> Self {
495        Self {
496            queue,
497            queue_size: queue.size(),
498            indirect_support,
499            indirect_queue: None,
500            descriptor_index: Some(descriptor_index),
501            num_read: 0,
502            max_desc_chain: std::cmp::min(queue.size(), Self::MAX_DESC_CHAIN),
503        }
504    }
505
506    fn next_descriptor(&mut self) -> Result<Option<QueueDescriptor>, QueueError> {
507        let Some(descriptor_index) = self.descriptor_index else {
508            return Ok(None);
509        };
510        let descriptor = self.queue.descriptor(
511            self.indirect_queue
512                .as_ref()
513                .unwrap_or(&self.queue.queue_desc),
514            descriptor_index,
515        )?;
516        let descriptor = if !self.indirect_support || !descriptor.flags.indirect() {
517            descriptor
518        } else {
519            if self.indirect_queue.is_some() {
520                return Err(QueueError::DoubleIndirect);
521            }
522            let indirect_queue = self.indirect_queue.insert(
523                self.queue
524                    .mem
525                    .subrange(descriptor.address, descriptor.length as u64, true)
526                    .map_err(QueueError::Memory)?,
527            );
528            self.descriptor_index = Some(0);
529            self.queue_size = std::cmp::min(
530                (descriptor.length / size_of::<SplitDescriptor>() as u32) as u16,
531                self.queue_size,
532            );
533            self.max_desc_chain = std::cmp::min(self.queue_size, Self::MAX_DESC_CHAIN);
534            self.queue.descriptor(indirect_queue, 0)?
535        };
536
537        self.num_read += 1;
538        self.descriptor_index = descriptor.next.map(|next| next % self.queue_size);
539        // Limit the descriptor chain length to avoid running out of memory.
540        // This may be due to a cycle in the descriptor chain.
541        if self.descriptor_index.is_some() && self.num_read == self.max_desc_chain {
542            return Err(QueueError::TooLong);
543        }
544        Ok(Some(descriptor))
545    }
546}
547
548impl Iterator for DescriptorChain<'_> {
549    type Item = Result<QueueDescriptor, QueueError>;
550
551    fn next(&mut self) -> Option<Self::Item> {
552        self.next_descriptor().transpose()
553    }
554}