nvme/workers/
coordinator.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Coordinator between queues and hot add/remove of namespaces.
5
6use super::IoQueueEntrySizes;
7use super::admin::AdminConfig;
8use super::admin::AdminHandler;
9use super::admin::AdminState;
10use super::admin::NsidConflict;
11use crate::queue::DoorbellMemory;
12use crate::queue::InvalidDoorbell;
13use disk_backend::Disk;
14use futures::FutureExt;
15use futures::StreamExt;
16use futures_concurrency::future::Race;
17use guestmem::GuestMemory;
18use guid::Guid;
19use inspect::Inspect;
20use inspect::InspectMut;
21use mesh::rpc::PendingRpc;
22use mesh::rpc::Rpc;
23use mesh::rpc::RpcSend;
24use pal_async::task::Spawn;
25use pal_async::task::Task;
26use parking_lot::Mutex;
27use parking_lot::RwLock;
28use std::future::pending;
29use std::sync::Arc;
30use task_control::TaskControl;
31use vmcore::interrupt::Interrupt;
32use vmcore::vm_task::VmTaskDriver;
33use vmcore::vm_task::VmTaskDriverSource;
34
/// The worker tasks backing an NVMe controller: the coordinator task plus the
/// shared doorbell memory and the admin enable/reset state machine.
#[derive(InspectMut)]
pub struct NvmeWorkers {
    /// Keeps the coordinator task alive; dropped along with `self`.
    #[inspect(skip)]
    _task: Task<()>,
    /// Channel for sending requests to the coordinator task.
    #[inspect(flatten, send = "CoordinatorRequest::Inspect")]
    send: mesh::Sender<CoordinatorRequest>,
    /// Doorbell memory shared with the queue handlers; written on guest
    /// doorbell register writes.
    #[inspect(skip)]
    doorbells: Arc<RwLock<DoorbellMemory>>,
    /// Current admin enable/reset state (see `EnableState`).
    #[inspect(skip)]
    state: EnableState,
}
46
/// State machine for enabling and resetting the admin queues.
#[derive(Debug)]
enum EnableState {
    /// Controller disabled; no admin queue active.
    Disabled,
    /// Enable request sent to the coordinator; awaiting completion.
    Enabling(PendingRpc<()>),
    /// Admin queue is running.
    Enabled,
    /// Controller reset sent to the coordinator; awaiting completion.
    Resetting(PendingRpc<()>),
}
54
55impl NvmeWorkers {
56    pub fn new(
57        driver_source: &VmTaskDriverSource,
58        mem: GuestMemory,
59        interrupts: Vec<Interrupt>,
60        max_sqs: u16,
61        max_cqs: u16,
62        qe_sizes: Arc<Mutex<IoQueueEntrySizes>>,
63        subsystem_id: Guid,
64    ) -> Self {
65        let num_qids = 2 + max_sqs.max(max_cqs) * 2;
66        let doorbells = Arc::new(RwLock::new(DoorbellMemory::new(num_qids)));
67        let driver = driver_source.simple();
68        let handler: AdminHandler = AdminHandler::new(
69            driver.clone(),
70            AdminConfig {
71                driver_source: driver_source.clone(),
72                mem,
73                interrupts,
74                doorbells: doorbells.clone(),
75                subsystem_id,
76                max_sqs,
77                max_cqs,
78                qe_sizes,
79            },
80        );
81        let coordinator = Coordinator {
82            driver: driver.clone(),
83            admin: TaskControl::new(handler),
84            reset: None,
85        };
86        let (send, recv) = mesh::mpsc_channel();
87        let task = driver.spawn("nvme-coord", coordinator.run(recv));
88        Self {
89            _task: task,
90            send,
91            doorbells,
92            state: EnableState::Disabled,
93        }
94    }
95
96    pub fn client(&self) -> NvmeControllerClient {
97        NvmeControllerClient {
98            send: self.send.clone(),
99        }
100    }
101
102    pub fn doorbell(&self, db_id: u16, value: u32) {
103        if let Err(InvalidDoorbell) = self.doorbells.read().try_write(db_id, value) {
104            tracelimit::error_ratelimited!(db_id, "write to invalid doorbell index");
105        }
106    }
107
108    pub fn enable(&mut self, asq: u64, asqs: u16, acq: u64, acqs: u16) {
109        if let EnableState::Disabled = self.state {
110            self.state = EnableState::Enabling(self.send.call(
111                CoordinatorRequest::EnableAdmin,
112                EnableAdminParams {
113                    asq,
114                    asqs,
115                    acq,
116                    acqs,
117                },
118            ));
119        } else {
120            panic!("not disabled: {:?}", self.state);
121        }
122    }
123
124    pub fn poll_enabled(&mut self) -> bool {
125        if let EnableState::Enabling(recv) = &mut self.state {
126            if recv.now_or_never().is_some() {
127                self.state = EnableState::Enabled;
128                true
129            } else {
130                false
131            }
132        } else {
133            panic!("not enabling: {:?}", self.state)
134        }
135    }
136
137    pub fn controller_reset(&mut self) {
138        if let EnableState::Enabled = self.state {
139            self.state =
140                EnableState::Resetting(self.send.call(CoordinatorRequest::ControllerReset, ()));
141        } else {
142            panic!("not enabled: {:?}", self.state);
143        }
144    }
145
146    pub fn poll_controller_reset(&mut self) -> bool {
147        if let EnableState::Resetting(recv) = &mut self.state {
148            if recv.now_or_never().is_some() {
149                self.state = EnableState::Disabled;
150                true
151            } else {
152                false
153            }
154        } else {
155            panic!("not resetting: {:?}", self.state)
156        }
157    }
158
159    // Reset the workers from whatever state they are in.
160    pub async fn reset(&mut self) {
161        loop {
162            match &mut self.state {
163                EnableState::Disabled => break,
164                EnableState::Enabling(recv) => {
165                    recv.await.unwrap();
166                    self.state = EnableState::Enabled;
167                }
168                EnableState::Enabled => {
169                    self.controller_reset();
170                }
171                EnableState::Resetting(recv) => {
172                    recv.await.unwrap();
173                    self.state = EnableState::Disabled;
174                }
175            }
176        }
177    }
178}
179
/// Client for modifying the NVMe controller state at runtime.
#[derive(Debug)]
pub struct NvmeControllerClient {
    /// Channel to the coordinator task.
    send: mesh::Sender<CoordinatorRequest>,
}
185
186impl NvmeControllerClient {
187    /// Adds a namespace.
188    pub async fn add_namespace(&self, nsid: u32, disk: Disk) -> Result<(), NsidConflict> {
189        self.send
190            .call(CoordinatorRequest::AddNamespace, (nsid, disk))
191            .await
192            .unwrap()
193    }
194
195    /// Removes a namespace.
196    pub async fn remove_namespace(&self, nsid: u32) -> bool {
197        self.send
198            .call(CoordinatorRequest::RemoveNamespace, nsid)
199            .await
200            .unwrap()
201    }
202}
203
/// State owned by the coordinator task (see `Coordinator::run`).
#[derive(Inspect)]
struct Coordinator {
    /// Driver on which the admin task is inserted.
    driver: VmTaskDriver,
    /// The admin queue handler task and its (optional) runtime state.
    #[inspect(flatten)]
    admin: TaskControl<AdminHandler, AdminState>,
    /// In-flight controller reset request, completed once teardown finishes.
    #[inspect(with = "Option::is_some")]
    reset: Option<Rpc<(), ()>>,
}
212
/// Requests serviced by the coordinator task.
enum CoordinatorRequest {
    /// Create and start the admin queue state with the given queue parameters.
    EnableAdmin(Rpc<EnableAdminParams, ()>),
    /// Hot add a namespace: `(nsid, disk)`. Fails on an NSID conflict.
    AddNamespace(Rpc<(u32, Disk), Result<(), NsidConflict>>),
    /// Hot remove the namespace with the given NSID.
    RemoveNamespace(Rpc<u32, bool>),
    /// Deferred inspect request against the coordinator's state.
    Inspect(inspect::Deferred),
    /// Tear down the admin state; the RPC completes when teardown is done.
    ControllerReset(Rpc<(), ()>),
}
220
/// Queue parameters captured when the guest enables the controller, passed
/// through to `AdminState::new`.
///
/// NOTE(review): names follow NVMe register conventions — `asq`/`acq` appear
/// to be the admin submission/completion queue base addresses and
/// `asqs`/`acqs` their sizes; confirm against `AdminState::new`.
struct EnableAdminParams {
    asq: u64,
    asqs: u16,
    acq: u64,
    acqs: u16,
}
227
impl Coordinator {
    /// Main loop of the coordinator task: services requests from `recv` and,
    /// when a controller reset is pending, tears down the admin task. Exits
    /// when the request channel is closed.
    async fn run(mut self, mut recv: mesh::Receiver<CoordinatorRequest>) {
        loop {
            // Outcome of racing the request channel against reset teardown.
            enum Event {
                Request(Option<CoordinatorRequest>),
                ResetComplete,
            }

            // Reset teardown future. It only does work when a reset has been
            // requested; otherwise it parks forever so the race below is won
            // by the request channel. It is recreated every iteration, so
            // teardown interrupted by an incoming request is retried on the
            // next pass (NOTE(review): this assumes stop/drain/remove are
            // safe to re-run — confirm TaskControl/AdminState semantics).
            let controller_reset = async {
                if self.reset.is_some() {
                    // Stop the admin task, drain its outstanding work, then
                    // discard the admin state entirely.
                    self.admin.stop().await;
                    if let Some(state) = self.admin.state_mut() {
                        state.drain().await;
                        self.admin.remove();
                    }
                } else {
                    pending().await
                }
            };

            // Wait for whichever finishes first: a new request or the
            // completion of reset teardown.
            let event = (
                recv.next().map(Event::Request),
                controller_reset.map(|_| Event::ResetComplete),
            )
                .race()
                .await;

            match event {
                Event::Request(Some(req)) => match req {
                    // Guest enabled the controller: create and start the
                    // admin state, unless it already exists.
                    CoordinatorRequest::EnableAdmin(rpc) => rpc.handle_sync(
                        |EnableAdminParams {
                             asq,
                             asqs,
                             acq,
                             acqs,
                         }| {
                            if !self.admin.has_state() {
                                let state =
                                    AdminState::new(self.admin.task(), asq, asqs, acq, acqs);
                                self.admin.insert(&self.driver, "nvme-admin", state);
                                self.admin.start();
                            } else {
                                tracelimit::warn_ratelimited!("duplicate attempt to enable admin");
                            }
                        },
                    ),
                    // Hot add: pause the admin task, mutate, then resume only
                    // if it was running before.
                    CoordinatorRequest::AddNamespace(rpc) => {
                        rpc.handle(async |(nsid, disk)| {
                            let running = self.admin.stop().await;
                            let (admin, state) = self.admin.get_mut();
                            let r = admin.add_namespace(state, nsid, disk).await;
                            if running {
                                self.admin.start();
                            }
                            r
                        })
                        .await
                    }
                    // Hot remove: same pause/mutate/resume pattern as add.
                    CoordinatorRequest::RemoveNamespace(rpc) => {
                        rpc.handle(async |nsid| {
                            let running = self.admin.stop().await;
                            let (admin, state) = self.admin.get_mut();
                            let r = admin.remove_namespace(state, nsid).await;
                            if running {
                                self.admin.start();
                            }
                            r
                        })
                        .await
                    }
                    // Record the reset request; the teardown future above
                    // picks it up on the next loop iteration.
                    CoordinatorRequest::ControllerReset(rpc) => {
                        assert!(self.reset.is_none());
                        self.reset = Some(rpc);
                    }
                    CoordinatorRequest::Inspect(req) => req.inspect(&self),
                },
                // Channel closed: the owning NvmeWorkers was dropped.
                Event::Request(None) => break,
                // Teardown finished: complete the pending reset RPC.
                Event::ResetComplete => {
                    self.reset.take().unwrap().complete(());
                }
            }
        }
    }
}