1use super::IoQueueEntrySizes;
7use super::admin::AdminConfig;
8use super::admin::AdminHandler;
9use super::admin::AdminState;
10use super::admin::NsidConflict;
11use crate::queue::DoorbellMemory;
12use crate::queue::InvalidDoorbell;
13use disk_backend::Disk;
14use futures::FutureExt;
15use futures::StreamExt;
16use futures_concurrency::future::Race;
17use guestmem::GuestMemory;
18use guid::Guid;
19use inspect::Inspect;
20use inspect::InspectMut;
21use mesh::rpc::PendingRpc;
22use mesh::rpc::Rpc;
23use mesh::rpc::RpcSend;
24use pal_async::task::Spawn;
25use pal_async::task::Task;
26use parking_lot::Mutex;
27use parking_lot::RwLock;
28use std::future::pending;
29use std::sync::Arc;
30use task_control::TaskControl;
31use vmcore::interrupt::Interrupt;
32use vmcore::vm_task::VmTaskDriver;
33use vmcore::vm_task::VmTaskDriverSource;
34
/// The set of tasks backing an emulated NVMe controller: a coordinator task
/// plus the admin queue handler it manages.
#[derive(InspectMut)]
pub struct NvmeWorkers {
    // Coordinator task handle; held only to keep the task alive for the
    // lifetime of this object.
    #[inspect(skip)]
    _task: Task<()>,
    // Request channel to the coordinator task. Inspect requests are routed
    // through the same channel via `CoordinatorRequest::Inspect`.
    #[inspect(flatten, send = "CoordinatorRequest::Inspect")]
    send: mesh::Sender<CoordinatorRequest>,
    // Doorbell memory shared with the queue handlers; guest doorbell writes
    // land here via `doorbell()`.
    #[inspect(skip)]
    doorbells: Arc<RwLock<DoorbellMemory>>,
    // Local enable/reset state machine tracking in-flight coordinator RPCs.
    #[inspect(skip)]
    state: EnableState,
}
46
/// The admin queue enable/reset state machine.
///
/// Transitions: `Disabled` → `Enabling` → `Enabled` → `Resetting` → `Disabled`.
/// The two transitional states hold the pending RPC to the coordinator task.
#[derive(Debug)]
enum EnableState {
    /// Admin queues are not running.
    Disabled,
    /// An enable request is in flight to the coordinator.
    Enabling(PendingRpc<()>),
    /// Admin queues are running.
    Enabled,
    /// A controller reset request is in flight to the coordinator.
    Resetting(PendingRpc<()>),
}
54
55impl NvmeWorkers {
56 pub fn new(
57 driver_source: &VmTaskDriverSource,
58 mem: GuestMemory,
59 interrupts: Vec<Interrupt>,
60 max_sqs: u16,
61 max_cqs: u16,
62 qe_sizes: Arc<Mutex<IoQueueEntrySizes>>,
63 subsystem_id: Guid,
64 ) -> Self {
65 let num_qids = 2 + max_sqs.max(max_cqs) * 2;
66 let doorbells = Arc::new(RwLock::new(DoorbellMemory::new(num_qids)));
67 let driver = driver_source.simple();
68 let handler: AdminHandler = AdminHandler::new(
69 driver.clone(),
70 AdminConfig {
71 driver_source: driver_source.clone(),
72 mem,
73 interrupts,
74 doorbells: doorbells.clone(),
75 subsystem_id,
76 max_sqs,
77 max_cqs,
78 qe_sizes,
79 },
80 );
81 let coordinator = Coordinator {
82 driver: driver.clone(),
83 admin: TaskControl::new(handler),
84 reset: None,
85 };
86 let (send, recv) = mesh::mpsc_channel();
87 let task = driver.spawn("nvme-coord", coordinator.run(recv));
88 Self {
89 _task: task,
90 send,
91 doorbells,
92 state: EnableState::Disabled,
93 }
94 }
95
96 pub fn client(&self) -> NvmeControllerClient {
97 NvmeControllerClient {
98 send: self.send.clone(),
99 }
100 }
101
102 pub fn doorbell(&self, db_id: u16, value: u32) {
103 if let Err(InvalidDoorbell) = self.doorbells.read().try_write(db_id, value) {
104 tracelimit::error_ratelimited!(db_id, "write to invalid doorbell index");
105 }
106 }
107
108 pub fn enable(&mut self, asq: u64, asqs: u16, acq: u64, acqs: u16) {
109 if let EnableState::Disabled = self.state {
110 self.state = EnableState::Enabling(self.send.call(
111 CoordinatorRequest::EnableAdmin,
112 EnableAdminParams {
113 asq,
114 asqs,
115 acq,
116 acqs,
117 },
118 ));
119 } else {
120 panic!("not disabled: {:?}", self.state);
121 }
122 }
123
124 pub fn poll_enabled(&mut self) -> bool {
125 if let EnableState::Enabling(recv) = &mut self.state {
126 if recv.now_or_never().is_some() {
127 self.state = EnableState::Enabled;
128 true
129 } else {
130 false
131 }
132 } else {
133 panic!("not enabling: {:?}", self.state)
134 }
135 }
136
137 pub fn controller_reset(&mut self) {
138 if let EnableState::Enabled = self.state {
139 self.state =
140 EnableState::Resetting(self.send.call(CoordinatorRequest::ControllerReset, ()));
141 } else {
142 panic!("not enabled: {:?}", self.state);
143 }
144 }
145
146 pub fn poll_controller_reset(&mut self) -> bool {
147 if let EnableState::Resetting(recv) = &mut self.state {
148 if recv.now_or_never().is_some() {
149 self.state = EnableState::Disabled;
150 true
151 } else {
152 false
153 }
154 } else {
155 panic!("not resetting: {:?}", self.state)
156 }
157 }
158
159 pub async fn reset(&mut self) {
161 loop {
162 match &mut self.state {
163 EnableState::Disabled => break,
164 EnableState::Enabling(recv) => {
165 recv.await.unwrap();
166 self.state = EnableState::Enabled;
167 }
168 EnableState::Enabled => {
169 self.controller_reset();
170 }
171 EnableState::Resetting(recv) => {
172 recv.await.unwrap();
173 self.state = EnableState::Disabled;
174 }
175 }
176 }
177 }
178}
179
/// A client for manipulating the namespaces of a running NVMe controller.
#[derive(Debug)]
pub struct NvmeControllerClient {
    // Request channel to the coordinator task.
    send: mesh::Sender<CoordinatorRequest>,
}
185
186impl NvmeControllerClient {
187 pub async fn add_namespace(&self, nsid: u32, disk: Disk) -> Result<(), NsidConflict> {
189 self.send
190 .call(CoordinatorRequest::AddNamespace, (nsid, disk))
191 .await
192 .unwrap()
193 }
194
195 pub async fn remove_namespace(&self, nsid: u32) -> bool {
197 self.send
198 .call(CoordinatorRequest::RemoveNamespace, nsid)
199 .await
200 .unwrap()
201 }
202}
203
/// State for the coordinator task: owns the admin queue handler and tracks
/// any in-flight controller reset.
#[derive(Inspect)]
struct Coordinator {
    // Driver used to run the admin task.
    driver: VmTaskDriver,
    // The admin queue handler and its runtime state, started/stopped around
    // namespace changes and resets.
    #[inspect(flatten)]
    admin: TaskControl<AdminHandler, AdminState>,
    // The pending controller reset RPC, if a reset is in progress.
    #[inspect(with = "Option::is_some")]
    reset: Option<Rpc<(), ()>>,
}
212
/// Requests handled by the coordinator task.
enum CoordinatorRequest {
    /// Enable the admin queues with the given parameters.
    EnableAdmin(Rpc<EnableAdminParams, ()>),
    /// Add a namespace: `(nsid, disk)`, failing on NSID conflict.
    AddNamespace(Rpc<(u32, Disk), Result<(), NsidConflict>>),
    /// Remove a namespace by NSID, returning whether it existed.
    RemoveNamespace(Rpc<u32, bool>),
    /// Deferred inspect request, resolved against the coordinator state.
    Inspect(inspect::Deferred),
    /// Reset the controller, draining and removing the admin state.
    ControllerReset(Rpc<(), ()>),
}
220
/// Parameters for enabling the admin queues, passed through to
/// `AdminState::new`.
struct EnableAdminParams {
    // Admin submission queue base address (guest physical — presumably the
    // guest's ASQ register value; confirm against the register handler).
    asq: u64,
    // Admin submission queue size — presumably in entries, per the AQA
    // register; confirm in AdminState::new.
    asqs: u16,
    // Admin completion queue base address (see note on `asq`).
    acq: u64,
    // Admin completion queue size (see note on `asqs`).
    acqs: u16,
}
227
impl Coordinator {
    /// Runs the coordinator loop, servicing requests from `recv` until the
    /// channel is closed.
    ///
    /// Each iteration races the next incoming request against the
    /// reset-drain future, so a pending controller reset makes progress
    /// while new requests can still arrive.
    async fn run(mut self, mut recv: mesh::Receiver<CoordinatorRequest>) {
        loop {
            enum Event {
                Request(Option<CoordinatorRequest>),
                ResetComplete,
            }

            // When a reset is pending, stop the admin task, drain its state,
            // and remove it. Otherwise stay pending forever so the race below
            // only ever yields incoming requests.
            //
            // NOTE: if a request wins the race, this future is dropped
            // mid-reset and rebuilt on the next iteration; `self.reset`
            // remains set, so the drain is retried then.
            let controller_reset = async {
                if self.reset.is_some() {
                    self.admin.stop().await;
                    if let Some(state) = self.admin.state_mut() {
                        state.drain().await;
                        self.admin.remove();
                    }
                } else {
                    pending().await
                }
            };

            let event = (
                recv.next().map(Event::Request),
                controller_reset.map(|_| Event::ResetComplete),
            )
                .race()
                .await;

            match event {
                Event::Request(Some(req)) => match req {
                    // Insert and start the admin state; a duplicate enable is
                    // logged and ignored rather than replacing live state.
                    CoordinatorRequest::EnableAdmin(rpc) => rpc.handle_sync(
                        |EnableAdminParams {
                             asq,
                             asqs,
                             acq,
                             acqs,
                         }| {
                            if !self.admin.has_state() {
                                let state =
                                    AdminState::new(self.admin.task(), asq, asqs, acq, acqs);
                                self.admin.insert(&self.driver, "nvme-admin", state);
                                self.admin.start();
                            } else {
                                tracelimit::warn_ratelimited!("duplicate attempt to enable admin");
                            }
                        },
                    ),
                    // Namespace changes stop the admin task around the
                    // mutation, restarting it only if it was running before.
                    CoordinatorRequest::AddNamespace(rpc) => {
                        rpc.handle(async |(nsid, disk)| {
                            let running = self.admin.stop().await;
                            let (admin, state) = self.admin.get_mut();
                            let r = admin.add_namespace(state, nsid, disk).await;
                            if running {
                                self.admin.start();
                            }
                            r
                        })
                        .await
                    }
                    CoordinatorRequest::RemoveNamespace(rpc) => {
                        rpc.handle(async |nsid| {
                            let running = self.admin.stop().await;
                            let (admin, state) = self.admin.get_mut();
                            let r = admin.remove_namespace(state, nsid).await;
                            if running {
                                self.admin.start();
                            }
                            r
                        })
                        .await
                    }
                    // Record the reset; the drain itself happens in the
                    // `controller_reset` future on subsequent iterations.
                    // Only one reset may be outstanding at a time.
                    CoordinatorRequest::ControllerReset(rpc) => {
                        assert!(self.reset.is_none());
                        self.reset = Some(rpc);
                    }
                    CoordinatorRequest::Inspect(req) => req.inspect(&self),
                },
                // Channel closed: all senders dropped, shut down the loop.
                Event::Request(None) => break,
                // Drain finished; complete the recorded reset RPC.
                Event::ResetComplete => {
                    self.reset.take().unwrap().complete(());
                }
            }
        }
    }
}