hv1_emulator/
message_queues.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Emulation of an unbounded synic message queue.
5//!
6//! This is separate from the synic emulator because it can be used with the
7//! real hypervisor as well.
8//!
9//! FUTURE: This should be replaced with a bounded queue, with some kind of back
10//! pressure mechanism.
11
12use hvdef::HvError;
13use hvdef::HvMessage;
14use hvdef::NUM_SINTS;
15use inspect::Inspect;
16use parking_lot::Mutex;
17use std::collections::VecDeque;
18use std::sync::atomic::AtomicU16;
19use std::sync::atomic::Ordering;
20use virt::x86::vp::SynicMessageQueues;
21
22/// A set of synic message queues, one per sint.
23#[derive(Inspect, Debug)]
24pub struct MessageQueues {
25    #[inspect(
26        flatten,
27        with = "|x| inspect::adhoc(|req| inspect::iter_by_index(x.lock().iter().map(|q| q.len())).inspect(req))"
28    )]
29    queues: Mutex<[VecDeque<HvMessage>; NUM_SINTS]>,
30    #[inspect(skip)]
31    pending: AtomicU16,
32}
33
34impl MessageQueues {
35    /// Returns a new empty instance.
36    pub fn new() -> Self {
37        Self {
38            queues: Default::default(),
39            pending: Default::default(),
40        }
41    }
42
43    /// Saves the queue state.
44    pub fn save(&self) -> SynicMessageQueues {
45        let queues = self
46            .queues
47            .lock()
48            .iter()
49            .map(|queue| queue.iter().map(|m| zerocopy::transmute!(*m)).collect())
50            .collect::<Vec<_>>()
51            .try_into()
52            .unwrap();
53
54        SynicMessageQueues { queues }
55    }
56
57    /// Restores the queue state.
58    pub fn restore(&self, value: &SynicMessageQueues) {
59        let queues = &mut self.queues.lock();
60        for (dest, src) in queues.iter_mut().zip(&value.queues) {
61            dest.truncate(0);
62            dest.extend(src.iter().map(|m| {
63                let m: HvMessage = zerocopy::transmute!(*m);
64                m
65            }));
66        }
67
68        let pending = queues
69            .iter()
70            .enumerate()
71            .fold(0, |p, (i, q)| p | ((!q.is_empty() as u16) << i));
72
73        self.pending.store(pending, Ordering::Relaxed);
74    }
75
76    /// Enqueues a message to be posted to the guest.
77    pub fn enqueue_message(&self, sint: u8, message: &HvMessage) -> bool {
78        let mut queues = self.queues.lock();
79        queues[sint as usize].push_back(*message);
80        let mask = 1 << sint;
81        self.pending.fetch_or(mask, Ordering::Relaxed) & mask == 0
82    }
83
84    /// Returns the bitmap of the sints that have pending messages.
85    pub fn pending_sints(&self) -> u16 {
86        self.pending.load(Ordering::Relaxed)
87    }
88
89    /// Posts any pending messages, using `post_message`.
90    ///
91    /// If `post_message` returns `Err(HvError::ObjectInUse)`, then the message
92    /// is retained in the queue. Otherwise, it is removed.
93    ///
94    /// Returns the sints that are still pending.
95    pub fn post_pending_messages(
96        &self,
97        sints: u16,
98        mut post_message: impl FnMut(u8, &HvMessage) -> Result<(), HvError>,
99    ) -> u16 {
100        for (sint_index, queue) in self.queues.lock().iter_mut().enumerate() {
101            let sint = sint_index as u8;
102            let mask = 1 << sint;
103            if sints & mask == 0 {
104                continue;
105            }
106
107            self.pending.fetch_and(!mask, Ordering::Relaxed);
108            while let Some(message) = queue.front() {
109                match post_message(sint, message) {
110                    Ok(()) => {
111                        tracing::debug!(sint, "posted sint message");
112                    }
113                    Err(HvError::ObjectInUse) => {
114                        tracing::debug!(sint, "message slot in use");
115                        self.pending.fetch_or(mask, Ordering::Relaxed);
116                        break;
117                    }
118                    Err(err) => {
119                        tracelimit::error_ratelimited!(
120                            error = &err as &dyn std::error::Error,
121                            sint,
122                            "dropping sint message"
123                        );
124                    }
125                }
126                queue.pop_front();
127            }
128        }
129        self.pending.load(Ordering::Relaxed)
130    }
131}
132
#[cfg(test)]
mod tests {
    use super::MessageQueues;
    use hvdef::HvError;
    use hvdef::HvMessage;
    use hvdef::HvMessageType;

    /// Enqueues messages on sints 0, 2 and 4 (two on sint 4), then posts for
    /// every sint except 0 with a callback that accepts only the first message
    /// per sint. Sint 0 must stay pending because it was not selected, sint 2
    /// must be fully drained, and sint 4 must stay pending because its second
    /// message was refused.
    #[test]
    fn test_message_queues() {
        let queues = MessageQueues::new();
        let message = HvMessage::new(HvMessageType(0), 0, &[]);

        for sint in [0, 2, 4, 4] {
            queues.enqueue_message(sint, &message);
        }
        assert_eq!(queues.pending_sints(), 0b10101);

        // Bitmap of sints for which the callback has already accepted a message.
        let mut posted = 0u16;
        queues.post_pending_messages(!1, |sint, _message| {
            let bit = 1u16 << sint;
            if posted & bit != 0 {
                return Err(HvError::ObjectInUse);
            }
            posted |= bit;
            Ok(())
        });

        assert_eq!(queues.pending_sints(), 0b10001);
        assert_eq!(posted, 0b10100);
    }
}