use super::spec;
use crate::NVME_PAGE_SHIFT;
use crate::Namespace;
use crate::NamespaceError;
use crate::NvmeDriverSavedState;
use crate::RequestError;
use crate::driver::save_restore::IoQueueSavedState;
use crate::queue_pair::Issuer;
use crate::queue_pair::QueuePair;
use crate::queue_pair::admin_cmd;
use crate::registers::Bar0;
use crate::registers::DeviceRegisters;
use anyhow::Context as _;
use futures::StreamExt;
use futures::future::join_all;
use inspect::Inspect;
use mesh::payload::Protobuf;
use mesh::rpc::Rpc;
use mesh::rpc::RpcSend;
use pal_async::task::Spawn;
use pal_async::task::Task;
use save_restore::NvmeDriverWorkerSavedState;
use std::sync::Arc;
use std::sync::OnceLock;
use task_control::AsyncRun;
use task_control::InspectTask;
use task_control::TaskControl;
use thiserror::Error;
use tracing::Instrument;
use tracing::info_span;
use user_driver::DeviceBacking;
use user_driver::backoff::Backoff;
use user_driver::interrupt::DeviceInterrupt;
use user_driver::memory::MemoryBlock;
use vmcore::vm_task::VmTaskDriver;
use vmcore::vm_task::VmTaskDriverSource;
use zerocopy::FromBytes;
use zerocopy::FromZeros;
use zerocopy::IntoBytes;

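/// An NVMe driver over a [`DeviceBacking`] device. Owns the admin queue, a
/// worker task that creates per-CPU I/O queues on demand, and the state needed
/// to save and restore the driver across servicing.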
#[derive(Inspect)]
pub struct NvmeDriver<T: DeviceBacking> {
    #[inspect(flatten)]
    task: Option<TaskControl<DriverWorkerTask<T>, WorkerState>>,
    device_id: String,
    identify: Option<Arc<spec::IdentifyController>>,
    #[inspect(skip)]
    driver: VmTaskDriver,
    #[inspect(skip)]
    admin: Option<Arc<Issuer>>,
    #[inspect(skip)]
    io_issuers: Arc<IoIssuers>,
    #[inspect(skip)]
    rescan_event: Arc<event_listener::Event>,
    #[inspect(skip)]
    namespaces: Vec<Arc<Namespace>>,
    nvme_keepalive: bool,
    bounce_buffer: bool,
}

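/// The worker task state behind [`NvmeDriver`]: the mapped device, its
/// registers, the admin and I/O queue pairs, and the channel used to receive
/// requests from the per-CPU issuer table.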
#[derive(Inspect)]
struct DriverWorkerTask<T: DeviceBacking> {
    device: T,
    #[inspect(skip)]
    driver: VmTaskDriver,
    registers: Arc<DeviceRegisters<T>>,
    admin: Option<QueuePair>,
    #[inspect(iter_by_index)]
    io: Vec<IoQueue>,
    io_issuers: Arc<IoIssuers>,
    #[inspect(skip)]
    recv: mesh::Receiver<NvmeWorkerRequest>,
    bounce_buffer: bool,
}

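/// Worker state established when the controller is enabled: the negotiated
/// queue limits and the task handling asynchronous event notifications.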
#[derive(Inspect)]
struct WorkerState {
    max_io_queues: u16,
    qsize: u16,
    #[inspect(skip)]
    async_event_task: Task<()>,
}

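/// Errors that can occur while restoring the driver from saved state.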
#[derive(Debug, Error)]
pub enum RestoreError {
    #[error("invalid data")]
    InvalidData,
}

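/// An I/O queue pair plus the interrupt vector and CPU it was created for.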
#[derive(Inspect)]
struct IoQueue {
    queue: QueuePair,
    iv: u16,
    cpu: u32,
}

impl IoQueue {
    pub async fn save(&self) -> anyhow::Result<IoQueueSavedState> {
        Ok(IoQueueSavedState {
            cpu: self.cpu,
            iv: self.iv as u32,
            queue_data: self.queue.save().await?,
        })
    }

    pub fn restore(
        spawner: VmTaskDriver,
        interrupt: DeviceInterrupt,
        registers: Arc<DeviceRegisters<impl DeviceBacking>>,
        mem_block: MemoryBlock,
        saved_state: &IoQueueSavedState,
        bounce_buffer: bool,
    ) -> anyhow::Result<Self> {
        let IoQueueSavedState {
            cpu,
            iv,
            queue_data,
        } = saved_state;
        let queue = QueuePair::restore(
            spawner,
            interrupt,
            registers.clone(),
            mem_block,
            queue_data,
            bounce_buffer,
        )?;

        Ok(Self {
            queue,
            iv: *iv as u16,
            cpu: *cpu,
        })
    }
}

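/// Per-CPU table of I/O issuers. Entries are populated lazily by the worker
/// task via requests sent on `send`.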
#[derive(Debug, Inspect)]
#[inspect(transparent)]
pub(crate) struct IoIssuers {
    #[inspect(iter_by_index)]
    per_cpu: Vec<OnceLock<IoIssuer>>,
    #[inspect(skip)]
    send: mesh::Sender<NvmeWorkerRequest>,
}

#[derive(Debug, Clone, Inspect)]
struct IoIssuer {
    #[inspect(flatten)]
    issuer: Arc<Issuer>,
    cpu: u32,
}

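/// Requests handled by the driver worker task.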
#[derive(Debug)]
enum NvmeWorkerRequest {
    CreateIssuer(Rpc<u32, ()>),
    Save(Rpc<(), anyhow::Result<NvmeDriverWorkerSavedState>>),
}

impl<T: DeviceBacking> NvmeDriver<T> {
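    /// Initializes the driver: builds the disabled driver, enables the
    /// controller, and creates the first I/O queue. On failure the device is
    /// shut down before the error is returned.
    ///
    /// A minimal usage sketch (not compiled here; `make_driver_source`,
    /// `make_device`, and the CPU count are placeholders for
    /// environment-specific setup):
    ///
    /// ```ignore
    /// let driver_source = make_driver_source(); // hypothetical VmTaskDriverSource
    /// let device = make_device();               // hypothetical DeviceBacking impl
    /// let nvme = NvmeDriver::new(&driver_source, 4, device, false).await?;
    /// let ns = nvme.namespace(1).await?;        // open namespace 1
    /// nvme.shutdown().await;                    // resets unless keepalive is set
    /// ```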
    pub async fn new(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        device: T,
        bounce_buffer: bool,
    ) -> anyhow::Result<Self> {
        let pci_id = device.id().to_owned();
        let mut this = Self::new_disabled(driver_source, cpu_count, device, bounce_buffer)
            .instrument(tracing::info_span!("nvme_new_disabled", pci_id))
            .await?;
        match this
            .enable(cpu_count as u16)
            .instrument(tracing::info_span!("nvme_enable", pci_id))
            .await
        {
            Ok(()) => Ok(this),
            Err(err) => {
                tracing::error!(
                    error = err.as_ref() as &dyn std::error::Error,
                    "device initialization failed, shutting down"
                );
                this.shutdown().await;
                Err(err)
            }
        }
    }

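    /// Maps the device and builds the driver without enabling the controller,
    /// resetting the controller first if it is already enabled.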
    async fn new_disabled(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        mut device: T,
        bounce_buffer: bool,
    ) -> anyhow::Result<Self> {
        let driver = driver_source.simple();
        let bar0 = Bar0(
            device
                .map_bar(0)
                .context("failed to map device registers")?,
        );

        let cc = bar0.cc();
        if cc.en() || bar0.csts().rdy() {
            if let Err(e) = bar0
                .reset(&driver)
                .instrument(tracing::info_span!(
                    "nvme_already_enabled",
                    pci_id = device.id().to_owned()
                ))
                .await
            {
                anyhow::bail!("device is gone, csts: {:#x}", e);
            }
        }

        let registers = Arc::new(DeviceRegisters::new(bar0));
        let cap = registers.cap;

        if cap.mpsmin() != 0 {
            anyhow::bail!(
                "unsupported minimum page size: {}",
                cap.mpsmin() + NVME_PAGE_SHIFT
            );
        }

        let (send, recv) = mesh::channel();
        let io_issuers = Arc::new(IoIssuers {
            per_cpu: (0..cpu_count).map(|_| OnceLock::new()).collect(),
            send,
        });

        Ok(Self {
            device_id: device.id().to_owned(),
            task: Some(TaskControl::new(DriverWorkerTask {
                device,
                driver: driver.clone(),
                registers,
                admin: None,
                io: Vec::new(),
                io_issuers: io_issuers.clone(),
                recv,
                bounce_buffer,
            })),
            admin: None,
            identify: None,
            driver,
            io_issuers,
            rescan_event: Default::default(),
            namespaces: vec![],
            nvme_keepalive: false,
            bounce_buffer,
        })
    }

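    /// Enables the controller: creates the admin queue pair, waits for
    /// CSTS.RDY, identifies the controller, negotiates the I/O queue count,
    /// and starts the worker task with the first I/O queue bound to CPU 0.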
    async fn enable(&mut self, requested_io_queue_count: u16) -> anyhow::Result<()> {
        const ADMIN_QID: u16 = 0;

        let task = &mut self.task.as_mut().unwrap();
        let worker = task.task_mut();

        let admin_len = std::cmp::min(QueuePair::MAX_SQ_ENTRIES, QueuePair::MAX_CQ_ENTRIES);
        let admin_sqes = admin_len;
        let admin_cqes = admin_len;

        let interrupt0 = worker
            .device
            .map_interrupt(0, 0)
            .context("failed to map interrupt 0")?;

        let admin = QueuePair::new(
            self.driver.clone(),
            &worker.device,
            ADMIN_QID,
            admin_sqes,
            admin_cqes,
            interrupt0,
            worker.registers.clone(),
            self.bounce_buffer,
        )
        .context("failed to create admin queue pair")?;

        let admin = worker.admin.insert(admin);

        worker.registers.bar0.set_aqa(
            spec::Aqa::new()
                .with_acqs_z(admin_cqes - 1)
                .with_asqs_z(admin_sqes - 1),
        );
        worker.registers.bar0.set_asq(admin.sq_addr());
        worker.registers.bar0.set_acq(admin.cq_addr());

        let span = tracing::info_span!("nvme_ctrl_enable", pci_id = worker.device.id().to_owned());
        let ctrl_enable_span = span.enter();
        worker.registers.bar0.set_cc(
            spec::Cc::new()
                .with_iocqes(4)
                .with_iosqes(6)
                .with_en(true)
                .with_mps(0),
        );

        let mut backoff = Backoff::new(&self.driver);
        loop {
            let csts = worker.registers.bar0.csts();
            let csts_val: u32 = csts.into();
            if csts_val == !0 {
                anyhow::bail!("device is gone, csts: {:#x}", csts_val);
            }
            if csts.cfs() {
                let after_reset = if let Err(e) = worker.registers.bar0.reset(&self.driver).await {
                    e
                } else {
                    0
                };
                anyhow::bail!(
                    "device had fatal error, csts: {:#x}, after reset: {:#}",
                    csts_val,
                    after_reset
                );
            }
            if csts.rdy() {
                break;
            }
            backoff.back_off().await;
        }
        drop(ctrl_enable_span);

        let identify = self
            .identify
            .insert(Arc::new(spec::IdentifyController::new_zeroed()));

        admin
            .issuer()
            .issue_out(
                spec::Command {
                    cdw10: spec::Cdw10Identify::new()
                        .with_cns(spec::Cns::CONTROLLER.0)
                        .into(),
                    ..admin_cmd(spec::AdminOpcode::IDENTIFY)
                },
                Arc::get_mut(identify).unwrap().as_mut_bytes(),
            )
            .await
            .context("failed to identify controller")?;

        let max_interrupt_count = worker.device.max_interrupt_count();
        if max_interrupt_count == 0 {
            anyhow::bail!("bad device behavior: max_interrupt_count == 0");
        }

        let requested_io_queue_count = if max_interrupt_count < requested_io_queue_count as u32 {
            tracing::warn!(
                max_interrupt_count,
                requested_io_queue_count,
                "queue count constrained by msi count"
            );
            max_interrupt_count as u16
        } else {
            requested_io_queue_count
        };

        let completion = admin
            .issuer()
            .issue_neither(spec::Command {
                cdw10: spec::Cdw10SetFeatures::new()
                    .with_fid(spec::Feature::NUMBER_OF_QUEUES.0)
                    .into(),
                cdw11: spec::Cdw11FeatureNumberOfQueues::new()
                    .with_ncq_z(requested_io_queue_count - 1)
                    .with_nsq_z(requested_io_queue_count - 1)
                    .into(),
                ..admin_cmd(spec::AdminOpcode::SET_FEATURES)
            })
            .await
            .context("failed to set number of queues")?;

        let dw0 = spec::Cdw11FeatureNumberOfQueues::from(completion.dw0);
        let sq_count = dw0.nsq_z() + 1;
        let cq_count = dw0.ncq_z() + 1;
        let allocated_io_queue_count = sq_count.min(cq_count);
        if allocated_io_queue_count < requested_io_queue_count {
            tracing::warn!(
                sq_count,
                cq_count,
                requested_io_queue_count,
                "queue count constrained by hardware queue count"
            );
        }

        let max_io_queues = allocated_io_queue_count.min(requested_io_queue_count);

        let qsize = {
            if worker.registers.cap.mqes_z() < 1 {
                anyhow::bail!("bad device behavior: mqes cannot be 0");
            }

            let io_cqsize = (QueuePair::MAX_CQ_ENTRIES - 1).min(worker.registers.cap.mqes_z()) + 1;
            let io_sqsize = (QueuePair::MAX_SQ_ENTRIES - 1).min(worker.registers.cap.mqes_z()) + 1;

            io_cqsize.min(io_sqsize)
        };

        let async_event_task = self.driver.spawn("nvme_async_event", {
            let admin = admin.issuer().clone();
            let rescan_event = self.rescan_event.clone();
            async move {
                if let Err(err) = handle_asynchronous_events(&admin, &rescan_event).await {
                    tracing::error!(
                        error = err.as_ref() as &dyn std::error::Error,
                        "asynchronous event failure, not processing any more"
                    );
                }
            }
        });

        let mut state = WorkerState {
            qsize,
            async_event_task,
            max_io_queues,
        };

        self.admin = Some(admin.issuer().clone());

        let issuer = worker
            .create_io_queue(&mut state, 0)
            .await
            .context("failed to create io queue 1")?;

        self.io_issuers.per_cpu[0].set(issuer).unwrap();
        task.insert(&self.driver, "nvme_worker", state);
        task.start();
        Ok(())
    }

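    /// Shuts the device down, resetting the controller, unless keepalive
    /// servicing was requested via [`Self::update_servicing_flags`].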
    pub async fn shutdown(mut self) {
        if self.nvme_keepalive {
            return;
        }
        self.reset().await;
        drop(self);
    }

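    /// Stops the worker task, tears down all queues, and resets the
    /// controller. Returns a future so the work can also be spawned from
    /// `drop`.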
    fn reset(&mut self) -> impl Send + std::future::Future<Output = ()> + use<T> {
        let driver = self.driver.clone();
        let mut task = std::mem::take(&mut self.task).unwrap();
        async move {
            task.stop().await;
            let (worker, state) = task.into_inner();
            if let Some(state) = state {
                state.async_event_task.cancel().await;
            }
            let _io_responses = join_all(worker.io.into_iter().map(|io| io.queue.shutdown())).await;
            let _admin_responses;
            if let Some(admin) = worker.admin {
                _admin_responses = admin.shutdown().await;
            }
            if let Err(e) = worker.registers.bar0.reset(&driver).await {
                tracing::info!(csts = e, "device reset failed");
            }
        }
    }

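    /// Gets the namespace with namespace ID `nsid`.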
    pub async fn namespace(&self, nsid: u32) -> Result<Namespace, NamespaceError> {
        Namespace::new(
            &self.driver,
            self.admin.as_ref().unwrap().clone(),
            self.rescan_event.clone(),
            self.identify.clone().unwrap(),
            &self.io_issuers,
            nsid,
        )
        .await
    }

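    /// Returns the number of CPUs that are using a fallback I/O issuer because
    /// a dedicated I/O queue could not be created for them.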
    pub fn fallback_cpu_count(&self) -> usize {
        self.io_issuers
            .per_cpu
            .iter()
            .enumerate()
            .filter(|&(cpu, c)| c.get().is_some_and(|c| c.cpu != cpu as u32))
            .count()
    }

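    /// Saves the driver state for servicing. Sets `nvme_keepalive` so that the
    /// subsequent shutdown or drop leaves the controller running.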
    pub async fn save(&mut self) -> anyhow::Result<NvmeDriverSavedState> {
        if self.identify.is_none() {
            return Err(save_restore::Error::InvalidState.into());
        }
        self.nvme_keepalive = true;
        match self
            .io_issuers
            .send
            .call(NvmeWorkerRequest::Save, ())
            .await?
        {
            Ok(s) => Ok(NvmeDriverSavedState {
                identify_ctrl: spec::IdentifyController::read_from_bytes(
                    self.identify.as_ref().unwrap().as_bytes(),
                )
                .unwrap(),
                device_id: self.device_id.clone(),
                namespaces: vec![],
                worker_data: s,
            }),
            Err(e) => Err(e),
        }
    }

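    /// Restores the driver from a previously saved state, reattaching the
    /// restored DMA buffers and rebuilding the admin and I/O queues without
    /// resetting the controller.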
    pub async fn restore(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        mut device: T,
        saved_state: &NvmeDriverSavedState,
        bounce_buffer: bool,
    ) -> anyhow::Result<Self> {
        let driver = driver_source.simple();
        let bar0_mapping = device
            .map_bar(0)
            .context("failed to map device registers")?;
        let bar0 = Bar0(bar0_mapping);

        let csts = bar0.csts();
        if !csts.rdy() {
            anyhow::bail!(
                "device is not ready during restore, csts: {:#x}",
                u32::from(csts)
            );
        }

        let registers = Arc::new(DeviceRegisters::new(bar0));

        let (send, recv) = mesh::channel();
        let io_issuers = Arc::new(IoIssuers {
            per_cpu: (0..cpu_count).map(|_| OnceLock::new()).collect(),
            send,
        });

        let mut this = Self {
            device_id: device.id().to_owned(),
            task: Some(TaskControl::new(DriverWorkerTask {
                device,
                driver: driver.clone(),
                registers: registers.clone(),
                admin: None,
                io: Vec::new(),
                io_issuers: io_issuers.clone(),
                recv,
                bounce_buffer,
            })),
            admin: None,
            identify: Some(Arc::new(
                spec::IdentifyController::read_from_bytes(saved_state.identify_ctrl.as_bytes())
                    .map_err(|_| RestoreError::InvalidData)?,
            )),
            driver: driver.clone(),
            io_issuers,
            rescan_event: Default::default(),
            namespaces: vec![],
            nvme_keepalive: true,
            bounce_buffer,
        };

        let task = &mut this.task.as_mut().unwrap();
        let worker = task.task_mut();

        let interrupt0 = worker
            .device
            .map_interrupt(0, 0)
            .context("failed to map interrupt 0")?;

        let dma_client = worker.device.dma_client();
        let restored_memory = dma_client
            .attach_pending_buffers()
            .context("failed to restore allocations")?;

        let admin = saved_state
            .worker_data
            .admin
            .as_ref()
            .map(|a| {
                let mem_block = restored_memory
                    .iter()
                    .find(|mem| mem.len() == a.mem_len && a.base_pfn == mem.pfns()[0])
                    .expect("unable to find restored mem block")
                    .to_owned();
                QueuePair::restore(
                    driver.clone(),
                    interrupt0,
                    registers.clone(),
                    mem_block,
                    a,
                    bounce_buffer,
                )
                .unwrap()
            })
            .unwrap();

        let admin = worker.admin.insert(admin);

        let async_event_task = this.driver.spawn("nvme_async_event", {
            let admin = admin.issuer().clone();
            let rescan_event = this.rescan_event.clone();
            async move {
                if let Err(err) = handle_asynchronous_events(&admin, &rescan_event).await {
                    tracing::error!(
                        error = err.as_ref() as &dyn std::error::Error,
                        "asynchronous event failure, not processing any more"
                    );
                }
            }
        });

        let state = WorkerState {
            qsize: saved_state.worker_data.qsize,
            async_event_task,
            max_io_queues: saved_state.worker_data.max_io_queues,
        };

        this.admin = Some(admin.issuer().clone());

        worker.io = saved_state
            .worker_data
            .io
            .iter()
            .flat_map(|q| -> Result<IoQueue, anyhow::Error> {
                let interrupt = worker
                    .device
                    .map_interrupt(q.iv, q.cpu)
                    .context("failed to map interrupt")?;
                let mem_block = restored_memory
                    .iter()
                    .find(|mem| {
                        mem.len() == q.queue_data.mem_len && q.queue_data.base_pfn == mem.pfns()[0]
                    })
                    .expect("unable to find restored mem block")
                    .to_owned();
                let q = IoQueue::restore(
                    driver.clone(),
                    interrupt,
                    registers.clone(),
                    mem_block,
                    q,
                    bounce_buffer,
                )?;
                let issuer = IoIssuer {
                    issuer: q.queue.issuer().clone(),
                    cpu: q.cpu,
                };
                this.io_issuers.per_cpu[q.cpu as usize].set(issuer).unwrap();
                Ok(q)
            })
            .collect();

        for ns in &saved_state.namespaces {
            this.namespaces.push(Arc::new(Namespace::restore(
                &driver,
                admin.issuer().clone(),
                this.rescan_event.clone(),
                this.identify.clone().unwrap(),
                &this.io_issuers,
                ns,
            )?));
        }

        task.insert(&this.driver, "nvme_worker", state);
        task.start();

        Ok(this)
    }

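    /// Changes the behavior of shutdown and drop: when `nvme_keepalive` is
    /// true, the controller is not reset, so it can be reclaimed after
    /// servicing.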
    pub fn update_servicing_flags(&mut self, nvme_keepalive: bool) {
        self.nvme_keepalive = nvme_keepalive;
    }
}

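/// Issues asynchronous event requests to the controller in a loop, notifying
/// `rescan_event` when a changed-namespace-list notice is reported.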
async fn handle_asynchronous_events(
    admin: &Issuer,
    rescan_event: &event_listener::Event,
) -> anyhow::Result<()> {
    loop {
        let completion = admin
            .issue_neither(admin_cmd(spec::AdminOpcode::ASYNCHRONOUS_EVENT_REQUEST))
            .await
            .context("asynchronous event request failed")?;

        let dw0 = spec::AsynchronousEventRequestDw0::from(completion.dw0);
        match spec::AsynchronousEventType(dw0.event_type()) {
            spec::AsynchronousEventType::NOTICE => {
                tracing::info!("namespace attribute change event");

                let mut list = [0u32; 1024];
                admin
                    .issue_out(
                        spec::Command {
                            cdw10: spec::Cdw10GetLogPage::new()
                                .with_lid(spec::LogPageIdentifier::CHANGED_NAMESPACE_LIST.0)
                                .with_numdl_z(1023)
                                .into(),
                            ..admin_cmd(spec::AdminOpcode::GET_LOG_PAGE)
                        },
                        list.as_mut_bytes(),
                    )
                    .await
                    .context("failed to query changed namespace list")?;

                if list[0] != 0 {
                    rescan_event.notify(usize::MAX);
                }
            }
            event_type => {
                tracing::info!(
                    ?event_type,
                    information = dw0.information(),
                    log_page_identifier = dw0.log_page_identifier(),
                    "unhandled asynchronous event"
                );
            }
        }
    }
}

impl<T: DeviceBacking> Drop for NvmeDriver<T> {
    fn drop(&mut self) {
        if self.task.is_some() {
            if !self.nvme_keepalive {
                let reset = self.reset();
                self.driver.spawn("nvme_drop", reset).detach();
            }
        }
    }
}

impl IoIssuers {
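    /// Returns the I/O issuer for `cpu`, asking the worker task to create the
    /// backing I/O queue on first use.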
    pub async fn get(&self, cpu: u32) -> Result<&Issuer, RequestError> {
        if let Some(v) = self.per_cpu[cpu as usize].get() {
            return Ok(&v.issuer);
        }

        self.send
            .call(NvmeWorkerRequest::CreateIssuer, cpu)
            .await
            .map_err(RequestError::Gone)?;

        Ok(self.per_cpu[cpu as usize]
            .get()
            .expect("issuer was set by rpc")
            .issuer
            .as_ref())
    }
}

impl<T: DeviceBacking> AsyncRun<WorkerState> for DriverWorkerTask<T> {
    async fn run(
        &mut self,
        stop: &mut task_control::StopTask<'_>,
        state: &mut WorkerState,
    ) -> Result<(), task_control::Cancelled> {
        stop.until_stopped(async {
            loop {
                match self.recv.next().await {
                    Some(NvmeWorkerRequest::CreateIssuer(rpc)) => {
                        rpc.handle(async |cpu| self.create_io_issuer(state, cpu).await)
                            .await
                    }
                    Some(NvmeWorkerRequest::Save(rpc)) => {
                        rpc.handle(async |_| self.save(state).await).await
                    }
                    None => break,
                }
            }
        })
        .await
    }
}

impl<T: DeviceBacking> DriverWorkerTask<T> {
    async fn create_io_issuer(&mut self, state: &mut WorkerState, cpu: u32) {
        tracing::debug!(cpu, "issuer request");
        if self.io_issuers.per_cpu[cpu as usize].get().is_some() {
            return;
        }

        let issuer = match self
            .create_io_queue(state, cpu)
            .instrument(info_span!("create_nvme_io_queue", cpu))
            .await
        {
            Ok(issuer) => issuer,
            Err(err) => {
                let (fallback_cpu, fallback) = self.io_issuers.per_cpu[..cpu as usize]
                    .iter()
                    .enumerate()
                    .rev()
                    .find_map(|(i, issuer)| issuer.get().map(|issuer| (i, issuer)))
                    .unwrap();

                tracing::error!(
                    cpu,
                    fallback_cpu,
                    error = err.as_ref() as &dyn std::error::Error,
                    "failed to create io queue, falling back"
                );
                fallback.clone()
            }
        };

        self.io_issuers.per_cpu[cpu as usize]
            .set(issuer)
            .ok()
            .unwrap();
    }

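    /// Creates an I/O completion/submission queue pair for `cpu`, using the
    /// next free queue ID and interrupt vector; on failure the partially
    /// created queues are deleted and shut down.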
    async fn create_io_queue(
        &mut self,
        state: &mut WorkerState,
        cpu: u32,
    ) -> anyhow::Result<IoIssuer> {
        if self.io.len() >= state.max_io_queues as usize {
            anyhow::bail!("no more io queues available");
        }

        let qid = self.io.len() as u16 + 1;

        tracing::debug!(cpu, qid, "creating io queue");

        let iv = self.io.len() as u16;
        let interrupt = self
            .device
            .map_interrupt(iv.into(), cpu)
            .context("failed to map interrupt")?;

        let queue = QueuePair::new(
            self.driver.clone(),
            &self.device,
            qid,
            state.qsize,
            state.qsize,
            interrupt,
            self.registers.clone(),
            self.bounce_buffer,
        )
        .with_context(|| format!("failed to create io queue pair {qid}"))?;

        let io_sq_addr = queue.sq_addr();
        let io_cq_addr = queue.cq_addr();

        self.io.push(IoQueue { queue, iv, cpu });
        let io_queue = self.io.last_mut().unwrap();

        let admin = self.admin.as_ref().unwrap().issuer().as_ref();

        let mut created_completion_queue = false;
        let r = async {
            admin
                .issue_raw(spec::Command {
                    cdw10: spec::Cdw10CreateIoQueue::new()
                        .with_qid(qid)
                        .with_qsize_z(state.qsize - 1)
                        .into(),
                    cdw11: spec::Cdw11CreateIoCompletionQueue::new()
                        .with_ien(true)
                        .with_iv(iv)
                        .with_pc(true)
                        .into(),
                    dptr: [io_cq_addr, 0],
                    ..admin_cmd(spec::AdminOpcode::CREATE_IO_COMPLETION_QUEUE)
                })
                .await
                .with_context(|| format!("failed to create io completion queue {qid}"))?;

            created_completion_queue = true;

            admin
                .issue_raw(spec::Command {
                    cdw10: spec::Cdw10CreateIoQueue::new()
                        .with_qid(qid)
                        .with_qsize_z(state.qsize - 1)
                        .into(),
                    cdw11: spec::Cdw11CreateIoSubmissionQueue::new()
                        .with_cqid(qid)
                        .with_pc(true)
                        .into(),
                    dptr: [io_sq_addr, 0],
                    ..admin_cmd(spec::AdminOpcode::CREATE_IO_SUBMISSION_QUEUE)
                })
                .await
                .with_context(|| format!("failed to create io submission queue {qid}"))?;

            Ok(())
        };

        if let Err(err) = r.await {
            if created_completion_queue {
                if let Err(err) = admin
                    .issue_raw(spec::Command {
                        cdw10: spec::Cdw10DeleteIoQueue::new().with_qid(qid).into(),
                        ..admin_cmd(spec::AdminOpcode::DELETE_IO_COMPLETION_QUEUE)
                    })
                    .await
                {
                    tracing::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to delete completion queue in teardown path"
                    );
                }
            }
            let io = self.io.pop().unwrap();
            io.queue.shutdown().await;
            return Err(err);
        }

        Ok(IoIssuer {
            issuer: io_queue.queue.issuer().clone(),
            cpu,
        })
    }

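    /// Saves the worker's queue state, draining the I/O queues in the process.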
    pub async fn save(
        &mut self,
        worker_state: &mut WorkerState,
    ) -> anyhow::Result<NvmeDriverWorkerSavedState> {
        let admin = match self.admin.as_ref() {
            Some(a) => Some(a.save().await?),
            None => None,
        };

        let io = join_all(self.io.drain(..).map(async |q| q.save().await))
            .await
            .into_iter()
            .flatten()
            .collect();

        Ok(NvmeDriverWorkerSavedState {
            admin,
            io,
            qsize: worker_state.qsize,
            max_io_queues: worker_state.max_io_queues,
        })
    }
}

1019
1020impl<T: DeviceBacking> InspectTask<WorkerState> for DriverWorkerTask<T> {
1021 fn inspect(&self, req: inspect::Request<'_>, state: Option<&WorkerState>) {
1022 req.respond().merge(self).merge(state);
1023 }
1024}
1025
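/// Saved-state definitions for the NVMe driver, its queue pairs, and
/// namespaces, serialized with mesh protobuf encoding.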
pub mod save_restore {
    use super::*;

    #[derive(Debug, Error)]
    pub enum Error {
        #[error("invalid object state")]
        InvalidState,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct NvmeDriverSavedState {
        #[mesh(1, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub identify_ctrl: spec::IdentifyController,
        #[mesh(2)]
        pub device_id: String,
        #[mesh(3)]
        pub namespaces: Vec<SavedNamespaceData>,
        #[mesh(4)]
        pub worker_data: NvmeDriverWorkerSavedState,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct NvmeDriverWorkerSavedState {
        #[mesh(1)]
        pub admin: Option<QueuePairSavedState>,
        #[mesh(2)]
        pub io: Vec<IoQueueSavedState>,
        #[mesh(3)]
        pub qsize: u16,
        #[mesh(4)]
        pub max_io_queues: u16,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct QueuePairSavedState {
        #[mesh(1)]
        pub mem_len: usize,
        #[mesh(2)]
        pub base_pfn: u64,
        #[mesh(3)]
        pub qid: u16,
        #[mesh(4)]
        pub sq_entries: u16,
        #[mesh(5)]
        pub cq_entries: u16,
        #[mesh(6)]
        pub handler_data: QueueHandlerSavedState,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct IoQueueSavedState {
        #[mesh(1)]
        pub cpu: u32,
        #[mesh(2)]
        pub iv: u32,
        #[mesh(3)]
        pub queue_data: QueuePairSavedState,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct QueueHandlerSavedState {
        #[mesh(1)]
        pub sq_state: SubmissionQueueSavedState,
        #[mesh(2)]
        pub cq_state: CompletionQueueSavedState,
        #[mesh(3)]
        pub pending_cmds: PendingCommandsSavedState,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct SubmissionQueueSavedState {
        #[mesh(1)]
        pub sqid: u16,
        #[mesh(2)]
        pub head: u32,
        #[mesh(3)]
        pub tail: u32,
        #[mesh(4)]
        pub committed_tail: u32,
        #[mesh(5)]
        pub len: u32,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct CompletionQueueSavedState {
        #[mesh(1)]
        pub cqid: u16,
        #[mesh(2)]
        pub head: u32,
        #[mesh(3)]
        pub committed_head: u32,
        #[mesh(4)]
        pub len: u32,
        #[mesh(5)]
        pub phase: bool,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct PendingCommandSavedState {
        #[mesh(1, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub command: spec::Command,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct PendingCommandsSavedState {
        #[mesh(1)]
        pub commands: Vec<PendingCommandSavedState>,
        #[mesh(2)]
        pub next_cid_high_bits: u16,
        #[mesh(3)]
        pub cid_key_bits: u32,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct SavedNamespaceData {
        #[mesh(1)]
        pub nsid: u32,
        #[mesh(2, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub identify_ns: nvme_spec::nvm::IdentifyNamespace,
    }
}