use super::spec;
use crate::NVME_PAGE_SHIFT;
use crate::Namespace;
use crate::NamespaceError;
use crate::NvmeDriverSavedState;
use crate::RequestError;
use crate::driver::save_restore::IoQueueSavedState;
use crate::queue_pair::Issuer;
use crate::queue_pair::QueuePair;
use crate::queue_pair::admin_cmd;
use crate::registers::Bar0;
use crate::registers::DeviceRegisters;
use anyhow::Context as _;
use futures::StreamExt;
use futures::future::join_all;
use inspect::Inspect;
use mesh::payload::Protobuf;
use mesh::rpc::Rpc;
use mesh::rpc::RpcSend;
use pal_async::task::Spawn;
use pal_async::task::Task;
use save_restore::NvmeDriverWorkerSavedState;
use std::sync::Arc;
use std::sync::OnceLock;
use task_control::AsyncRun;
use task_control::InspectTask;
use task_control::TaskControl;
use thiserror::Error;
use tracing::Instrument;
use tracing::info_span;
use user_driver::DeviceBacking;
use user_driver::backoff::Backoff;
use user_driver::interrupt::DeviceInterrupt;
use user_driver::memory::MemoryBlock;
use vmcore::vm_task::VmTaskDriver;
use vmcore::vm_task::VmTaskDriverSource;
use zerocopy::FromBytes;
use zerocopy::FromZeros;
use zerocopy::IntoBytes;

/// An NVMe driver.
///
/// Manages the admin queue and per-CPU I/O queues of a single NVMe controller
/// accessed through a [`DeviceBacking`] implementation.
#[derive(Inspect)]
pub struct NvmeDriver<T: DeviceBacking> {
    #[inspect(flatten)]
    task: Option<TaskControl<DriverWorkerTask<T>, WorkerState>>,
    device_id: String,
    identify: Option<Arc<spec::IdentifyController>>,
    #[inspect(skip)]
    driver: VmTaskDriver,
    #[inspect(skip)]
    admin: Option<Arc<Issuer>>,
    #[inspect(skip)]
    io_issuers: Arc<IoIssuers>,
    #[inspect(skip)]
    rescan_event: Arc<event_listener::Event>,
    #[inspect(skip)]
    namespaces: Vec<Arc<Namespace>>,
    /// When true, the device is not reset on shutdown or drop so that it can
    /// be handed back after servicing.
    nvme_keepalive: bool,
}

#[derive(Inspect)]
struct DriverWorkerTask<T: DeviceBacking> {
    device: T,
    #[inspect(skip)]
    driver: VmTaskDriver,
    registers: Arc<DeviceRegisters<T>>,
    admin: Option<QueuePair>,
    #[inspect(iter_by_index)]
    io: Vec<IoQueue>,
    io_issuers: Arc<IoIssuers>,
    #[inspect(skip)]
    recv: mesh::Receiver<NvmeWorkerRequest>,
}

#[derive(Inspect)]
struct WorkerState {
    max_io_queues: u16,
    qsize: u16,
    #[inspect(skip)]
    async_event_task: Task<()>,
}

/// An error restoring from saved state.
#[derive(Debug, Error)]
pub enum RestoreError {
    #[error("invalid data")]
    InvalidData,
}

#[derive(Inspect)]
struct IoQueue {
    queue: QueuePair,
    iv: u16,
    cpu: u32,
}

impl IoQueue {
    pub async fn save(&self) -> anyhow::Result<IoQueueSavedState> {
        Ok(IoQueueSavedState {
            cpu: self.cpu,
            iv: self.iv as u32,
            queue_data: self.queue.save().await?,
        })
    }

    pub fn restore(
        spawner: VmTaskDriver,
        interrupt: DeviceInterrupt,
        registers: Arc<DeviceRegisters<impl DeviceBacking>>,
        mem_block: MemoryBlock,
        saved_state: &IoQueueSavedState,
    ) -> anyhow::Result<Self> {
        let IoQueueSavedState {
            cpu,
            iv,
            queue_data,
        } = saved_state;
        let queue =
            QueuePair::restore(spawner, interrupt, registers.clone(), mem_block, queue_data)?;

        Ok(Self {
            queue,
            iv: *iv as u16,
            cpu: *cpu,
        })
    }
}

#[derive(Debug, Inspect)]
#[inspect(transparent)]
pub(crate) struct IoIssuers {
    #[inspect(iter_by_index)]
    per_cpu: Vec<OnceLock<IoIssuer>>,
    #[inspect(skip)]
    send: mesh::Sender<NvmeWorkerRequest>,
}

#[derive(Debug, Clone, Inspect)]
struct IoIssuer {
    #[inspect(flatten)]
    issuer: Arc<Issuer>,
    cpu: u32,
}

#[derive(Debug)]
enum NvmeWorkerRequest {
    CreateIssuer(Rpc<u32, ()>),
    Save(Rpc<(), anyhow::Result<NvmeDriverWorkerSavedState>>),
}

impl<T: DeviceBacking> NvmeDriver<T> {
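    /// Initializes the driver: maps BAR0, resets and enables the controller,
    /// identifies it, and creates the first I/O queue.
    ///
    /// Illustrative usage sketch (not taken from this crate's tests);
    /// `driver_source`, `cpu_count`, and `my_device` are placeholders for a
    /// [`VmTaskDriverSource`], the VP count, and any [`DeviceBacking`]
    /// implementation:
    ///
    /// ```ignore
    /// let driver = NvmeDriver::new(&driver_source, cpu_count, my_device).await?;
    /// let namespace = driver.namespace(1).await?;
    /// ```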
    pub async fn new(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        device: T,
    ) -> anyhow::Result<Self> {
        let pci_id = device.id().to_owned();
        let mut this = Self::new_disabled(driver_source, cpu_count, device)
            .instrument(tracing::info_span!("nvme_new_disabled", pci_id))
            .await?;
        match this
            .enable(cpu_count as u16)
            .instrument(tracing::info_span!("nvme_enable", pci_id))
            .await
        {
            Ok(()) => Ok(this),
            Err(err) => {
                tracing::error!(
                    error = err.as_ref() as &dyn std::error::Error,
                    "device initialization failed, shutting down"
                );
                this.shutdown().await;
                Err(err)
            }
        }
    }

    async fn new_disabled(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        mut device: T,
    ) -> anyhow::Result<Self> {
        let driver = driver_source.simple();
        let bar0 = Bar0(
            device
                .map_bar(0)
                .context("failed to map device registers")?,
        );

        let cc = bar0.cc();
        if cc.en() || bar0.csts().rdy() {
            if !bar0
                .reset(&driver)
                .instrument(tracing::info_span!(
                    "nvme_already_enabled",
                    pci_id = device.id().to_owned()
                ))
                .await
            {
                anyhow::bail!("device is gone");
            }
        }

        let registers = Arc::new(DeviceRegisters::new(bar0));
        let cap = registers.cap;

        if cap.mpsmin() != 0 {
            anyhow::bail!(
                "unsupported minimum page size: {}",
                cap.mpsmin() + NVME_PAGE_SHIFT
            );
        }

        let (send, recv) = mesh::channel();
        let io_issuers = Arc::new(IoIssuers {
            per_cpu: (0..cpu_count).map(|_| OnceLock::new()).collect(),
            send,
        });

        Ok(Self {
            device_id: device.id().to_owned(),
            task: Some(TaskControl::new(DriverWorkerTask {
                device,
                driver: driver.clone(),
                registers,
                admin: None,
                io: Vec::new(),
                io_issuers: io_issuers.clone(),
                recv,
            })),
            admin: None,
            identify: None,
            driver,
            io_issuers,
            rescan_event: Default::default(),
            namespaces: vec![],
            nvme_keepalive: false,
        })
    }

    async fn enable(&mut self, requested_io_queue_count: u16) -> anyhow::Result<()> {
        const ADMIN_QID: u16 = 0;

        let task = &mut self.task.as_mut().unwrap();
        let worker = task.task_mut();

        let admin_len = std::cmp::min(QueuePair::MAX_SQ_ENTRIES, QueuePair::MAX_CQ_ENTRIES);
        let admin_sqes = admin_len;
        let admin_cqes = admin_len;

        let interrupt0 = worker
            .device
            .map_interrupt(0, 0)
            .context("failed to map interrupt 0")?;

        // Create the admin queue pair on interrupt vector 0.
        let admin = QueuePair::new(
            self.driver.clone(),
            &worker.device,
            ADMIN_QID,
            admin_sqes,
            admin_cqes,
            interrupt0,
            worker.registers.clone(),
        )
        .context("failed to create admin queue pair")?;

        let admin = worker.admin.insert(admin);

        // Register the admin queue with the controller.
        worker.registers.bar0.set_aqa(
            spec::Aqa::new()
                .with_acqs_z(admin_cqes - 1)
                .with_asqs_z(admin_sqes - 1),
        );
        worker.registers.bar0.set_asq(admin.sq_addr());
        worker.registers.bar0.set_acq(admin.cq_addr());

        // Enable the controller: 16-byte (2^4) CQ entries, 64-byte (2^6) SQ
        // entries, 4KiB memory page size.
        let span = tracing::info_span!("nvme_ctrl_enable", pci_id = worker.device.id().to_owned());
        let ctrl_enable_span = span.enter();
        worker.registers.bar0.set_cc(
            spec::Cc::new()
                .with_iocqes(4)
                .with_iosqes(6)
                .with_en(true)
                .with_mps(0),
        );

        // Wait for the controller to report ready, bailing out on fatal status.
        let mut backoff = Backoff::new(&self.driver);
        loop {
            let csts = worker.registers.bar0.csts();
            if u32::from(csts) == !0 {
                anyhow::bail!("device is gone");
            }
            if csts.cfs() {
                worker.registers.bar0.reset(&self.driver).await;
                anyhow::bail!("device had fatal error");
            }
            if csts.rdy() {
                break;
            }
            backoff.back_off().await;
        }
        drop(ctrl_enable_span);

        let identify = self
            .identify
            .insert(Arc::new(spec::IdentifyController::new_zeroed()));

        admin
            .issuer()
            .issue_out(
                spec::Command {
                    cdw10: spec::Cdw10Identify::new()
                        .with_cns(spec::Cns::CONTROLLER.0)
                        .into(),
                    ..admin_cmd(spec::AdminOpcode::IDENTIFY)
                },
                Arc::get_mut(identify).unwrap().as_mut_bytes(),
            )
            .await
            .context("failed to identify controller")?;

        let max_interrupt_count = worker.device.max_interrupt_count();
        if max_interrupt_count == 0 {
            anyhow::bail!("bad device behavior: max_interrupt_count == 0");
        }

        let requested_io_queue_count = if max_interrupt_count < requested_io_queue_count as u32 {
            tracing::warn!(
                max_interrupt_count,
                requested_io_queue_count,
                "queue count constrained by msi count"
            );
            max_interrupt_count as u16
        } else {
            requested_io_queue_count
        };

        // Ask the controller how many I/O queues it will actually provide.
        let completion = admin
            .issuer()
            .issue_neither(spec::Command {
                cdw10: spec::Cdw10SetFeatures::new()
                    .with_fid(spec::Feature::NUMBER_OF_QUEUES.0)
                    .into(),
                cdw11: spec::Cdw11FeatureNumberOfQueues::new()
                    .with_ncq_z(requested_io_queue_count - 1)
                    .with_nsq_z(requested_io_queue_count - 1)
                    .into(),
                ..admin_cmd(spec::AdminOpcode::SET_FEATURES)
            })
            .await
            .context("failed to set number of queues")?;

        let dw0 = spec::Cdw11FeatureNumberOfQueues::from(completion.dw0);
        let sq_count = dw0.nsq_z() + 1;
        let cq_count = dw0.ncq_z() + 1;
        let allocated_io_queue_count = sq_count.min(cq_count);
        if allocated_io_queue_count < requested_io_queue_count {
            tracing::warn!(
                sq_count,
                cq_count,
                requested_io_queue_count,
                "queue count constrained by hardware queue count"
            );
        }

        let max_io_queues = allocated_io_queue_count.min(requested_io_queue_count);

        // Clamp the I/O queue size to both the driver maximum and the
        // controller's MQES limit.
        let qsize = {
            if worker.registers.cap.mqes_z() < 1 {
                anyhow::bail!("bad device behavior. mqes cannot be 0");
            }

            let io_cqsize = (QueuePair::MAX_CQ_ENTRIES - 1).min(worker.registers.cap.mqes_z()) + 1;
            let io_sqsize = (QueuePair::MAX_SQ_ENTRIES - 1).min(worker.registers.cap.mqes_z()) + 1;

            io_cqsize.min(io_sqsize)
        };

        // Spawn a task to handle asynchronous event notifications from the
        // controller.
        let async_event_task = self.driver.spawn("nvme_async_event", {
            let admin = admin.issuer().clone();
            let rescan_event = self.rescan_event.clone();
            async move {
                if let Err(err) = handle_asynchronous_events(&admin, &rescan_event).await {
                    tracing::error!(
                        error = err.as_ref() as &dyn std::error::Error,
                        "asynchronous event failure, not processing any more"
                    );
                }
            }
        });

        let mut state = WorkerState {
            qsize,
            async_event_task,
            max_io_queues,
        };

        self.admin = Some(admin.issuer().clone());

        // Pre-create the I/O queue for CPU 0.
        let issuer = worker
            .create_io_queue(&mut state, 0)
            .await
            .context("failed to create io queue 1")?;

        self.io_issuers.per_cpu[0].set(issuer).unwrap();
        task.insert(&self.driver, "nvme_worker", state);
        task.start();
        Ok(())
    }

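    /// Shuts the device down unless `nvme_keepalive` was set, in which case
    /// the controller is intentionally left enabled for servicing.
    ///
    /// Minimal sketch of the teardown path (names as in this file):
    ///
    /// ```ignore
    /// driver.update_servicing_flags(false); // ensure a real reset on shutdown
    /// driver.shutdown().await;
    /// ```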
    pub async fn shutdown(mut self) {
        if self.nvme_keepalive {
            return;
        }
        self.reset().await;
        drop(self);
    }

    fn reset(&mut self) -> impl Send + std::future::Future<Output = ()> + use<T> {
        let driver = self.driver.clone();
        let mut task = std::mem::take(&mut self.task).unwrap();
        async move {
            task.stop().await;
            let (worker, state) = task.into_inner();
            if let Some(state) = state {
                state.async_event_task.cancel().await;
            }
            // Shut down all queues before resetting the controller.
            let _io_responses = join_all(worker.io.into_iter().map(|io| io.queue.shutdown())).await;
            let _admin_responses;
            if let Some(admin) = worker.admin {
                _admin_responses = admin.shutdown().await;
            }
            worker.registers.bar0.reset(&driver).await;
        }
    }

    /// Gets the namespace with namespace ID `nsid`.
    pub async fn namespace(&self, nsid: u32) -> Result<Namespace, NamespaceError> {
        Namespace::new(
            &self.driver,
            self.admin.as_ref().unwrap().clone(),
            self.rescan_event.clone(),
            self.identify.clone().unwrap(),
            &self.io_issuers,
            nsid,
        )
        .await
    }

    /// Returns the number of CPUs that fell back to sharing another CPU's I/O
    /// queue because a dedicated queue could not be created for them.
    pub fn fallback_cpu_count(&self) -> usize {
        self.io_issuers
            .per_cpu
            .iter()
            .enumerate()
            .filter(|&(cpu, c)| c.get().is_some_and(|c| c.cpu != cpu as u32))
            .count()
    }

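    /// Saves the driver state for servicing and marks the device to be kept
    /// alive (`nvme_keepalive`) so the subsequent shutdown does not reset it.
    ///
    /// Hedged sketch of a save/restore round trip across servicing;
    /// `new_device` is a placeholder for the re-opened [`DeviceBacking`]:
    ///
    /// ```ignore
    /// let saved = driver.save().await?;
    /// driver.shutdown().await; // no reset: keepalive was set by save()
    /// let driver = NvmeDriver::restore(&driver_source, cpu_count, new_device, &saved).await?;
    /// ```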
    pub async fn save(&mut self) -> anyhow::Result<NvmeDriverSavedState> {
        if self.identify.is_none() {
            return Err(save_restore::Error::InvalidState.into());
        }
        self.nvme_keepalive = true;
        match self
            .io_issuers
            .send
            .call(NvmeWorkerRequest::Save, ())
            .await?
        {
            Ok(s) => Ok(NvmeDriverSavedState {
                identify_ctrl: spec::IdentifyController::read_from_bytes(
                    self.identify.as_ref().unwrap().as_bytes(),
                )
                .unwrap(),
                device_id: self.device_id.clone(),
                namespaces: vec![],
                worker_data: s,
            }),
            Err(e) => Err(e),
        }
    }

    pub async fn restore(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        mut device: T,
        saved_state: &NvmeDriverSavedState,
    ) -> anyhow::Result<Self> {
        let driver = driver_source.simple();
        let bar0_mapping = device
            .map_bar(0)
            .context("failed to map device registers")?;
        let bar0 = Bar0(bar0_mapping);

        // The controller is expected to still be enabled and ready across
        // servicing (keepalive).
        if !bar0.csts().rdy() {
            anyhow::bail!("device is gone");
        }

        let registers = Arc::new(DeviceRegisters::new(bar0));

        let (send, recv) = mesh::channel();
        let io_issuers = Arc::new(IoIssuers {
            per_cpu: (0..cpu_count).map(|_| OnceLock::new()).collect(),
            send,
        });

        let mut this = Self {
            device_id: device.id().to_owned(),
            task: Some(TaskControl::new(DriverWorkerTask {
                device,
                driver: driver.clone(),
                registers: registers.clone(),
                admin: None,
                io: Vec::new(),
                io_issuers: io_issuers.clone(),
                recv,
            })),
            admin: None,
            identify: Some(Arc::new(
                spec::IdentifyController::read_from_bytes(saved_state.identify_ctrl.as_bytes())
                    .map_err(|_| RestoreError::InvalidData)?,
            )),
            driver: driver.clone(),
            io_issuers,
            rescan_event: Default::default(),
            namespaces: vec![],
            nvme_keepalive: true,
        };

        let task = &mut this.task.as_mut().unwrap();
        let worker = task.task_mut();

        let interrupt0 = worker
            .device
            .map_interrupt(0, 0)
            .context("failed to map interrupt 0")?;

        // Reattach the DMA buffers that were kept alive across servicing.
        let dma_client = worker.device.dma_client();
        let restored_memory = dma_client
            .attach_pending_buffers()
            .context("failed to restore allocations")?;

        // Restore the admin queue pair from its saved memory block.
        let admin = saved_state
            .worker_data
            .admin
            .as_ref()
            .map(|a| {
                let mem_block = restored_memory
                    .iter()
                    .find(|mem| mem.len() == a.mem_len && a.base_pfn == mem.pfns()[0])
                    .expect("unable to find restored mem block")
                    .to_owned();
                QueuePair::restore(driver.clone(), interrupt0, registers.clone(), mem_block, a)
                    .unwrap()
            })
            .unwrap();

        let admin = worker.admin.insert(admin);

        let async_event_task = this.driver.spawn("nvme_async_event", {
            let admin = admin.issuer().clone();
            let rescan_event = this.rescan_event.clone();
            async move {
                if let Err(err) = handle_asynchronous_events(&admin, &rescan_event).await {
                    tracing::error!(
                        error = err.as_ref() as &dyn std::error::Error,
                        "asynchronous event failure, not processing any more"
                    );
                }
            }
        });

        let state = WorkerState {
            qsize: saved_state.worker_data.qsize,
            async_event_task,
            max_io_queues: saved_state.worker_data.max_io_queues,
        };

        this.admin = Some(admin.issuer().clone());

        // Restore each I/O queue and repopulate the per-CPU issuer table.
        worker.io = saved_state
            .worker_data
            .io
            .iter()
            .flat_map(|q| -> Result<IoQueue, anyhow::Error> {
                let interrupt = worker
                    .device
                    .map_interrupt(q.iv, q.cpu)
                    .context("failed to map interrupt")?;
                let mem_block = restored_memory
                    .iter()
                    .find(|mem| {
                        mem.len() == q.queue_data.mem_len && q.queue_data.base_pfn == mem.pfns()[0]
                    })
                    .expect("unable to find restored mem block")
                    .to_owned();
                let q =
                    IoQueue::restore(driver.clone(), interrupt, registers.clone(), mem_block, q)?;
                let issuer = IoIssuer {
                    issuer: q.queue.issuer().clone(),
                    cpu: q.cpu,
                };
                this.io_issuers.per_cpu[q.cpu as usize].set(issuer).unwrap();
                Ok(q)
            })
            .collect();

        for ns in &saved_state.namespaces {
            this.namespaces.push(Arc::new(Namespace::restore(
                &driver,
                admin.issuer().clone(),
                this.rescan_event.clone(),
                this.identify.clone().unwrap(),
                &this.io_issuers,
                ns,
            )?));
        }

        task.insert(&this.driver, "nvme_worker", state);
        task.start();

        Ok(this)
    }

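    /// Updates the keepalive flag used during servicing: when `nvme_keepalive`
    /// is true, shutdown and drop leave the controller enabled so its queues
    /// and DMA memory can be reclaimed by [`NvmeDriver::restore`].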
    pub fn update_servicing_flags(&mut self, nvme_keepalive: bool) {
        self.nvme_keepalive = nvme_keepalive;
    }
}

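/// Services controller asynchronous event notifications: each completed AER
/// is inspected, and a NOTICE event triggers a read of the changed namespace
/// list log page and a `rescan_event` notification.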
async fn handle_asynchronous_events(
    admin: &Issuer,
    rescan_event: &event_listener::Event,
) -> anyhow::Result<()> {
    loop {
        let completion = admin
            .issue_neither(admin_cmd(spec::AdminOpcode::ASYNCHRONOUS_EVENT_REQUEST))
            .await
            .context("asynchronous event request failed")?;

        let dw0 = spec::AsynchronousEventRequestDw0::from(completion.dw0);
        match spec::AsynchronousEventType(dw0.event_type()) {
            spec::AsynchronousEventType::NOTICE => {
                tracing::info!("namespace attribute change event");

                // Read the changed namespace list log page (1024 dwords).
                let mut list = [0u32; 1024];
                admin
                    .issue_out(
                        spec::Command {
                            cdw10: spec::Cdw10GetLogPage::new()
                                .with_lid(spec::LogPageIdentifier::CHANGED_NAMESPACE_LIST.0)
                                .with_numdl_z(1023)
                                .into(),
                            ..admin_cmd(spec::AdminOpcode::GET_LOG_PAGE)
                        },
                        list.as_mut_bytes(),
                    )
                    .await
                    .context("failed to query changed namespace list")?;

                if list[0] != 0 {
                    rescan_event.notify(usize::MAX);
                }
            }
            event_type => {
                tracing::info!(
                    ?event_type,
                    information = dw0.information(),
                    log_page_identifier = dw0.log_page_identifier(),
                    "unhandled asynchronous event"
                );
            }
        }
    }
}

impl<T: DeviceBacking> Drop for NvmeDriver<T> {
    fn drop(&mut self) {
        if self.task.is_some() {
            if !self.nvme_keepalive {
                let reset = self.reset();
                self.driver.spawn("nvme_drop", reset).detach();
            }
        }
    }
}

impl IoIssuers {
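    /// Returns the I/O issuer for `cpu`, asking the worker task to create the
    /// backing I/O queue the first time a CPU is seen.
    ///
    /// Hypothetical caller-side sketch (`read_cmd` is a placeholder
    /// [`spec::Command`], not defined in this file):
    ///
    /// ```ignore
    /// let issuer = io_issuers.get(cpu).await?;
    /// let completion = issuer.issue_raw(read_cmd).await?;
    /// ```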
    pub async fn get(&self, cpu: u32) -> Result<&Issuer, RequestError> {
        if let Some(v) = self.per_cpu[cpu as usize].get() {
            return Ok(&v.issuer);
        }

        self.send
            .call(NvmeWorkerRequest::CreateIssuer, cpu)
            .await
            .map_err(RequestError::Gone)?;

        Ok(self.per_cpu[cpu as usize]
            .get()
            .expect("issuer was set by rpc")
            .issuer
            .as_ref())
    }
}

impl<T: DeviceBacking> AsyncRun<WorkerState> for DriverWorkerTask<T> {
    async fn run(
        &mut self,
        stop: &mut task_control::StopTask<'_>,
        state: &mut WorkerState,
    ) -> Result<(), task_control::Cancelled> {
        stop.until_stopped(async {
            loop {
                match self.recv.next().await {
                    Some(NvmeWorkerRequest::CreateIssuer(rpc)) => {
                        rpc.handle(async |cpu| self.create_io_issuer(state, cpu).await)
                            .await
                    }
                    Some(NvmeWorkerRequest::Save(rpc)) => {
                        rpc.handle(async |_| self.save(state).await).await
                    }
                    None => break,
                }
            }
        })
        .await
    }
}

impl<T: DeviceBacking> DriverWorkerTask<T> {
    async fn create_io_issuer(&mut self, state: &mut WorkerState, cpu: u32) {
        tracing::debug!(cpu, "issuer request");
        if self.io_issuers.per_cpu[cpu as usize].get().is_some() {
            return;
        }

        let issuer = match self
            .create_io_queue(state, cpu)
            .instrument(info_span!("create_nvme_io_queue", cpu))
            .await
        {
            Ok(issuer) => issuer,
            Err(err) => {
                // Fall back to the issuer of the closest lower-numbered CPU
                // that already has a queue.
                let (fallback_cpu, fallback) = self.io_issuers.per_cpu[..cpu as usize]
                    .iter()
                    .enumerate()
                    .rev()
                    .find_map(|(i, issuer)| issuer.get().map(|issuer| (i, issuer)))
                    .unwrap();

                tracing::error!(
                    cpu,
                    fallback_cpu,
                    error = err.as_ref() as &dyn std::error::Error,
                    "failed to create io queue, falling back"
                );
                fallback.clone()
            }
        };

        self.io_issuers.per_cpu[cpu as usize]
            .set(issuer)
            .ok()
            .unwrap();
    }

    async fn create_io_queue(
        &mut self,
        state: &mut WorkerState,
        cpu: u32,
    ) -> anyhow::Result<IoIssuer> {
        if self.io.len() >= state.max_io_queues as usize {
            anyhow::bail!("no more io queues available");
        }

        let qid = self.io.len() as u16 + 1;

        tracing::debug!(cpu, qid, "creating io queue");

        // Use the next interrupt vector (the first I/O queue shares vector 0
        // with the admin queue).
        let iv = self.io.len() as u16;
        let interrupt = self
            .device
            .map_interrupt(iv.into(), cpu)
            .context("failed to map interrupt")?;

        let queue = QueuePair::new(
            self.driver.clone(),
            &self.device,
            qid,
            state.qsize,
            state.qsize,
            interrupt,
            self.registers.clone(),
        )
        .with_context(|| format!("failed to create io queue pair {qid}"))?;

        let io_sq_addr = queue.sq_addr();
        let io_cq_addr = queue.cq_addr();

        // Track the queue pair before telling the controller about it so that
        // it is shut down in the error path below.
        self.io.push(IoQueue { queue, iv, cpu });
        let io_queue = self.io.last_mut().unwrap();

        let admin = self.admin.as_ref().unwrap().issuer().as_ref();

        let mut created_completion_queue = false;
        let r = async {
            admin
                .issue_raw(spec::Command {
                    cdw10: spec::Cdw10CreateIoQueue::new()
                        .with_qid(qid)
                        .with_qsize_z(state.qsize - 1)
                        .into(),
                    cdw11: spec::Cdw11CreateIoCompletionQueue::new()
                        .with_ien(true)
                        .with_iv(iv)
                        .with_pc(true)
                        .into(),
                    dptr: [io_cq_addr, 0],
                    ..admin_cmd(spec::AdminOpcode::CREATE_IO_COMPLETION_QUEUE)
                })
                .await
                .with_context(|| format!("failed to create io completion queue {qid}"))?;

            created_completion_queue = true;

            admin
                .issue_raw(spec::Command {
                    cdw10: spec::Cdw10CreateIoQueue::new()
                        .with_qid(qid)
                        .with_qsize_z(state.qsize - 1)
                        .into(),
                    cdw11: spec::Cdw11CreateIoSubmissionQueue::new()
                        .with_cqid(qid)
                        .with_pc(true)
                        .into(),
                    dptr: [io_sq_addr, 0],
                    ..admin_cmd(spec::AdminOpcode::CREATE_IO_SUBMISSION_QUEUE)
                })
                .await
                .with_context(|| format!("failed to create io submission queue {qid}"))?;

            Ok(())
        };

        if let Err(err) = r.await {
            // Tear down whatever was created before propagating the error.
            if created_completion_queue {
                if let Err(err) = admin
                    .issue_raw(spec::Command {
                        cdw10: spec::Cdw10DeleteIoQueue::new().with_qid(qid).into(),
                        ..admin_cmd(spec::AdminOpcode::DELETE_IO_COMPLETION_QUEUE)
                    })
                    .await
                {
                    tracing::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to delete completion queue in teardown path"
                    );
                }
            }
            let io = self.io.pop().unwrap();
            io.queue.shutdown().await;
            return Err(err);
        }

        Ok(IoIssuer {
            issuer: io_queue.queue.issuer().clone(),
            cpu,
        })
    }

    /// Saves the worker state (admin and I/O queues) for servicing.
    pub async fn save(
        &mut self,
        worker_state: &mut WorkerState,
    ) -> anyhow::Result<NvmeDriverWorkerSavedState> {
        let admin = match self.admin.as_ref() {
            Some(a) => Some(a.save().await?),
            None => None,
        };

        let io = join_all(self.io.drain(..).map(async |q| q.save().await))
            .await
            .into_iter()
            .flatten()
            .collect();

        Ok(NvmeDriverWorkerSavedState {
            admin,
            io,
            qsize: worker_state.qsize,
            max_io_queues: worker_state.max_io_queues,
        })
    }
}

impl<T: DeviceBacking> InspectTask<WorkerState> for DriverWorkerTask<T> {
    fn inspect(&self, req: inspect::Request<'_>, state: Option<&WorkerState>) {
        req.respond().merge(self).merge(state);
    }
}

/// Save and restore types for the NVMe driver.
pub mod save_restore {
    use super::*;

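    // Saved-state layout, as assembled by NvmeDriver::save and
    // DriverWorkerTask::save:
    //
    //   NvmeDriverSavedState
    //   ├── identify_ctrl, device_id, namespaces
    //   └── worker_data: NvmeDriverWorkerSavedState
    //       ├── admin: Option<QueuePairSavedState>
    //       └── io: Vec<IoQueueSavedState>, each wrapping a QueuePairSavedState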
    /// Save/restore errors.
    #[derive(Debug, Error)]
    pub enum Error {
        #[error("invalid object state")]
        InvalidState,
    }

    /// Saved state of the NVMe driver.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct NvmeDriverSavedState {
        #[mesh(1, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub identify_ctrl: spec::IdentifyController,
        #[mesh(2)]
        pub device_id: String,
        #[mesh(3)]
        pub namespaces: Vec<SavedNamespaceData>,
        #[mesh(4)]
        pub worker_data: NvmeDriverWorkerSavedState,
    }

    /// Saved state of the worker task (admin and I/O queues).
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct NvmeDriverWorkerSavedState {
        #[mesh(1)]
        pub admin: Option<QueuePairSavedState>,
        #[mesh(2)]
        pub io: Vec<IoQueueSavedState>,
        #[mesh(3)]
        pub qsize: u16,
        #[mesh(4)]
        pub max_io_queues: u16,
    }

    /// Saved state of a submission/completion queue pair.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct QueuePairSavedState {
        #[mesh(1)]
        pub mem_len: usize,
        #[mesh(2)]
        pub base_pfn: u64,
        #[mesh(3)]
        pub qid: u16,
        #[mesh(4)]
        pub sq_entries: u16,
        #[mesh(5)]
        pub cq_entries: u16,
        #[mesh(6)]
        pub handler_data: QueueHandlerSavedState,
    }

    /// Saved state of an I/O queue and its CPU/interrupt assignment.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct IoQueueSavedState {
        #[mesh(1)]
        pub cpu: u32,
        #[mesh(2)]
        pub iv: u32,
        #[mesh(3)]
        pub queue_data: QueuePairSavedState,
    }

    /// Saved state of a queue handler.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct QueueHandlerSavedState {
        #[mesh(1)]
        pub sq_state: SubmissionQueueSavedState,
        #[mesh(2)]
        pub cq_state: CompletionQueueSavedState,
        #[mesh(3)]
        pub pending_cmds: PendingCommandsSavedState,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct SubmissionQueueSavedState {
        #[mesh(1)]
        pub sqid: u16,
        #[mesh(2)]
        pub head: u32,
        #[mesh(3)]
        pub tail: u32,
        #[mesh(4)]
        pub committed_tail: u32,
        #[mesh(5)]
        pub len: u32,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct CompletionQueueSavedState {
        #[mesh(1)]
        pub cqid: u16,
        #[mesh(2)]
        pub head: u32,
        #[mesh(3)]
        pub committed_head: u32,
        #[mesh(4)]
        pub len: u32,
        #[mesh(5)]
        pub phase: bool,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct PendingCommandSavedState {
        #[mesh(1, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub command: spec::Command,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct PendingCommandsSavedState {
        #[mesh(1)]
        pub commands: Vec<PendingCommandSavedState>,
        #[mesh(2)]
        pub next_cid_high_bits: u16,
        #[mesh(3)]
        pub cid_key_bits: u32,
    }

    /// Saved per-namespace data needed to restore a [`Namespace`].
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct SavedNamespaceData {
        #[mesh(1)]
        pub nsid: u32,
        #[mesh(2, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub identify_ns: nvme_spec::nvm::IdentifyNamespace,
    }
}