1use inspect::Inspect;
7use inspect_counters::Counter;
8use pal_async::multi_waker::MultiWaker;
9use std::task::Context;
10use std::task::Poll;
11use vmbus_channel::ChannelClosed;
12use vmbus_channel::RawAsyncChannel;
13use vmbus_channel::SignalVmbusChannel;
14use vmbus_ring as ring;
15use vmbus_ring::IncomingRing;
16use vmbus_ring::OutgoingOffset;
17use vmbus_ring::OutgoingRing;
18use vmbus_ring::RingMem;
19
20pub struct Core<M: RingMem> {
21 signal: Box<dyn SignalVmbusChannel>,
22 multi_waker: MultiWaker<2>,
23 in_ring: IncomingRing<M>,
24 out_ring: OutgoingRing<M>,
25}
26
27impl<M: RingMem> Inspect for Core<M> {
28 fn inspect(&self, req: inspect::Request<'_>) {
29 req.respond()
30 .field("incoming_ring", &self.in_ring)
31 .field("outgoing_ring", &self.out_ring);
32 }
33}
34
35impl<M: RingMem> Core<M> {
36 pub fn new(channel: RawAsyncChannel<M>) -> Self {
37 let RawAsyncChannel {
38 in_ring,
39 out_ring,
40 signal,
41 } = channel;
42 Self {
43 signal,
44 multi_waker: MultiWaker::new(),
45 in_ring,
46 out_ring,
47 }
48 }
49
50 pub fn in_ring(&self) -> &IncomingRing<M> {
51 &self.in_ring
52 }
53
54 pub fn out_ring(&self) -> &OutgoingRing<M> {
55 &self.out_ring
56 }
57}
58
/// Error produced while polling a ring for readiness.
#[derive(Debug)]
pub(crate) enum PollError {
    /// A ring operation (interrupt-mask check, readability/writability check,
    /// or pending-send-size update) returned an error.
    Ring(ring::Error),
    /// Polling for a signal reported that the channel is closed.
    Closed,
}
64
65impl<M: RingMem> Core<M> {
66 fn poll_ready(&self, cx: &mut Context<'_>, for_outgoing: bool) -> Poll<Result<(), PollError>> {
67 self.multi_waker
70 .poll_wrapped(cx, for_outgoing.into(), |cx| {
71 self.signal
72 .poll_for_signal(cx)
73 .map_err(|ChannelClosed| PollError::Closed)
74 })
75 }
76}
77
78impl<M: RingMem> Core<M> {
79 pub fn signal(&self) {
80 self.signal.signal_remote();
81 }
82}
83
/// Polling state for the incoming (read) side of a ring.
#[derive(Debug, Inspect)]
pub struct ReadState {
    /// Incoming ring offsets; handed to the ring's `can_read` check while
    /// polling. Flattened into this struct's inspect output.
    #[inspect(flatten)]
    pub ptrs: ring::IncomingOffset,
    /// Incremented each time `poll_ready` resumes after a signal wait
    /// completes.
    pub polls: Counter,
    /// Not updated anywhere in this section; presumably maintained by callers
    /// that signal the remote side (TODO confirm).
    pub signals: Counter,
    /// Cached "ring is readable" result; set by `poll_ready`, reset by
    /// `clear_ready`.
    ready: bool,
    /// Whether this state has the incoming ring's interrupt mask set.
    masked: bool,
}
93
94impl ReadState {
95 pub fn new(ptrs: ring::IncomingOffset) -> Self {
96 Self {
97 ptrs,
98 polls: Counter::new(),
99 signals: Counter::new(),
100 ready: false,
101 masked: true,
105 }
106 }
107
108 pub fn poll_ready<M: RingMem>(
110 &mut self,
111 cx: &mut Context<'_>,
112 core: &Core<M>,
113 ) -> Poll<Result<(), PollError>> {
114 while !self.ready {
115 if self.masked {
118 core.in_ring.set_interrupt_mask(false);
119 self.masked = false;
120 } else {
121 core.in_ring
125 .verify_interrupts_unmasked()
126 .map_err(PollError::Ring)?;
127 }
128
129 if core
130 .in_ring
131 .can_read(&mut self.ptrs)
132 .map_err(PollError::Ring)?
133 {
134 self.ready = true;
141 } else {
142 std::task::ready!(core.poll_ready(cx, false))?;
143 self.polls.increment();
144 }
145 }
146 Poll::Ready(Ok(()))
147 }
148
149 pub fn clear_ready(&mut self) {
152 self.ready = false;
153 }
154
155 pub fn clear_poll<M: RingMem>(&mut self, core: &Core<M>) {
161 if !self.masked {
162 core.in_ring.set_interrupt_mask(true);
163 self.masked = true;
164 }
165 }
166}
167
/// Polling state for the outgoing (write) side of a ring.
#[derive(Debug, Inspect)]
pub struct WriteState {
    /// Outgoing ring offsets; handed to the ring's `can_write` check while
    /// polling. Flattened into this struct's inspect output.
    #[inspect(flatten)]
    pub ptrs: OutgoingOffset,
    /// Not updated anywhere in this section; presumably maintained by callers
    /// that signal the remote side (TODO confirm).
    pub signals: Counter,
    /// Incremented each time `poll_ready` resumes after a signal wait
    /// completes.
    pub polls: Counter,
    /// Cached "ring is writable" result; set by `poll_ready`, reset by
    /// `clear_ready`.
    ready: bool,
    /// The pending send size this state last set on the outgoing ring, or 0
    /// when none is set.
    pending_size: usize,
}
177
178impl WriteState {
179 pub fn new(ptrs: OutgoingOffset) -> Self {
180 Self {
181 ptrs,
182 signals: Counter::new(),
183 polls: Counter::new(),
184 ready: false,
185 pending_size: 0,
186 }
187 }
188
189 pub fn poll_ready<M: RingMem>(
191 &mut self,
192 cx: &mut Context<'_>,
193 core: &Core<M>,
194 send_size: usize,
195 ) -> Poll<Result<(), PollError>> {
196 while !self.ready {
197 if self.pending_size < send_size {
200 self.pending_size = send_size.max(core.out_ring().maximum_packet_size() / 4);
205 core.out_ring
206 .set_pending_send_size(self.pending_size)
207 .map_err(PollError::Ring)?;
208 }
209 if core
210 .out_ring
211 .can_write(&mut self.ptrs, send_size)
212 .map_err(PollError::Ring)?
213 {
214 self.ready = true;
215 if self.pending_size > send_size {
219 self.clear_poll(core);
220 }
221 } else {
222 std::task::ready!(core.poll_ready(cx, true))?;
223 self.polls.increment();
224 }
225 }
226 Poll::Ready(Ok(()))
227 }
228
229 pub fn clear_ready(&mut self) {
232 self.ready = false;
233 }
234
235 pub fn clear_poll<M: RingMem>(&mut self, core: &Core<M>) {
241 if self.pending_size != 0 {
242 core.out_ring.set_pending_send_size(0).unwrap();
243 self.pending_size = 0;
244 }
245 }
246}