nvme_test/workers/
coordinator.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Coordinator between queues and hot add/remove of namespaces.
5
6use super::IoQueueEntrySizes;
7use super::admin::AdminConfig;
8use super::admin::AdminHandler;
9use super::admin::AdminState;
10use super::admin::NsidConflict;
11use crate::queue::DoorbellMemory;
12use crate::queue::InvalidDoorbell;
13use disk_backend::Disk;
14use futures::FutureExt;
15use futures::StreamExt;
16use futures_concurrency::future::Race;
17use guestmem::GuestMemory;
18use guid::Guid;
19use inspect::Inspect;
20use inspect::InspectMut;
21use mesh::rpc::PendingRpc;
22use mesh::rpc::Rpc;
23use mesh::rpc::RpcSend;
24use nvme_resources::fault::FaultConfiguration;
25use pal_async::task::Spawn;
26use pal_async::task::Task;
27use parking_lot::RwLock;
28use std::future::pending;
29use std::sync::Arc;
30use task_control::TaskControl;
31use vmcore::interrupt::Interrupt;
32use vmcore::vm_task::VmTaskDriver;
33use vmcore::vm_task::VmTaskDriverSource;
34
35#[derive(InspectMut)]
36pub struct NvmeWorkers {
37    #[inspect(skip)]
38    _task: Task<()>,
39    #[inspect(flatten, send = "CoordinatorRequest::Inspect")]
40    send: mesh::Sender<CoordinatorRequest>,
41    #[inspect(skip)]
42    doorbells: Arc<RwLock<DoorbellMemory>>,
43    #[inspect(skip)]
44    state: EnableState,
45}
46
47#[derive(Debug)]
48enum EnableState {
49    Disabled,
50    Enabling(PendingRpc<()>),
51    Enabled,
52    Resetting(PendingRpc<()>),
53}
54
55impl NvmeWorkers {
56    pub fn new(
57        driver_source: &VmTaskDriverSource,
58        mem: GuestMemory,
59        interrupts: Vec<Interrupt>,
60        max_sqs: u16,
61        max_cqs: u16,
62        qe_sizes: Arc<parking_lot::Mutex<IoQueueEntrySizes>>,
63        subsystem_id: Guid,
64        fault_configuration: FaultConfiguration,
65    ) -> Self {
66        let num_qids = 2 + max_sqs.max(max_cqs) * 2;
67        let doorbells = Arc::new(RwLock::new(DoorbellMemory::new(num_qids)));
68
69        let driver = driver_source.simple();
70        let handler: AdminHandler = AdminHandler::new(
71            driver.clone(),
72            AdminConfig {
73                driver_source: driver_source.clone(),
74                mem,
75                interrupts,
76                doorbells: doorbells.clone(),
77                subsystem_id,
78                max_sqs,
79                max_cqs,
80                qe_sizes,
81                fault_configuration,
82            },
83        );
84        let coordinator = Coordinator {
85            driver: driver.clone(),
86            admin: TaskControl::new(handler),
87            reset: None,
88        };
89        let (send, recv) = mesh::mpsc_channel();
90        let task = driver.spawn("nvme-coord", coordinator.run(recv));
91        Self {
92            _task: task,
93            send,
94            doorbells,
95            state: EnableState::Disabled,
96        }
97    }
98
99    pub fn client(&self) -> NvmeFaultControllerClient {
100        NvmeFaultControllerClient {
101            send: self.send.clone(),
102        }
103    }
104
105    pub fn doorbell(&self, db_id: u16, value: u32) {
106        if let Err(InvalidDoorbell) = self.doorbells.read().try_write(db_id, value) {
107            tracelimit::error_ratelimited!(db_id, "write to invalid doorbell index");
108        }
109    }
110
111    pub fn enable(&mut self, asq: u64, asqs: u16, acq: u64, acqs: u16) {
112        if let EnableState::Disabled = self.state {
113            self.state = EnableState::Enabling(self.send.call(
114                CoordinatorRequest::EnableAdmin,
115                EnableAdminParams {
116                    asq,
117                    asqs,
118                    acq,
119                    acqs,
120                },
121            ));
122        } else {
123            panic!("not disabled: {:?}", self.state);
124        }
125    }
126
127    pub fn poll_enabled(&mut self) -> bool {
128        if let EnableState::Enabling(recv) = &mut self.state {
129            if recv.now_or_never().is_some() {
130                self.state = EnableState::Enabled;
131                true
132            } else {
133                false
134            }
135        } else {
136            panic!("not enabling: {:?}", self.state)
137        }
138    }
139
140    pub fn controller_reset(&mut self) {
141        if let EnableState::Enabled = self.state {
142            self.state =
143                EnableState::Resetting(self.send.call(CoordinatorRequest::ControllerReset, ()));
144        } else {
145            panic!("not enabled: {:?}", self.state);
146        }
147    }
148
149    pub fn poll_controller_reset(&mut self) -> bool {
150        if let EnableState::Resetting(recv) = &mut self.state {
151            if recv.now_or_never().is_some() {
152                self.state = EnableState::Disabled;
153                true
154            } else {
155                false
156            }
157        } else {
158            panic!("not resetting: {:?}", self.state)
159        }
160    }
161
162    // Reset the workers from whatever state they are in.
163    pub async fn reset(&mut self) {
164        loop {
165            match &mut self.state {
166                EnableState::Disabled => break,
167                EnableState::Enabling(recv) => {
168                    recv.await.unwrap();
169                    self.state = EnableState::Enabled;
170                }
171                EnableState::Enabled => {
172                    self.controller_reset();
173                }
174                EnableState::Resetting(recv) => {
175                    recv.await.unwrap();
176                    self.state = EnableState::Disabled;
177                }
178            }
179        }
180    }
181}
182
183/// Client for modifying the NVMe controller state at runtime.
184#[derive(Debug)]
185pub struct NvmeFaultControllerClient {
186    send: mesh::Sender<CoordinatorRequest>,
187}
188
189impl NvmeFaultControllerClient {
190    /// Adds a namespace.
191    pub async fn add_namespace(&self, nsid: u32, disk: Disk) -> Result<(), NsidConflict> {
192        self.send
193            .call(CoordinatorRequest::AddNamespace, (nsid, disk))
194            .await
195            .unwrap()
196    }
197
198    /// Removes a namespace.
199    pub async fn remove_namespace(&self, nsid: u32) -> bool {
200        self.send
201            .call(CoordinatorRequest::RemoveNamespace, nsid)
202            .await
203            .unwrap()
204    }
205}
206
207#[derive(Inspect)]
208struct Coordinator {
209    driver: VmTaskDriver,
210    #[inspect(flatten)]
211    admin: TaskControl<AdminHandler, AdminState>,
212    #[inspect(with = "Option::is_some")]
213    reset: Option<Rpc<(), ()>>,
214}
215
216enum CoordinatorRequest {
217    EnableAdmin(Rpc<EnableAdminParams, ()>),
218    AddNamespace(Rpc<(u32, Disk), Result<(), NsidConflict>>),
219    RemoveNamespace(Rpc<u32, bool>),
220    Inspect(inspect::Deferred),
221    ControllerReset(Rpc<(), ()>),
222}
223
/// Parameters of a `CoordinatorRequest::EnableAdmin` request.
///
/// The values are passed through unchanged to `AdminState::new`.
// NOTE(review): presumably `asq`/`acq` are the admin submission/completion
// queue base addresses and `asqs`/`acqs` their sizes — confirm against
// `AdminState::new`.
struct EnableAdminParams {
    asq: u64,
    asqs: u16,
    acq: u64,
    acqs: u16,
}
230
231impl Coordinator {
232    async fn run(mut self, mut recv: mesh::Receiver<CoordinatorRequest>) {
233        loop {
234            enum Event {
235                Request(Option<CoordinatorRequest>),
236                ResetComplete,
237            }
238
239            let controller_reset = async {
240                if self.reset.is_some() {
241                    self.admin.stop().await;
242                    if let Some(state) = self.admin.state_mut() {
243                        state.drain().await;
244                        self.admin.remove();
245                    }
246                } else {
247                    pending().await
248                }
249            };
250
251            let event = (
252                recv.next().map(Event::Request),
253                controller_reset.map(|_| Event::ResetComplete),
254            )
255                .race()
256                .await;
257
258            match event {
259                Event::Request(Some(req)) => match req {
260                    CoordinatorRequest::EnableAdmin(rpc) => rpc.handle_sync(
261                        |EnableAdminParams {
262                             asq,
263                             asqs,
264                             acq,
265                             acqs,
266                         }| {
267                            if !self.admin.has_state() {
268                                let state =
269                                    AdminState::new(self.admin.task(), asq, asqs, acq, acqs);
270                                self.admin.insert(&self.driver, "nvme-admin", state);
271                                self.admin.start();
272                            } else {
273                                tracelimit::warn_ratelimited!("duplicate attempt to enable admin");
274                            }
275                        },
276                    ),
277                    CoordinatorRequest::AddNamespace(rpc) => {
278                        rpc.handle(async |(nsid, disk)| {
279                            let running = self.admin.stop().await;
280                            let (admin, state) = self.admin.get_mut();
281                            let r = admin.add_namespace(state, nsid, disk).await;
282                            if running {
283                                self.admin.start();
284                            }
285                            r
286                        })
287                        .await
288                    }
289                    CoordinatorRequest::RemoveNamespace(rpc) => {
290                        rpc.handle(async |nsid| {
291                            let running = self.admin.stop().await;
292                            let (admin, state) = self.admin.get_mut();
293                            let r = admin.remove_namespace(state, nsid).await;
294                            if running {
295                                self.admin.start();
296                            }
297                            r
298                        })
299                        .await
300                    }
301                    CoordinatorRequest::ControllerReset(rpc) => {
302                        assert!(self.reset.is_none());
303                        self.reset = Some(rpc);
304                    }
305                    CoordinatorRequest::Inspect(req) => req.inspect(&self),
306                },
307                Event::Request(None) => break,
308                Event::ResetComplete => {
309                    self.reset.take().unwrap().complete(());
310                }
311            }
312        }
313    }
314}