vmbus_async/core.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Internal types for performing asynchronous channel IO.

use inspect::Inspect;
use inspect_counters::Counter;
use pal_async::multi_waker::MultiWaker;
use std::task::Context;
use std::task::Poll;
use vmbus_channel::ChannelClosed;
use vmbus_channel::RawAsyncChannel;
use vmbus_channel::SignalVmbusChannel;
use vmbus_ring as ring;
use vmbus_ring::IncomingRing;
use vmbus_ring::OutgoingOffset;
use vmbus_ring::OutgoingRing;
use vmbus_ring::RingMem;

/// The shared core of an asynchronous vmbus channel: the incoming and
/// outgoing rings plus the mechanism used to signal the opposite endpoint.
pub struct Core<M: RingMem> {
    signal: Box<dyn SignalVmbusChannel>,
    multi_waker: MultiWaker<2>,
    in_ring: IncomingRing<M>,
    out_ring: OutgoingRing<M>,
}

impl<M: RingMem> Inspect for Core<M> {
    fn inspect(&self, req: inspect::Request<'_>) {
        req.respond()
            .field("incoming_ring", &self.in_ring)
            .field("outgoing_ring", &self.out_ring);
    }
}

impl<M: RingMem> Core<M> {
    /// Creates a new `Core` from the rings and signal in `channel`.
    pub fn new(channel: RawAsyncChannel<M>) -> Self {
        let RawAsyncChannel {
            in_ring,
            out_ring,
            signal,
        } = channel;
        Self {
            signal,
            multi_waker: MultiWaker::new(),
            in_ring,
            out_ring,
        }
    }

    /// Returns a reference to the incoming ring.
    pub fn in_ring(&self) -> &IncomingRing<M> {
        &self.in_ring
    }

    /// Returns a reference to the outgoing ring.
    pub fn out_ring(&self) -> &OutgoingRing<M> {
        &self.out_ring
    }
}

/// An error returned while polling a ring for readiness.
#[derive(Debug)]
pub(crate) enum PollError {
    /// A ring buffer error.
    Ring(ring::Error),
    /// The channel has been closed.
    Closed,
}

impl<M: RingMem> Core<M> {
    fn poll_ready(&self, cx: &mut Context<'_>, for_outgoing: bool) -> Poll<Result<(), PollError>> {
        // Poll, wrapping the context with a multi-waker context so that both
        // the incoming and outgoing tasks will be woken when there is a signal.
        self.multi_waker
            .poll_wrapped(cx, for_outgoing.into(), |cx| {
                self.signal
                    .poll_for_signal(cx)
                    .map_err(|ChannelClosed| PollError::Closed)
            })
    }
}
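
// As a hedged illustration of the multi-waker scheme above: the read path
// calls `poll_ready(cx, false)` and the write path calls
// `poll_ready(cx, true)`, so each registers its task's waker in its own
// `MultiWaker` slot (0 for incoming, 1 for outgoing). The channel exposes a
// single signal, but when `poll_for_signal` completes, both registered wakers
// are woken and each half re-checks its own ring.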

impl<M: RingMem> Core<M> {
    /// Signals the opposite endpoint of the channel.
    pub fn signal(&self) {
        self.signal.signal_remote();
    }
}

/// Per-reader state for polling an incoming ring.
#[derive(Debug, Inspect)]
pub struct ReadState {
    #[inspect(flatten)]
    pub ptrs: ring::IncomingOffset,
    pub polls: Counter,
    pub signals: Counter,
    ready: bool,
    masked: bool,
}

impl ReadState {
    pub fn new(ptrs: ring::IncomingOffset) -> Self {
        Self {
            ptrs,
            polls: Counter::new(),
            signals: Counter::new(),
            ready: false,
            // It's safe to assume interrupts are initially masked, since
            // setting the mask is an optimization but clearing it is required
            // to avoid missing notifications.
            masked: true,
        }
    }

    /// Polls the incoming ring for readiness.
    pub fn poll_ready<M: RingMem>(
        &mut self,
        cx: &mut Context<'_>,
        core: &Core<M>,
    ) -> Poll<Result<(), PollError>> {
        while !self.ready {
            // The ring buffer is believed to be empty. Unmask interrupts
            // before double-checking the ring buffer.
            if self.masked {
                core.in_ring.set_interrupt_mask(false);
                self.masked = false;
            } else {
                // Interrupts are not supposed to be masked at this point.
                // Detect ring control corruption here to avoid
                // hard-to-diagnose issues later.
                core.in_ring
                    .verify_interrupts_unmasked()
                    .map_err(PollError::Ring)?;
            }

            if core
                .in_ring
                .can_read(&mut self.ptrs)
                .map_err(PollError::Ring)?
            {
                // The ring has packets.
                //
                // N.B. There is no need to mask interrupts again until just
                // before packets are removed from the ring, since the opposite
                // endpoint will not signal until there is an empty-to-non-empty
                // transition.
                self.ready = true;
            } else {
                std::task::ready!(core.poll_ready(cx, false))?;
                self.polls.increment();
            }
        }
        Poll::Ready(Ok(()))
    }

    /// Clears the cached ready state. Should be called when the ring buffer is
    /// known to be empty.
    pub fn clear_ready(&mut self) {
        self.ready = false;
    }

    /// Clears the request for a wakeup when the ring is ready.
    ///
    /// Should be called just before removing packets from the ring so that the
    /// opposite endpoint does not signal the ring-non-empty condition
    /// unnecessarily.
    pub fn clear_poll<M: RingMem>(&mut self, core: &Core<M>) {
        if !self.masked {
            core.in_ring.set_interrupt_mask(true);
            self.masked = true;
        }
    }
}
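
// A minimal sketch of the intended read flow, under the assumption that a
// caller elsewhere in this crate knows how to remove packets from the incoming
// ring (`read_one_packet` below is hypothetical and stands in for those
// accessors):
//
//     std::task::ready!(read.poll_ready(cx, &core))?;
//     // Mask interrupts again just before consuming, so the opposite
//     // endpoint does not signal "non-empty" for packets being removed.
//     read.clear_poll(&core);
//     while let Some(packet) = read_one_packet(core.in_ring(), &mut read.ptrs) {
//         // ... process the packet ...
//     }
//     // The ring is now believed to be empty; poll for readiness again later.
//     read.clear_ready();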

/// Per-writer state for polling an outgoing ring.
#[derive(Debug, Inspect)]
pub struct WriteState {
    #[inspect(flatten)]
    pub ptrs: OutgoingOffset,
    pub signals: Counter,
    pub polls: Counter,
    ready: bool,
    pending_size: usize,
}

impl WriteState {
    pub fn new(ptrs: OutgoingOffset) -> Self {
        Self {
            ptrs,
            signals: Counter::new(),
            polls: Counter::new(),
            ready: false,
            pending_size: 0,
        }
    }

    /// Polls the outgoing ring for readiness to send `send_size` bytes.
    pub fn poll_ready<M: RingMem>(
        &mut self,
        cx: &mut Context<'_>,
        core: &Core<M>,
        send_size: usize,
    ) -> Poll<Result<(), PollError>> {
        while !self.ready {
            // The ring buffer is believed to be full. Set the pending send
            // size before double-checking the ring buffer.
            if self.pending_size < send_size {
                // Since there is no rush to get data into a full ring, delay
                // the signal until at least 1/4 of the ring is available (and
                // until this packet fits) to avoid ping-ponging with the
                // opposite endpoint.
                self.pending_size = send_size.max(core.out_ring().maximum_packet_size() / 4);
                core.out_ring
                    .set_pending_send_size(self.pending_size)
                    .map_err(PollError::Ring)?;
            }
            if core
                .out_ring
                .can_write(&mut self.ptrs, send_size)
                .map_err(PollError::Ring)?
            {
                self.ready = true;
                // Clear the pending send size now if it's larger than the
                // requested send size, since otherwise spurious interrupts may
                // arrive.
                if self.pending_size > send_size {
                    self.clear_poll(core);
                }
            } else {
                std::task::ready!(core.poll_ready(cx, true))?;
                self.polls.increment();
            }
        }
        Poll::Ready(Ok(()))
    }

    /// Clears the cached ready state. Should be called when the ring buffer is
    /// known to be full.
    pub fn clear_ready(&mut self) {
        self.ready = false;
    }

    /// Clears the request for a wakeup when the ring is ready.
    ///
    /// Should be called just before inserting packets into the ring so that
    /// the opposite endpoint does not signal the ring-non-full condition
    /// unnecessarily.
    pub fn clear_poll<M: RingMem>(&mut self, core: &Core<M>) {
        if self.pending_size != 0 {
            core.out_ring.set_pending_send_size(0).unwrap();
            self.pending_size = 0;
        }
    }
}
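
// A minimal sketch of the intended write flow, under the assumption that a
// caller elsewhere in this crate knows how to append packets to the outgoing
// ring (`write_one_packet` below is hypothetical and stands in for those
// accessors):
//
//     std::task::ready!(write.poll_ready(cx, &core, send_size))?;
//     // Drop the pending-send-size request just before inserting, so the
//     // opposite endpoint does not signal "no longer full" for space that is
//     // about to be consumed.
//     write.clear_poll(&core);
//     write_one_packet(core.out_ring(), &mut write.ptrs, payload)?;
//     core.signal(); // A real caller may signal only when the ring requires it.
//
// As a worked example of the pending-send-size heuristic in `poll_ready`: if
// `maximum_packet_size()` were 16384 bytes and a 512-byte send found the ring
// full, the requested wakeup threshold would be max(512, 16384 / 4) = 4096
// free bytes, trading a little latency on a full ring for fewer signals.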