vmbus_ring/lib.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! This module implements the low-level interface to the VmBus ring buffer. The
//! ring buffer resides in guest memory and is mapped into the host, allowing
//! efficient transfer of variable-sized packets.
//!
//! Ring buffer packets have headers called descriptors, which can specify a
//! transaction ID and metadata referring to memory outside the ring buffer.
//! Each packet is a multiple of 8 bytes.
//!
//! In practice, ring buffers always come in pairs so that packets can be both
//! sent and received. However, this module's interfaces operate on them singly.
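//!
//! A minimal usage sketch, mirroring the tests at the end of this file (the
//! choice of backing memory is up to the caller):
//!
//! ```ignore
//! let mem = FlatRingMem::new(16384);
//! let out_ring = OutgoingRing::new(&mem).unwrap();
//! let in_ring = IncomingRing::new(&mem).unwrap();
//!
//! // Write a packet.
//! let mut outgoing = out_ring.outgoing().unwrap();
//! let range = out_ring
//!     .write(
//!         &mut outgoing,
//!         &OutgoingPacket {
//!             transaction_id: 0,
//!             size: 8,
//!             typ: OutgoingPacketType::InBandNoCompletion,
//!         },
//!     )
//!     .unwrap();
//! range.writer(&out_ring).write(&[1, 2, 3, 4, 5, 6, 7, 8]).unwrap();
//! let _signal_receiver = out_ring.commit_write(&mut outgoing);
//!
//! // Read it back.
//! let mut incoming = in_ring.incoming().unwrap();
//! let packet = in_ring.read(&mut incoming).unwrap();
//! let payload = packet.payload.reader(&in_ring).read_all().unwrap();
//! let _signal_sender = in_ring.commit_read(&mut incoming);
//! assert_eq!(payload, [1, 2, 3, 4, 5, 6, 7, 8]);
//! ```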

#![expect(missing_docs)]
#![forbid(unsafe_code)]

pub mod gparange;

pub use pipe_protocol::*;
pub use protocol::PAGE_SIZE;
pub use protocol::TransferPageRange;

use crate::gparange::GpaRange;
use guestmem::AccessError;
use guestmem::MemoryRead;
use guestmem::MemoryWrite;
use guestmem::ranges::PagedRange;
use inspect::Inspect;
use protocol::*;
use safeatomic::AtomicSliceOps;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use thiserror::Error;
use zerocopy::FromZeros;
use zerocopy::IntoBytes;

mod pipe_protocol {
    use zerocopy::FromBytes;
    use zerocopy::Immutable;
    use zerocopy::IntoBytes;
    use zerocopy::KnownLayout;

    /// Pipe channel packets are prefixed with this header to allow for
    /// payload lengths that are not multiples of 8.
    #[repr(C)]
    #[derive(Debug, Copy, Clone, IntoBytes, Immutable, KnownLayout, FromBytes)]
    pub struct PipeHeader {
        pub packet_type: u32,
        pub len: u32,
    }

    /// Regular data packet.
    pub const PIPE_PACKET_TYPE_DATA: u32 = 1;
    /// Data packet that has been partially consumed, in which case the `len`
    /// field's high word is the number of bytes already read. The opposite
    /// endpoint will never write this type.
    pub const PIPE_PACKET_TYPE_PARTIAL: u32 = 2;
    /// Setup a GPA direct buffer for RDMA.
    pub const PIPE_PACKET_TYPE_SETUP_GPA_DIRECT: u32 = 3;
    /// Tear down a GPA direct buffer.
    pub const PIPE_PACKET_TYPE_TEARDOWN_GPA_DIRECT: u32 = 4;

    /// The maximum size of a pipe packet's payload.
    pub const MAXIMUM_PIPE_PACKET_SIZE: usize = 16384;
}

mod protocol {
    use crate::CONTROL_WORD_COUNT;
    use inspect::Inspect;
    use safeatomic::AtomicSliceOps;
    use std::fmt::Debug;
    use std::sync::atomic::AtomicU32;
    use std::sync::atomic::Ordering;
    use zerocopy::FromBytes;
    use zerocopy::Immutable;
    use zerocopy::IntoBytes;
    use zerocopy::KnownLayout;

    /// VmBus ring buffers are sized in multiples of 4KB pages, with a 4KB control page.
    pub const PAGE_SIZE: usize = 4096;

    /// The descriptor header on every packet.
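    ///
    /// Sizes and offsets are expressed in 8-byte units. As an illustrative
    /// example (not taken from a trace): an in-band packet with a 16-byte
    /// payload uses a 16-byte descriptor (`data_offset8 == 2`) followed by the
    /// payload, giving `length8 == 4` (32 bytes total); the packet is then
    /// followed in the ring by the 8-byte [`Footer`].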
    #[repr(C)]
    #[derive(Copy, Clone, Debug, IntoBytes, Immutable, KnownLayout, FromBytes)]
    pub struct PacketDescriptor {
        pub packet_type: u16,
        pub data_offset8: u16,
        pub length8: u16,
        pub flags: u16,
        pub transaction_id: u64,
    }

    /// A control page accessor.
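    ///
    /// The accessors below reflect the control page layout: word 0 is the `in`
    /// offset, word 1 the `out` offset, word 2 the interrupt mask, word 3 the
    /// pending send size, and word 16 the feature bits.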
    pub struct Control<'a>(pub &'a [AtomicU32; CONTROL_WORD_COUNT]);

    impl<'a> Control<'a> {
        pub fn from_page(page: &'a guestmem::Page) -> Option<Self> {
            let slice = page.as_atomic_slice()?[..CONTROL_WORD_COUNT]
                .try_into()
                .unwrap();
            Some(Self(slice))
        }

        pub fn inp(&self) -> &AtomicU32 {
            &self.0[0]
        }
        pub fn outp(&self) -> &AtomicU32 {
            &self.0[1]
        }
        pub fn interrupt_mask(&self) -> &AtomicU32 {
            &self.0[2]
        }
        pub fn pending_send_size(&self) -> &AtomicU32 {
            &self.0[3]
        }
        pub fn feature_bits(&self) -> &AtomicU32 {
            &self.0[16]
        }
    }

    impl Inspect for Control<'_> {
        fn inspect(&self, req: inspect::Request<'_>) {
            req.respond()
                .hex("in", self.inp().load(Ordering::Relaxed))
                .hex("out", self.outp().load(Ordering::Relaxed))
                .hex(
                    "interrupt_mask",
                    self.interrupt_mask().load(Ordering::Relaxed),
                )
                .hex(
                    "pending_send_size",
                    self.pending_send_size().load(Ordering::Relaxed),
                )
                .hex("feature_bits", self.feature_bits().load(Ordering::Relaxed));
        }
    }

    impl Debug for Control<'_> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("Control")
                .field("inp", self.inp())
                .field("outp", self.outp())
                .field("interrupt_mask", self.interrupt_mask())
                .field("pending_send_size", self.pending_send_size())
                .field("feature_bits", self.feature_bits())
                .finish()
        }
    }

    /// If set, the endpoint supports sending signals when the number of free
    /// bytes in the ring reaches or exceeds `pending_send_size`.
    pub const FEATURE_SUPPORTS_PENDING_SEND_SIZE: u32 = 1;

    /// A transfer range specifying a length and offset within a transfer page
    /// set. Only used by NetVSP.
    #[repr(C)]
    #[derive(Debug, Copy, Clone, IntoBytes, Immutable, KnownLayout, FromBytes)]
    pub struct TransferPageRange {
        pub byte_count: u32,
        pub byte_offset: u32,
    }

    /// The extended portion of the packet descriptor that describes a transfer
    /// page packet.
    #[repr(C)]
    #[derive(Copy, Clone, Debug, IntoBytes, Immutable, KnownLayout, FromBytes)]
    pub struct TransferPageHeader {
        pub transfer_page_set_id: u16,
        pub reserved: u16, // may have garbage non-zero values
        pub range_count: u32,
    }

    /// The extended portion of the packet descriptor describing a GPA direct packet.
    #[repr(C)]
    #[derive(Copy, Clone, Debug, IntoBytes, Immutable, KnownLayout, FromBytes)]
    pub struct GpaDirectHeader {
        pub reserved: u32, // may have garbage non-zero values
        pub range_count: u32,
    }

    pub const PACKET_FLAG_COMPLETION_REQUESTED: u16 = 1;

    /// The packet footer.
    #[repr(C)]
    #[derive(Copy, Clone, Debug, IntoBytes, Immutable, KnownLayout, FromBytes)]
    pub struct Footer {
        pub reserved: u32,
        /// The ring offset of the packet.
        pub offset: u32,
    }
}

#[derive(Copy, Clone, Debug, Error)]
pub enum Error {
    #[error("invalid ring buffer pointer")]
    InvalidRingPointer,
    #[error("invalid message length")]
    InvalidMessageLength,
    #[error("invalid data available")]
    InvalidDataAvailable,
    #[error("ring buffer too large")]
    RingTooLarge,
    #[error("invalid ring memory")]
    InvalidRingMemory,
    #[error("invalid descriptor offset or length")]
    InvalidDescriptorLengths,
    #[error("unknown packet descriptor flags")]
    InvalidDescriptorFlags,
    #[error("unknown packet descriptor type")]
    InvalidDescriptorType,
    #[error("invalid range count for gpa direct packet")]
    InvalidDescriptorGpaDirectRangeCount,
    #[error("the interrupt mask bit was supposed to be clear but it is set")]
    InterruptsExternallyMasked,
}

#[derive(Copy, Clone, Debug, Error)]
pub enum ReadError {
    #[error("ring buffer empty")]
    Empty,
    #[error(transparent)]
    Corrupt(#[from] Error),
}

#[derive(Copy, Clone, Debug, Error)]
pub enum WriteError {
    #[error("ring buffer full")]
    Full(usize),
    #[error(transparent)]
    Corrupt(#[from] Error),
}

/// A range within a ring buffer.
#[derive(Copy, Clone, Debug)]
pub struct RingRange {
    off: u32,
    size: u32,
}

impl RingRange {
    /// The empty range.
    pub fn empty() -> Self {
        RingRange { off: 0, size: 0 }
    }

    /// Retrieves a `MemoryWrite` that allows for writing to the range.
    pub fn writer<'a, T: Ring>(&self, ring: &'a T) -> RingRangeWriter<'a, T::Memory> {
        RingRangeWriter {
            start: self.off,
            end: self.off + self.size,
            mem: ring.mem(),
        }
    }

    /// Writes the full range using aligned writes of `u64` values.
    ///
    /// # Panics
    /// Panics if the ring range is not exactly the size of `data`.
    pub fn write_aligned_full<T: Ring>(&self, ring: &T, data: &[u64]) {
        assert_eq!(self.size as usize, data.len() * 8);
        ring.mem().write_aligned(self.off as usize, data.as_bytes());
    }

    /// Retrieves a `MemoryRead` that allows for reading from the range.
    pub fn reader<'a, T: Ring>(&self, ring: &'a T) -> RingRangeReader<'a, T::Memory> {
        RingRangeReader {
            start: self.off,
            end: self.off + self.size,
            mem: ring.mem(),
        }
    }

    /// Returns the length of the range.
    pub fn len(&self) -> usize {
        self.size as usize
    }

    /// Checks if this range is empty.
    pub fn is_empty(&self) -> bool {
        self.size == 0
    }
}

/// A type implementing `MemoryRead` accessing a `RingRange`.
pub struct RingRangeReader<'a, T> {
    start: u32,
    end: u32,
    mem: &'a T,
}

impl<T: RingMem> MemoryRead for RingRangeReader<'_, T> {
    fn read(&mut self, data: &mut [u8]) -> Result<&mut Self, AccessError> {
        if self.len() < data.len() {
            return Err(AccessError::OutOfRange(self.len(), data.len()));
        }
        self.mem.read_at(self.start as usize, data);
        self.start += data.len() as u32;
        Ok(self)
    }

    fn skip(&mut self, len: usize) -> Result<&mut Self, AccessError> {
        if self.len() < len {
            return Err(AccessError::OutOfRange(self.len(), len));
        }
        self.start += len as u32;
        Ok(self)
    }

    fn len(&self) -> usize {
        (self.end - self.start) as usize
    }
}

/// A type implementing `MemoryWrite` accessing a `RingRange`.
pub struct RingRangeWriter<'a, T> {
    start: u32,
    end: u32,
    mem: &'a T,
}

impl<T: RingMem> MemoryWrite for RingRangeWriter<'_, T> {
    fn write(&mut self, data: &[u8]) -> Result<(), AccessError> {
        if self.len() < data.len() {
            return Err(AccessError::OutOfRange(self.len(), data.len()));
        }
        self.mem.write_at(self.start as usize, data);
        self.start += data.len() as u32;
        Ok(())
    }

    fn fill(&mut self, _val: u8, _len: usize) -> Result<(), AccessError> {
        unimplemented!()
    }

    fn len(&self) -> usize {
        (self.end - self.start) as usize
    }
}

/// The alternate types of incoming packets. For packets with external data,
/// includes a `RingRange` whose data is the variable portion of the packet
/// descriptor.
#[derive(Debug, Copy, Clone)]
pub enum IncomingPacketType {
    InBand,
    Completion,
    GpaDirect(u32, RingRange),
    TransferPages(u16, u32, RingRange),
}

/// An incoming packet.
#[derive(Debug)]
pub struct IncomingPacket {
    pub transaction_id: Option<u64>,
    pub typ: IncomingPacketType,
    pub payload: RingRange,
}

const PACKET_TYPE_IN_BAND: u16 = 6;
const PACKET_TYPE_TRANSFER_PAGES: u16 = 0x7;
const PACKET_TYPE_GPA_DIRECT: u16 = 0x9;
const PACKET_TYPE_COMPLETION: u16 = 0xb;

fn parse_packet<M: RingMem>(
    ring: &M,
    ring_off: u32,
    avail: u32,
) -> Result<(u32, IncomingPacket), ReadError> {
    let mut desc = PacketDescriptor::new_zeroed();
    ring.read_aligned(ring_off as usize, desc.as_mut_bytes());
    let len = desc.length8 as u32 * 8;
    if desc.length8 < desc.data_offset8 || desc.data_offset8 < 2 || avail < len {
        return Err(ReadError::Corrupt(Error::InvalidDescriptorLengths));
    }

    if (desc.flags & !PACKET_FLAG_COMPLETION_REQUESTED) != 0 {
        return Err(ReadError::Corrupt(Error::InvalidDescriptorFlags));
    }
    let transaction_id = if desc.flags & PACKET_FLAG_COMPLETION_REQUESTED != 0
        || desc.packet_type == PACKET_TYPE_COMPLETION
    {
        Some(desc.transaction_id)
    } else {
        None
    };
    let typ = match desc.packet_type {
        PACKET_TYPE_IN_BAND => IncomingPacketType::InBand,
        PACKET_TYPE_COMPLETION => IncomingPacketType::Completion,
        PACKET_TYPE_TRANSFER_PAGES => {
            let mut tph = TransferPageHeader::new_zeroed();
            ring.read_aligned(ring_off as usize + 16, tph.as_mut_bytes());
            IncomingPacketType::TransferPages(
                tph.transfer_page_set_id,
                tph.range_count,
                RingRange {
                    off: ring_off + 24,
                    size: desc.data_offset8 as u32 * 8 - 24,
                },
            )
        }
        PACKET_TYPE_GPA_DIRECT => {
            let mut gph = GpaDirectHeader::new_zeroed();
            ring.read_aligned(ring_off as usize + 16, gph.as_mut_bytes());
            if gph.range_count == 0 {
                return Err(ReadError::Corrupt(
                    Error::InvalidDescriptorGpaDirectRangeCount,
                ));
            }
            IncomingPacketType::GpaDirect(
                gph.range_count,
                RingRange {
                    off: ring_off + 24,
                    size: desc.data_offset8 as u32 * 8 - 24,
                },
            )
        }
        _ => return Err(ReadError::Corrupt(Error::InvalidDescriptorType)),
    };
    let payload = RingRange {
        off: ring_off + desc.data_offset8 as u32 * 8,
        size: (desc.length8 - desc.data_offset8) as u32 * 8,
    };
    Ok((
        len,
        IncomingPacket {
            transaction_id,
            typ,
            payload,
        },
    ))
}

/// The size of the control region in 32-bit words.
pub const CONTROL_WORD_COUNT: usize = 32;

/// A trait for memory backing a ring buffer.
pub trait RingMem: Send {
    /// Returns the control page.
    fn control(&self) -> &[AtomicU32; CONTROL_WORD_COUNT];

    /// Reads from the data portion of the ring, wrapping (once) at the end of
    /// the ring. Precondition: `addr + data.len() <= self.len() * 2`.
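    ///
    /// For example (illustrative numbers only), with `self.len() == 0x1000`, a
    /// 16-byte read at `addr == 0xff8` returns the last 8 bytes of the ring
    /// followed by the first 8 bytes.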
    fn read_at(&self, addr: usize, data: &mut [u8]);

    /// Reads from the data portion of the ring, as in [`RingMem::read_at`]. `addr` and
    /// `data.len()` must be multiples of 8.
    ///
    /// `read_at` may be faster for large or variable-sized reads.
    fn read_aligned(&self, addr: usize, data: &mut [u8]) {
        debug_assert!(addr.is_multiple_of(8));
        debug_assert!(data.len().is_multiple_of(8));
        self.read_at(addr, data)
    }

    /// Writes to the data portion of the ring, wrapping (once) at the end of
    /// the ring. Precondition: `addr + data.len() <= self.len() * 2`.
    fn write_at(&self, addr: usize, data: &[u8]);

    /// Writes to the data portion of the ring, as in [`RingMem::write_at`]. `addr` and
    /// `data.len()` must be multiples of 8.
    ///
    /// `write_at` may be faster for large or variable-sized writes.
    fn write_aligned(&self, addr: usize, data: &[u8]) {
        debug_assert!(addr.is_multiple_of(8));
        debug_assert!(data.len().is_multiple_of(8));
        self.write_at(addr, data)
    }

    /// Returns the length of the ring in bytes.
    fn len(&self) -> usize;
}

/// Implementation of `RingMem` for references. Useful for tests.
impl<T: RingMem + Sync> RingMem for &'_ T {
    fn control(&self) -> &[AtomicU32; CONTROL_WORD_COUNT] {
        (*self).control()
    }
    fn read_at(&self, addr: usize, data: &mut [u8]) {
        (*self).read_at(addr, data)
    }
    fn write_at(&self, addr: usize, data: &[u8]) {
        (*self).write_at(addr, data)
    }
    fn len(&self) -> usize {
        (*self).len()
    }

    fn read_aligned(&self, addr: usize, data: &mut [u8]) {
        (*self).read_aligned(addr, data)
    }

    fn write_aligned(&self, addr: usize, data: &[u8]) {
        (*self).write_aligned(addr, data)
    }
}

/// An implementation of `RingMem` over a single contiguous mapping: a 4KB
/// control page followed by the ring's data pages.
#[derive(Debug)]
pub struct SingleMappedRingMem<T>(pub T);

impl<T: AsRef<[AtomicU8]>> SingleMappedRingMem<T> {
    fn control_range(&self) -> &[AtomicU8; PAGE_SIZE] {
        self.0.as_ref()[..PAGE_SIZE].try_into().unwrap()
    }

    fn data(&self) -> &[AtomicU8] {
        &self.0.as_ref()[PAGE_SIZE..]
    }
}

impl<T: AsRef<[AtomicU8]> + Send> RingMem for SingleMappedRingMem<T> {
    fn read_at(&self, mut addr: usize, data: &mut [u8]) {
        if addr >= self.len() {
            addr -= self.len();
        }
        let this_data = self.data();
        if addr + data.len() <= self.len() {
            this_data[addr..addr + data.len()].atomic_read(data);
        } else {
            let data_len = data.len();
            let (first, last) = data.split_at_mut(self.len() - addr);
            this_data[addr..].atomic_read(first);
            this_data[..data_len - (self.len() - addr)].atomic_read(last);
        }
    }

    fn write_at(&self, mut addr: usize, data: &[u8]) {
        if addr > self.len() {
            addr -= self.len();
        }
        let this_data = self.data();
        if addr + data.len() <= self.len() {
            this_data[addr..addr + data.len()].atomic_write(data);
        } else {
            let (first, last) = data.split_at(self.len() - addr);
            this_data[addr..].atomic_write(first);
            this_data[..data.len() - (self.len() - addr)].atomic_write(last);
        }
    }

    fn control(&self) -> &[AtomicU32; CONTROL_WORD_COUNT] {
        self.control_range().as_atomic_slice().unwrap()[..CONTROL_WORD_COUNT]
            .try_into()
            .unwrap()
    }

    fn len(&self) -> usize {
        self.data().len()
    }
}

/// An implementation of `RingMem` over a flat allocation. Useful for tests.
#[derive(Clone)]
pub struct FlatRingMem {
    inner: Arc<FlatRingInner>,
}

struct FlatRingInner {
    control: [AtomicU32; CONTROL_WORD_COUNT],
    data: Vec<AtomicU8>,
}

impl FlatRingMem {
    /// Allocates a new flat ring memory with `len` bytes of data.
    pub fn new(len: usize) -> Self {
        let mut data = Vec::new();
        data.resize_with(len, Default::default);
        Self {
            inner: Arc::new(FlatRingInner {
                control: [0; CONTROL_WORD_COUNT].map(Into::into),
                data,
            }),
        }
    }
}

impl Debug for FlatRingMem {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FlatRingMem").finish()
    }
}

impl RingMem for FlatRingMem {
    fn read_at(&self, mut addr: usize, data: &mut [u8]) {
        if addr > self.len() {
            addr -= self.len();
        }
        if addr + data.len() <= self.len() {
            self.inner.data[addr..addr + data.len()].atomic_read(data);
        } else {
            let data_len = data.len();
            let (first, last) = data.split_at_mut(self.len() - addr);
            self.inner.data[addr..].atomic_read(first);
            self.inner.data[..data_len - (self.len() - addr)].atomic_read(last);
        }
    }

    fn write_at(&self, mut addr: usize, data: &[u8]) {
        if addr > self.len() {
            addr -= self.len();
        }
        if addr + data.len() <= self.len() {
            self.inner.data[addr..addr + data.len()].atomic_write(data);
        } else {
            let (first, last) = data.split_at(self.len() - addr);
            self.inner.data[addr..].atomic_write(first);
            self.inner.data[..data.len() - (self.len() - addr)].atomic_write(last);
        }
    }

    fn control(&self) -> &[AtomicU32; CONTROL_WORD_COUNT] {
        &self.inner.control
    }

    fn len(&self) -> usize {
        self.inner.data.len()
    }
}

/// A trait for ring buffer memory divided into discontiguous pages.
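///
/// A minimal sketch of an implementer backed by heap-allocated pages (a
/// hypothetical helper, not part of this crate) might look like:
///
/// ```ignore
/// struct VecPagedMemory {
///     control: Box<[AtomicU8; PAGE_SIZE]>,
///     data: Vec<Box<[AtomicU8; PAGE_SIZE]>>,
/// }
///
/// impl PagedMemory for VecPagedMemory {
///     fn control(&self) -> &[AtomicU8; PAGE_SIZE] {
///         &self.control
///     }
///     fn data_page_count(&self) -> usize {
///         self.data.len()
///     }
///     fn data(&self, page: usize) -> &[AtomicU8; PAGE_SIZE] {
///         // `page` may be in 0..data_page_count*2; map it back into range.
///         &self.data[page % self.data.len()]
///     }
/// }
/// ```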
pub trait PagedMemory: Send {
    /// Returns the control page.
    fn control(&self) -> &[AtomicU8; PAGE_SIZE];
    /// Returns the number of data pages.
    fn data_page_count(&self) -> usize;
    /// Returns a data page.
    ///
    /// For performance reasons, `page` may be in `0..data_page_count*2`,
    /// representing the ring logically mapped twice consecutively. The
    /// implementation should return the same page for `n` and `n +
    /// data_page_count`.
    fn data(&self, page: usize) -> &[AtomicU8; PAGE_SIZE];
}

/// An implementation of [`RingMem`] on top of discontiguous pages.
#[derive(Debug, Clone)]
pub struct PagedRingMem<T>(T);

impl<T: PagedMemory> PagedRingMem<T> {
    /// Returns a new ring memory wrapping a type implementing [`PagedMemory`].
    pub fn new(inner: T) -> Self {
        Self(inner)
    }
}

impl<T: PagedMemory> RingMem for PagedRingMem<T> {
    fn len(&self) -> usize {
        self.0.data_page_count() * PAGE_SIZE
    }

    fn read_at(&self, mut addr: usize, mut data: &mut [u8]) {
        while !data.is_empty() {
            let page = addr / PAGE_SIZE;
            let offset = addr % PAGE_SIZE;
            let offset_end = PAGE_SIZE.min(offset + data.len());
            let len = offset_end - offset;
            let (this, next) = data.split_at_mut(len);
            self.0.data(page)[offset..offset_end].atomic_read(this);
            addr += len;
            data = next;
        }
    }

    fn write_at(&self, mut addr: usize, mut data: &[u8]) {
        while !data.is_empty() {
            let page = addr / PAGE_SIZE;
            let offset = addr % PAGE_SIZE;
            let offset_end = PAGE_SIZE.min(offset + data.len());
            let len = offset_end - offset;
            let (this, next) = data.split_at(len);
            self.0.data(page)[offset..offset_end].atomic_write(this);
            addr += len;
            data = next;
        }
    }

    #[inline]
    fn read_aligned(&self, addr: usize, data: &mut [u8]) {
        debug_assert!(addr.is_multiple_of(8));
        debug_assert!(data.len().is_multiple_of(8));
        for (i, b) in data.chunks_exact_mut(8).enumerate() {
            let addr = (addr & !7) + i * 8;
            let page = addr / PAGE_SIZE;
            let offset = addr % PAGE_SIZE;
            b.copy_from_slice(
                &self.0.data(page)[offset..offset + 8]
                    .as_atomic::<AtomicU64>()
                    .unwrap()
                    .load(Ordering::Relaxed)
                    .to_ne_bytes(),
            );
        }
    }

    #[inline]
    fn write_aligned(&self, addr: usize, data: &[u8]) {
        debug_assert!(addr.is_multiple_of(8));
        debug_assert!(data.len().is_multiple_of(8));
        for (i, b) in data.chunks_exact(8).enumerate() {
            let addr = (addr & !7) + i * 8;
            let page = addr / PAGE_SIZE;
            let offset = addr % PAGE_SIZE;
            self.0.data(page)[offset..offset + 8]
                .as_atomic::<AtomicU64>()
                .unwrap()
                .store(u64::from_ne_bytes(b.try_into().unwrap()), Ordering::Relaxed);
        }
    }

    #[inline]
    fn control(&self) -> &[AtomicU32; CONTROL_WORD_COUNT] {
        self.0.control().as_atomic_slice().unwrap()[..CONTROL_WORD_COUNT]
            .try_into()
            .unwrap()
    }
}

/// Information about an outgoing packet.
#[derive(Debug)]
pub struct OutgoingPacket<'a> {
    pub transaction_id: u64,
    pub size: usize,
    pub typ: OutgoingPacketType<'a>,
}

/// The outgoing packet type variants.
#[derive(Debug, Copy, Clone)]
pub enum OutgoingPacketType<'a> {
    /// A non-transactional data packet.
    InBandNoCompletion,
    /// A transactional data packet.
    InBandWithCompletion,
    /// A completion packet.
    Completion,
    /// A GPA direct packet, which can reference memory outside the ring by address.
    ///
    /// Not supported on the host side of the ring.
    GpaDirect(&'a [PagedRange<'a>]),
    /// A transfer page packet, which can reference memory outside the ring by a
    /// buffer ID and a set of offsets into some pre-established buffer
    /// (typically a GPADL).
    ///
    /// Used by networking. Should not be used in new devices--just embed the
    /// buffer offsets in the device-specific packet payload.
    TransferPages(u16, &'a [TransferPageRange]),
}

/// Namespace type with methods to compute packet sizes, for use with
/// `set_pending_send_size`.
pub struct PacketSize(());

impl PacketSize {
    /// Computes the size of an in-band packet.
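    ///
    /// This is the descriptor plus the payload rounded up to 8 bytes plus the
    /// footer. For example, `PacketSize::in_band(4000)` is `16 + 4000 + 8 ==
    /// 4024` bytes, matching the packet size used in the tests below.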
    pub const fn in_band(payload_len: usize) -> usize {
        size_of::<PacketDescriptor>() + ((payload_len + 7) & !7) + size_of::<Footer>()
    }

    /// Computes the size of a completion packet.
    pub const fn completion(payload_len: usize) -> usize {
        Self::in_band(payload_len)
    }

    // Computes the size of a gpa direct packet.
    // pub fn gpa_direct()

    /// Computes the size of a transfer page packet.
    pub const fn transfer_pages(count: usize, payload_len: usize) -> usize {
        Self::in_band(payload_len)
            + size_of::<TransferPageHeader>()
            + count * size_of::<TransferPageRange>()
    }
}

/// A trait shared by the incoming and outgoing ring buffers. Used primarily
/// with `RingRange::reader` and `RingRange::writer`.
pub trait Ring {
    /// The underlying memory type.
    type Memory: RingMem;

    /// The backing memory of the ring buffer.
    fn mem(&self) -> &Self::Memory;
}

/// The interface to the receiving endpoint of a ring buffer.
#[derive(Debug)]
pub struct IncomingRing<M: RingMem> {
    inner: InnerRing<M>,
}

impl<M: RingMem> Inspect for IncomingRing<M> {
    fn inspect(&self, req: inspect::Request<'_>) {
        self.inner.inspect(req);
    }
}

/// The current incoming ring state.
#[derive(Debug, Clone, Inspect)]
pub struct IncomingOffset {
    #[inspect(hex)]
    cached_in: u32,
    #[inspect(hex)]
    committed_out: u32,
    #[inspect(hex)]
    next_out: u32,
}

impl IncomingOffset {
    /// Reverts the removal of packets that have not yet been committed.
    pub fn revert(&mut self) {
        self.next_out = self.committed_out;
    }
}

impl<M: RingMem> Ring for IncomingRing<M> {
    type Memory = M;
    fn mem(&self) -> &Self::Memory {
        &self.inner.mem
    }
}

impl<M: RingMem> IncomingRing<M> {
    /// Returns a new incoming ring. Fails if the ring memory is not sized or
    /// aligned correctly or if the ring control data is corrupt.
    pub fn new(mem: M) -> Result<Self, Error> {
        let inner = InnerRing::new(mem)?;
        // Start with interrupts masked.
        let control = inner.control();
        control.interrupt_mask().store(1, Ordering::Relaxed);
        Ok(Self { inner })
    }

    /// Indicates whether pending send size notification is supported on
    /// the vmbus ring.
    pub fn supports_pending_send_size(&self) -> bool {
        let feature_bits = self.inner.control().feature_bits().load(Ordering::Relaxed);
        (feature_bits & FEATURE_SUPPORTS_PENDING_SEND_SIZE) != 0
    }

    /// Enables or disables the interrupt mask, declaring to the opposite
    /// endpoint whether interrupts should be suppressed (mask set) or sent
    /// (mask clear) for a ring empty-to-non-empty transition.
    pub fn set_interrupt_mask(&self, state: bool) {
        self.inner
            .control()
            .interrupt_mask()
            .store(state as u32, Ordering::SeqCst);
    }

    /// Verifies that interrupts are currently unmasked.
    ///
    /// This can be used to check that ring state is consistent.
    pub fn verify_interrupts_unmasked(&self) -> Result<(), Error> {
        if self
            .inner
            .control()
            .interrupt_mask()
            .load(Ordering::Relaxed)
            == 0
        {
            Ok(())
        } else {
            Err(Error::InterruptsExternallyMasked)
        }
    }

    /// Returns the current incoming offset, for passing to `read` and
    /// `commit_read`.
    pub fn incoming(&self) -> Result<IncomingOffset, Error> {
        let control = self.inner.control();
        let next_out = self
            .inner
            .validate(control.outp().load(Ordering::Relaxed))?;
        let cached_in = self.inner.validate(control.inp().load(Ordering::Relaxed))?;
        Ok(IncomingOffset {
            next_out,
            cached_in,
            committed_out: next_out,
        })
    }

    /// Returns true if there are any packets to read.
    pub fn can_read(&self, incoming: &mut IncomingOffset) -> Result<bool, Error> {
        let can_read = if incoming.next_out != incoming.cached_in {
            true
        } else {
            let inp = self
                .inner
                .validate(self.inner.control().inp().load(Ordering::Acquire))?;
            // Cache the new offset to ensure a stable result.
            incoming.cached_in = inp;
            incoming.next_out != inp
        };
        Ok(can_read)
    }

    /// Commits a series of packet reads, returning whether the opposite
    /// endpoint should be signaled.
    pub fn commit_read(&self, ptrs: &mut IncomingOffset) -> bool {
        if ptrs.committed_out == ptrs.next_out {
            return false;
        }
        let control = self.inner.control();
        control.outp().store(ptrs.next_out, Ordering::SeqCst);
        let pending_send_size = control.pending_send_size().load(Ordering::SeqCst);
        // Some implementations set the pending send size to the size of the
        // ring minus 1. The intent is that a signal arrive when the ring is
        // completely empty, but this is invalid since the maximum writable ring
        // size is the size of the ring minus 8. Mask off the low bits to work
        // around this.
        let pending_send_size = pending_send_size & !7;
        let signal = if pending_send_size != 0 {
            if let Ok(inp) = self.inner.validate(control.inp().load(Ordering::SeqCst)) {
                let old_free = self.inner.free(inp, ptrs.committed_out);
                let new_free = self.inner.free(inp, ptrs.next_out);
                old_free < pending_send_size && new_free >= pending_send_size
            } else {
                false
            }
        } else {
            false
        };
        ptrs.committed_out = ptrs.next_out;
        signal
    }

    /// Parses the next packet descriptor, returning the parsed information and
    /// a range that can be used to read the packet. The caller should commit
    /// the read with `commit_read` to free up space in the ring.
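    ///
    /// A drain-loop sketch (error handling elided, names are illustrative):
    ///
    /// ```ignore
    /// let mut incoming = ring.incoming().unwrap();
    /// loop {
    ///     match ring.read(&mut incoming) {
    ///         Ok(packet) => {
    ///             // ... process the payload via `packet.payload.reader(&ring)` ...
    ///         }
    ///         Err(ReadError::Empty) => break,
    ///         Err(ReadError::Corrupt(err)) => panic!("corrupt ring: {err}"),
    ///     }
    /// }
    /// if ring.commit_read(&mut incoming) {
    ///     // Signal the opposite endpoint.
    /// }
    /// ```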
    pub fn read(&self, ptrs: &mut IncomingOffset) -> Result<IncomingPacket, ReadError> {
        let outp = ptrs.next_out;
        let mut inp = ptrs.cached_in;
        if inp == outp {
            inp = self
                .inner
                .validate(self.inner.control().inp().load(Ordering::Acquire))?;
            if inp == outp {
                return Err(ReadError::Empty);
            }
            ptrs.cached_in = inp;
        }
        let avail = self.inner.available(inp, outp);
        if avail < 16 {
            return Err(ReadError::Corrupt(Error::InvalidDataAvailable));
        }
        let (len, packet) = parse_packet(&self.inner.mem, outp, avail)?;
        ptrs.next_out = self
            .inner
            .add_pointer(outp, len + size_of::<Footer>() as u32);

        Ok(packet)
    }
}

/// The sending side of a ring buffer.
#[derive(Debug)]
pub struct OutgoingRing<M: RingMem> {
    inner: InnerRing<M>,
}

impl<M: RingMem> Inspect for OutgoingRing<M> {
    fn inspect(&self, req: inspect::Request<'_>) {
        self.inner.inspect(req);
    }
}

/// An outgoing ring offset, used to determine the position to write packets to.
#[derive(Debug, Clone, Inspect)]
pub struct OutgoingOffset {
    #[inspect(hex)]
    cached_out: u32,
    #[inspect(hex)]
    committed_in: u32,
    #[inspect(hex)]
    next_in: u32,
}

impl OutgoingOffset {
    /// Reverts the insertion of packets that have not yet been committed.
    pub fn revert(&mut self) {
        self.next_in = self.committed_in;
    }
}

impl<M: RingMem> Ring for OutgoingRing<M> {
    type Memory = M;
    fn mem(&self) -> &Self::Memory {
        &self.inner.mem
    }
}

impl<M: RingMem> OutgoingRing<M> {
    /// Returns a new outgoing ring over `mem`.
    pub fn new(mem: M) -> Result<Self, Error> {
        let inner = InnerRing::new(mem)?;
        // Report to the opposite endpoint that we will send interrupts for a
        // ring full to ring non-full transition. Feature bits are set by the
        // sending side.
        let control = inner.control();
        control
            .feature_bits()
            .store(FEATURE_SUPPORTS_PENDING_SEND_SIZE, Ordering::Relaxed);
        // Start with no interrupt requested.
        control.pending_send_size().store(0, Ordering::Relaxed);
        Ok(Self { inner })
    }

    /// Returns the current outgoing offset, for passing to `write` and
    /// ultimately `commit_write`.
    pub fn outgoing(&self) -> Result<OutgoingOffset, Error> {
        let control = self.inner.control();
        let next_in = self.inner.validate(control.inp().load(Ordering::Relaxed))?;
        let cached_out = self
            .inner
            .validate(control.outp().load(Ordering::Relaxed))?;
        Ok(OutgoingOffset {
            cached_out,
            committed_in: next_in,
            next_in,
        })
    }

    /// Sets the pending send size: the number of bytes that should be free in
    /// the ring before the opposite endpoint sends a ring-non-full signal.
    ///
    /// Fails if the packet size is larger than the ring's maximum packet size.
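    ///
    /// The typical pattern when `write` fails with [`WriteError::Full`] is
    /// (sketch, names illustrative):
    ///
    /// ```ignore
    /// match ring.write(&mut outgoing, &packet) {
    ///     Ok(range) => { /* write the payload via `range.writer(&ring)` */ }
    ///     Err(WriteError::Full(len)) => {
    ///         // Ask the opposite endpoint to signal once `len` bytes are free,
    ///         // then retry after the signal arrives.
    ///         ring.set_pending_send_size(len).unwrap();
    ///     }
    ///     Err(WriteError::Corrupt(err)) => panic!("corrupt ring: {err}"),
    /// }
    /// ```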
    pub fn set_pending_send_size(&self, len: usize) -> Result<(), Error> {
        if len > self.maximum_packet_size() {
            return Err(Error::InvalidMessageLength);
        }
        self.inner
            .control()
            .pending_send_size()
            .store((len as u32 + 7) & !7, Ordering::SeqCst);

        Ok(())
    }

    /// Returns the maximum packet size that can fit in the ring.
    pub fn maximum_packet_size(&self) -> usize {
        self.inner.len() as usize - 8
    }

    /// Returns whether a packet can fit in the ring starting at the specified
    /// offset.
    pub fn can_write(&self, ptrs: &mut OutgoingOffset, len: usize) -> Result<bool, Error> {
        let can_write = if self.inner.free(ptrs.next_in, ptrs.cached_out) as usize >= len {
            true
        } else {
            let outp = self
                .inner
                .validate(self.inner.control().outp().load(Ordering::Relaxed))?;

            // Cache the new offset to ensure a stable result.
            ptrs.cached_out = outp;
            self.inner.free(ptrs.next_in, outp) as usize >= len
        };
        Ok(can_write)
    }

    /// Commits a series of writes that ended at the specified offset, returning
    /// whether the opposite endpoint should be signaled.
    pub fn commit_write(&self, ptrs: &mut OutgoingOffset) -> bool {
        if ptrs.committed_in == ptrs.next_in {
            return false;
        }
        let inp = ptrs.next_in;

        // Update the ring offset and check if the opposite endpoint needs to be
        // signaled. This is the case only if interrupts are unmasked and the
        // ring was previously empty before this write.
        let control = self.inner.control();
        control.inp().store(inp, Ordering::SeqCst);
        let needs_interrupt = control.interrupt_mask().load(Ordering::SeqCst) == 0
            && control.outp().load(Ordering::SeqCst) == ptrs.committed_in;

        ptrs.committed_in = inp;
        needs_interrupt
    }

    /// Writes the header of the next packet and returns the ring range for the
    /// payload. The caller should write the payload, then commit the write (or
    /// multiple writes) with `commit_write`.
    ///
    /// Returns `Err(WriteError::Full(len))` if the ring is full, where `len`
    /// is the number of bytes needed to write the requested packet.
    pub fn write(
        &self,
        ptrs: &mut OutgoingOffset,
        packet: &OutgoingPacket<'_>,
    ) -> Result<RingRange, WriteError> {
        const DESCRIPTOR_SIZE: usize = size_of::<PacketDescriptor>();
        let (packet_type, header_size, flags) = match packet.typ {
            OutgoingPacketType::InBandNoCompletion => (PACKET_TYPE_IN_BAND, DESCRIPTOR_SIZE, 0),
            OutgoingPacketType::InBandWithCompletion => (
                PACKET_TYPE_IN_BAND,
                DESCRIPTOR_SIZE,
                PACKET_FLAG_COMPLETION_REQUESTED,
            ),
            OutgoingPacketType::Completion => (PACKET_TYPE_COMPLETION, DESCRIPTOR_SIZE, 0),
            OutgoingPacketType::GpaDirect(ranges) => (
                PACKET_TYPE_GPA_DIRECT,
                DESCRIPTOR_SIZE
                    + size_of::<GpaDirectHeader>()
                    + ranges.iter().fold(0, |a, range| {
                        a + size_of::<GpaRange>() + size_of_val(range.gpns())
                    }),
                PACKET_FLAG_COMPLETION_REQUESTED,
            ),
            OutgoingPacketType::TransferPages(_, ranges) => (
                PACKET_TYPE_TRANSFER_PAGES,
                DESCRIPTOR_SIZE + size_of::<TransferPageHeader>() + size_of_val(ranges),
                PACKET_FLAG_COMPLETION_REQUESTED,
            ),
        };
        let msg_len = (packet.size + header_size).div_ceil(8) * 8;
        let total_msg_len = (msg_len + size_of::<Footer>()) as u32;
        if total_msg_len >= self.inner.len() - 8 {
            return Err(WriteError::Corrupt(Error::InvalidMessageLength));
        }
        let inp = ptrs.next_in;
        let mut outp = ptrs.cached_out;
        if self.inner.free(inp, outp) < total_msg_len {
            outp = self
                .inner
                .validate(self.inner.control().outp().load(Ordering::Relaxed))?;
            if self.inner.free(inp, outp) < total_msg_len {
                return Err(WriteError::Full(total_msg_len as usize));
            }
            ptrs.cached_out = outp;
        }
        let desc = PacketDescriptor {
            packet_type,
            data_offset8: header_size as u16 / 8,
            length8: (msg_len / 8) as u16,
            flags,
            transaction_id: packet.transaction_id,
        };

        let footer = Footer {
            reserved: 0,
            offset: inp,
        };

        let off = inp as usize;
        self.inner.mem.write_aligned(off, desc.as_bytes());
        match packet.typ {
            OutgoingPacketType::GpaDirect(ranges) => {
                let mut writer = RingRange {
                    off: (off + DESCRIPTOR_SIZE) as u32,
                    size: header_size as u32,
                }
                .writer(self);
                let gpa_header = GpaDirectHeader {
                    reserved: 0,
                    range_count: ranges.len() as u32,
                };
                writer
                    .write(gpa_header.as_bytes())
                    .map_err(|_| WriteError::Corrupt(Error::InvalidMessageLength))?;

                for range in ranges {
                    let gpa_rng = GpaRange {
                        len: range.len() as u32,
                        offset: range.offset() as u32,
                    };
                    writer
                        .write(gpa_rng.as_bytes())
                        .map_err(|_| WriteError::Corrupt(Error::InvalidMessageLength))?;
                    writer
                        .write(range.gpns().as_bytes())
                        .map_err(|_| WriteError::Corrupt(Error::InvalidMessageLength))?;
                }
            }
            OutgoingPacketType::TransferPages(tp_id, ranges) => {
                let tp_header = TransferPageHeader {
                    transfer_page_set_id: tp_id,
                    reserved: 0,
                    range_count: ranges.len() as u32,
                };
                self.inner
                    .mem
                    .write_aligned(off + DESCRIPTOR_SIZE, tp_header.as_bytes());
                for (i, range) in ranges.iter().enumerate() {
                    self.inner.mem.write_aligned(
                        off + DESCRIPTOR_SIZE + size_of_val(&tp_header) + i * 8,
                        range.as_bytes(),
                    );
                }
            }
            _ => (),
        }

        self.inner
            .mem
            .write_aligned(off + msg_len, footer.as_bytes());
        ptrs.next_in = self.inner.add_pointer(inp, total_msg_len);
        Ok(RingRange {
            off: inp + header_size as u32,
            size: packet.size as u32,
        })
    }
}

struct InnerRing<M: RingMem> {
    mem: M,
    size: u32,
}

impl<M: RingMem> Inspect for InnerRing<M> {
    fn inspect(&self, req: inspect::Request<'_>) {
        req.respond()
            .hex("ring_size", self.size)
            .field("control", self.control());
    }
}

/// Inspects ring buffer state without creating an IncomingRing or OutgoingRing
/// structure.
///
/// # Panics
///
/// Panics if control_page is not aligned.
pub fn inspect_ring(control_page: &guestmem::Page, response: &mut inspect::Response<'_>) {
    let control = Control::from_page(control_page).expect("control page is not aligned");
    response.field("control", control);
}

/// Returns whether a ring buffer is in a state where the receiving end might
/// need a signal.
pub fn reader_needs_signal(control_page: &guestmem::Page) -> bool {
    Control::from_page(control_page).is_some_and(|control| {
        control.interrupt_mask().load(Ordering::Relaxed) == 0
            && (control.inp().load(Ordering::Relaxed) != control.outp().load(Ordering::Relaxed))
    })
}

/// Returns whether a ring buffer is in a state where the sending end might need
/// a signal.
pub fn writer_needs_signal(control_page: &guestmem::Page, ring_size: u32) -> bool {
    Control::from_page(control_page).is_some_and(|control| {
        let pending_size = control.pending_send_size().load(Ordering::Relaxed);
        pending_size != 0
            && ring_free(
                ring_size,
                control.inp().load(Ordering::Relaxed),
                control.outp().load(Ordering::Relaxed),
            ) >= pending_size
    })
}

impl<M: RingMem> Debug for InnerRing<M> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InnerRing")
            .field("control", &self.control())
            .field("size", &self.size)
            .finish()
    }
}

impl<M: RingMem> InnerRing<M> {
    pub fn new(mem: M) -> Result<Self, Error> {
        let ring_size = u32::try_from(mem.len()).map_err(|_| Error::InvalidRingMemory)?;
        if ring_size % 4096 != 0 {
            return Err(Error::InvalidRingMemory);
        }
        let ring = InnerRing {
            mem,
            size: ring_size,
        };
        Ok(ring)
    }

    fn control(&self) -> Control<'_> {
        Control(self.mem.control())
    }

    fn len(&self) -> u32 {
        self.size
    }

    fn validate(&self, p: u32) -> Result<u32, Error> {
        if p >= self.size || !p.is_multiple_of(8) {
            Err(Error::InvalidRingPointer)
        } else {
            Ok(p)
        }
    }

    fn add_pointer(&self, p: u32, off: u32) -> u32 {
        let np = p + off;
        if np >= self.size {
            assert!(np < self.size * 2);
            np - self.size
        } else {
            np
        }
    }

    fn available(&self, inp: u32, outp: u32) -> u32 {
        if inp > outp {
            // |____outp....inp_____|
            inp - outp
        } else {
            // |....inp____outp.....|
            self.size + inp - outp
        }
    }

    fn free(&self, inp: u32, outp: u32) -> u32 {
        ring_free(self.size, inp, outp)
    }
}

fn ring_free(size: u32, inp: u32, outp: u32) -> u32 {
    // It's not possible to fully fill the ring since that state would be
    // indistinguishable from the empty ring. So subtract 8 bytes from the
    // result.
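    //
    // For example, with size == 0x1000 and inp == outp (an empty ring), the
    // free space is 0x1000 - 0 - 8 == 0xff8 bytes.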
    if outp > inp {
        // |....inp____outp.....|
        outp - inp - 8
    } else {
        // |____outp....inp_____|
        size - (inp - outp) - 8
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn write_simple<T: RingMem>(out_ring: &mut OutgoingRing<T>, buf: &[u8]) -> Option<bool> {
        let mut outgoing = out_ring.outgoing().unwrap();
        match out_ring.write(
            &mut outgoing,
            &OutgoingPacket {
                typ: OutgoingPacketType::InBandNoCompletion,
                size: buf.len(),
                transaction_id: 0,
            },
        ) {
            Ok(range) => {
                range.writer(out_ring).write(buf).unwrap();
                Some(out_ring.commit_write(&mut outgoing))
            }
            Err(WriteError::Full(_)) => None,
            Err(err) => panic!("{}", err),
        }
    }

    fn read_simple<T: RingMem>(in_ring: &mut IncomingRing<T>) -> (Vec<u8>, bool) {
        let mut incoming = in_ring.incoming().unwrap();
        let msg = in_ring
            .read(&mut incoming)
            .unwrap()
            .payload
            .reader(in_ring)
            .read_all()
            .unwrap();
        let signal = in_ring.commit_read(&mut incoming);
        (msg, signal)
    }

    #[test]
    fn test_ring() {
        let rmem = FlatRingMem::new(16384);
        let mut in_ring = IncomingRing::new(&rmem).unwrap();
        in_ring.set_interrupt_mask(false);
        let mut out_ring = OutgoingRing::new(&rmem).unwrap();

        let p = &[1, 2, 3, 4, 5, 6, 7, 8];
        assert!(write_simple(&mut out_ring, p).unwrap());

        let (msg, signal) = read_simple(&mut in_ring);
        assert!(!signal);

        assert_eq!(p, &msg[..]);
    }

    #[test]
    fn test_interrupt_mask() {
        let rmem = FlatRingMem::new(16384);
        let mut in_ring = IncomingRing::new(&rmem).unwrap();
        let mut out_ring = OutgoingRing::new(&rmem).unwrap();

        // Interrupts are masked, so no signal is expected.
        assert!(!write_simple(&mut out_ring, &[1, 2, 3]).unwrap());
        assert!(!read_simple(&mut in_ring).1);

        // Unmask interrupts, then try again, expecting a signal this time.
        in_ring.set_interrupt_mask(false);
        assert!(write_simple(&mut out_ring, &[1, 2, 3]).unwrap());
        assert!(!read_simple(&mut in_ring).1);
    }

    #[test]
    fn test_pending_send_size() {
        let rmem = FlatRingMem::new(16384);
        let mut in_ring = IncomingRing::new(&rmem).unwrap();
        let mut out_ring = OutgoingRing::new(&rmem).unwrap();

        // Fill the ring up with some packets.
        write_simple(&mut out_ring, &[1; 4000]).unwrap();
        write_simple(&mut out_ring, &[2; 4000]).unwrap();
        write_simple(&mut out_ring, &[3; 4000]).unwrap();
        write_simple(&mut out_ring, &[4; 4000]).unwrap();
        assert!(write_simple(&mut out_ring, &[5; 4000]).is_none());

        // No pending send size yet.
        assert!(!read_simple(&mut in_ring).1);

        // Fill the ring back up.
        write_simple(&mut out_ring, &[5; 4000]).unwrap();
        assert!(write_simple(&mut out_ring, &[6; 4000]).is_none());

        // Set a pending send size for two packets worth of space (packet size +
        // 16 bytes for the descriptor and 8 bytes for the footer).
        out_ring.set_pending_send_size(4024 * 2).unwrap();

        // There should be a signal after two packets, then no more signals.
        assert!(!read_simple(&mut in_ring).1);
        assert!(read_simple(&mut in_ring).1);
        assert!(!read_simple(&mut in_ring).1);
    }
}