use super::IoQueueEntrySizes;
use super::admin::AdminConfig;
use super::admin::AdminHandler;
use super::admin::AdminState;
use super::admin::NsidConflict;
use crate::queue::DoorbellMemory;
use crate::queue::InvalidDoorbell;
use disk_backend::Disk;
use futures::FutureExt;
use futures::StreamExt;
use futures_concurrency::future::Race;
use guestmem::GuestMemory;
use guid::Guid;
use inspect::Inspect;
use inspect::InspectMut;
use mesh::rpc::PendingRpc;
use mesh::rpc::Rpc;
use mesh::rpc::RpcSend;
use nvme_resources::fault::FaultConfiguration;
use pal_async::task::Spawn;
use pal_async::task::Task;
use parking_lot::RwLock;
use std::future::pending;
use std::sync::Arc;
use task_control::TaskControl;
use vmcore::interrupt::Interrupt;
use vmcore::vm_task::VmTaskDriver;
use vmcore::vm_task::VmTaskDriverSource;

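/// The NVMe controller's worker tasks: the coordinator task, a channel for
/// sending it requests, the shared doorbell memory, and the admin queue
/// enable/reset state.
///
/// Callers enable the admin queues via [`Self::enable`] and
/// [`Self::poll_enabled`], and tear them down via [`Self::controller_reset`]
/// and [`Self::poll_controller_reset`] (or [`Self::reset`]).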
#[derive(InspectMut)]
pub struct NvmeWorkers {
    #[inspect(skip)]
    _task: Task<()>,
    #[inspect(flatten, send = "CoordinatorRequest::Inspect")]
    send: mesh::Sender<CoordinatorRequest>,
    #[inspect(skip)]
    doorbells: Arc<RwLock<DoorbellMemory>>,
    #[inspect(skip)]
    state: EnableState,
}

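/// Admin queue enable/reset state, holding the in-flight coordinator RPC for
/// the pending transition (if any).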
#[derive(Debug)]
enum EnableState {
    Disabled,
    Enabling(PendingRpc<()>),
    Enabled,
    Resetting(PendingRpc<()>),
}

impl NvmeWorkers {
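    /// Creates the worker set: allocates doorbell memory for the admin queue
    /// pair plus up to `max_sqs`/`max_cqs` IO queues, and spawns the
    /// coordinator task on a driver from `driver_source`.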
    pub fn new(
        driver_source: &VmTaskDriverSource,
        mem: GuestMemory,
        interrupts: Vec<Interrupt>,
        max_sqs: u16,
        max_cqs: u16,
        qe_sizes: Arc<parking_lot::Mutex<IoQueueEntrySizes>>,
        subsystem_id: Guid,
        fault_configuration: FaultConfiguration,
    ) -> Self {
        let num_qids = 2 + max_sqs.max(max_cqs) * 2;
        let doorbells = Arc::new(RwLock::new(DoorbellMemory::new(num_qids)));

        let driver = driver_source.simple();
        let handler: AdminHandler = AdminHandler::new(
            driver.clone(),
            AdminConfig {
                driver_source: driver_source.clone(),
                mem,
                interrupts,
                doorbells: doorbells.clone(),
                subsystem_id,
                max_sqs,
                max_cqs,
                qe_sizes,
                fault_configuration,
            },
        );
        let coordinator = Coordinator {
            driver: driver.clone(),
            admin: TaskControl::new(handler),
            reset: None,
        };
        let (send, recv) = mesh::channel();
        let task = driver.spawn("nvme-coord", coordinator.run(recv));
        Self {
            _task: task,
            send,
            doorbells,
            state: EnableState::Disabled,
        }
    }

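    /// Returns a client for adding and removing namespaces at runtime.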
    pub fn client(&self) -> NvmeFaultControllerClient {
        NvmeFaultControllerClient {
            send: self.send.clone(),
        }
    }

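    /// Writes `value` to the doorbell at index `db_id`.
    ///
    /// Writes to an invalid doorbell index are logged (rate limited) and
    /// otherwise ignored.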
    pub fn doorbell(&self, db_id: u16, value: u32) {
        if let Err(InvalidDoorbell) = self.doorbells.read().try_write(db_id, value) {
            tracelimit::error_ratelimited!(db_id, "write to invalid doorbell index");
        }
    }

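    /// Asks the coordinator to enable the admin queues, using the provided
    /// submission/completion queue base addresses (`asq`, `acq`) and sizes
    /// (`asqs`, `acqs`). Completion is observed via [`Self::poll_enabled`].
    ///
    /// Panics if the controller is not currently disabled.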
    pub fn enable(&mut self, asq: u64, asqs: u16, acq: u64, acqs: u16) {
        if let EnableState::Disabled = self.state {
            self.state = EnableState::Enabling(self.send.call(
                CoordinatorRequest::EnableAdmin,
                EnableAdminParams {
                    asq,
                    asqs,
                    acq,
                    acqs,
                },
            ));
        } else {
            panic!("not disabled: {:?}", self.state);
        }
    }

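    /// Polls the in-flight enable request, returning `true` once the admin
    /// queues are up.
    ///
    /// Panics if no enable request is in flight.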
    pub fn poll_enabled(&mut self) -> bool {
        if let EnableState::Enabling(recv) = &mut self.state {
            if recv.now_or_never().is_some() {
                self.state = EnableState::Enabled;
                true
            } else {
                false
            }
        } else {
            panic!("not enabling: {:?}", self.state)
        }
    }

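    /// Asks the coordinator to reset the controller, tearing down the admin
    /// queues. Completion is observed via [`Self::poll_controller_reset`].
    ///
    /// Panics if the controller is not currently enabled.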
    pub fn controller_reset(&mut self) {
        if let EnableState::Enabled = self.state {
            self.state =
                EnableState::Resetting(self.send.call(CoordinatorRequest::ControllerReset, ()));
        } else {
            panic!("not enabled: {:?}", self.state);
        }
    }

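    /// Polls the in-flight controller reset, returning `true` once the
    /// controller is back in the disabled state.
    ///
    /// Panics if no reset is in flight.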
    pub fn poll_controller_reset(&mut self) -> bool {
        if let EnableState::Resetting(recv) = &mut self.state {
            if recv.now_or_never().is_some() {
                self.state = EnableState::Disabled;
                true
            } else {
                false
            }
        } else {
            panic!("not resetting: {:?}", self.state)
        }
    }

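    /// Drives the state machine until the controller is disabled, waiting for
    /// any pending enable and issuing a controller reset if needed.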
    pub async fn reset(&mut self) {
        loop {
            match &mut self.state {
                EnableState::Disabled => break,
                EnableState::Enabling(recv) => {
                    recv.await.unwrap();
                    self.state = EnableState::Enabled;
                }
                EnableState::Enabled => {
                    self.controller_reset();
                }
                EnableState::Resetting(recv) => {
                    recv.await.unwrap();
                    self.state = EnableState::Disabled;
                }
            }
        }
    }
}

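/// A client to the controller's coordinator task for dynamic configuration
/// changes such as adding and removing namespaces.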
#[derive(Debug)]
pub struct NvmeFaultControllerClient {
    send: mesh::Sender<CoordinatorRequest>,
}

impl NvmeFaultControllerClient {
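    /// Adds `disk` as namespace `nsid`, failing if a namespace with that ID
    /// already exists.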
    pub async fn add_namespace(&self, nsid: u32, disk: Disk) -> Result<(), NsidConflict> {
        self.send
            .call(CoordinatorRequest::AddNamespace, (nsid, disk))
            .await
            .unwrap()
    }

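    /// Removes namespace `nsid`, returning whether it was removed.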
    pub async fn remove_namespace(&self, nsid: u32) -> bool {
        self.send
            .call(CoordinatorRequest::RemoveNamespace, nsid)
            .await
            .unwrap()
    }
}

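/// State for the coordinator task: the admin queue handler task and any
/// pending controller reset.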
#[derive(Inspect)]
struct Coordinator {
    driver: VmTaskDriver,
    #[inspect(flatten)]
    admin: TaskControl<AdminHandler, AdminState>,
    #[inspect(with = "Option::is_some")]
    reset: Option<Rpc<(), ()>>,
}

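/// Requests handled by the coordinator task.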
enum CoordinatorRequest {
    EnableAdmin(Rpc<EnableAdminParams, ()>),
    AddNamespace(Rpc<(u32, Disk), Result<(), NsidConflict>>),
    RemoveNamespace(Rpc<u32, bool>),
    Inspect(inspect::Deferred),
    ControllerReset(Rpc<(), ()>),
}

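/// Admin submission/completion queue base addresses (`asq`, `acq`) and sizes
/// (`asqs`, `acqs`) used when enabling the admin queues.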
struct EnableAdminParams {
    asq: u64,
    asqs: u16,
    acq: u64,
    acqs: u16,
}

impl Coordinator {
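    /// Runs the coordinator loop, dispatching requests from `recv` and
    /// completing controller resets, until the request channel is closed.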
    async fn run(mut self, mut recv: mesh::Receiver<CoordinatorRequest>) {
        loop {
            enum Event {
                Request(Option<CoordinatorRequest>),
                ResetComplete,
            }

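            // While a reset is pending, stop the admin handler and drain its
            // state; once that finishes, the reset can be completed below.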
            let controller_reset = async {
                if self.reset.is_some() {
                    self.admin.stop().await;
                    if let Some(state) = self.admin.state_mut() {
                        state.drain().await;
                        self.admin.remove();
                    }
                } else {
                    pending().await
                }
            };

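            // Wait for either the next request or completion of a pending
            // reset.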
            let event = (
                recv.next().map(Event::Request),
                controller_reset.map(|_| Event::ResetComplete),
            )
                .race()
                .await;

            match event {
                Event::Request(Some(req)) => match req {
                    CoordinatorRequest::EnableAdmin(rpc) => rpc.handle_sync(
                        |EnableAdminParams {
                             asq,
                             asqs,
                             acq,
                             acqs,
                         }| {
                            if !self.admin.has_state() {
                                let state =
                                    AdminState::new(self.admin.task(), asq, asqs, acq, acqs);
                                self.admin.insert(&self.driver, "nvme-admin", state);
                                self.admin.start();
                            } else {
                                tracelimit::warn_ratelimited!("duplicate attempt to enable admin");
                            }
                        },
                    ),
                    CoordinatorRequest::AddNamespace(rpc) => {
                        rpc.handle(async |(nsid, disk)| {
                            let running = self.admin.stop().await;
                            let (admin, state) = self.admin.get_mut();
                            let r = admin.add_namespace(state, nsid, disk).await;
                            if running {
                                self.admin.start();
                            }
                            r
                        })
                        .await
                    }
                    CoordinatorRequest::RemoveNamespace(rpc) => {
                        rpc.handle(async |nsid| {
                            let running = self.admin.stop().await;
                            let (admin, state) = self.admin.get_mut();
                            let r = admin.remove_namespace(state, nsid).await;
                            if running {
                                self.admin.start();
                            }
                            r
                        })
                        .await
                    }
                    CoordinatorRequest::ControllerReset(rpc) => {
                        assert!(self.reset.is_none());
                        self.reset = Some(rpc);
                    }
                    CoordinatorRequest::Inspect(req) => req.inspect(&self),
                },
                Event::Request(None) => break,
                Event::ResetComplete => {
                    self.reset.take().unwrap().complete(());
                }
            }
        }
    }
}