nvme_driver/
driver.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Implementation of the device driver core.
5
6use super::spec;
7use crate::NVME_PAGE_SHIFT;
8use crate::Namespace;
9use crate::NamespaceError;
10use crate::NvmeDriverSavedState;
11use crate::RequestError;
12use crate::driver::save_restore::IoQueueSavedState;
13use crate::queue_pair::Issuer;
14use crate::queue_pair::QueuePair;
15use crate::queue_pair::admin_cmd;
16use crate::registers::Bar0;
17use crate::registers::DeviceRegisters;
18use anyhow::Context as _;
19use futures::StreamExt;
20use futures::future::join_all;
21use inspect::Inspect;
22use mesh::payload::Protobuf;
23use mesh::rpc::Rpc;
24use mesh::rpc::RpcSend;
25use pal_async::task::Spawn;
26use pal_async::task::Task;
27use save_restore::NvmeDriverWorkerSavedState;
28use std::sync::Arc;
29use std::sync::OnceLock;
30use task_control::AsyncRun;
31use task_control::InspectTask;
32use task_control::TaskControl;
33use thiserror::Error;
34use tracing::Instrument;
35use tracing::info_span;
36use user_driver::DeviceBacking;
37use user_driver::backoff::Backoff;
38use user_driver::interrupt::DeviceInterrupt;
39use user_driver::memory::MemoryBlock;
40use vmcore::vm_task::VmTaskDriver;
41use vmcore::vm_task::VmTaskDriverSource;
42use zerocopy::FromBytes;
43use zerocopy::FromZeros;
44use zerocopy::IntoBytes;
45
/// An NVMe driver.
///
/// Note that if this is dropped, the process will abort. Call
/// [`NvmeDriver::shutdown`] to drop this.
///
/// Further, note that this is an internal interface to be used
/// only by `NvmeDisk`! Remove any sanitization in `fuzz_nvm_driver.rs`
/// if this struct is used anywhere else.
#[derive(Inspect)]
pub struct NvmeDriver<T: DeviceBacking> {
    /// Worker task servicing queue-creation and save requests; taken (set to
    /// `None`) by `reset`, after which the driver can no longer issue IO.
    #[inspect(flatten)]
    task: Option<TaskControl<DriverWorkerTask<T>, WorkerState>>,
    /// Identifier of the backing device (from `DeviceBacking::id`).
    device_id: String,
    /// Identify Controller data; `None` until the device is enabled/restored.
    identify: Option<Arc<spec::IdentifyController>>,
    #[inspect(skip)]
    driver: VmTaskDriver,
    /// Issuer for admin commands; set once the admin queue pair is up.
    #[inspect(skip)]
    admin: Option<Arc<Issuer>>,
    /// Per-CPU IO issuers, created lazily via the worker task.
    #[inspect(skip)]
    io_issuers: Arc<IoIssuers>,
    /// Signaled when an AEN indicates namespaces may have changed.
    #[inspect(skip)]
    rescan_event: Arc<event_listener::Event>,
    /// NVMe namespaces associated with this driver.
    #[inspect(skip)]
    namespaces: Vec<Arc<Namespace>>,
    /// Keeps the controller connected (CC.EN==1) while servicing.
    nvme_keepalive: bool,
}
74
/// State owned by the driver's worker task: the backing device, its queue
/// pairs, and the request channel serviced by [`AsyncRun::run`].
#[derive(Inspect)]
struct DriverWorkerTask<T: DeviceBacking> {
    /// Backing device: provides BAR mappings, interrupts, and DMA.
    device: T,
    #[inspect(skip)]
    driver: VmTaskDriver,
    /// Mapped controller registers, shared with the queue pairs.
    registers: Arc<DeviceRegisters<T>>,
    /// Admin queue pair, present once the device is enabled or restored.
    admin: Option<QueuePair>,
    /// IO queues in creation order; the queue ID is the index + 1.
    #[inspect(iter_by_index)]
    io: Vec<IoQueue>,
    io_issuers: Arc<IoIssuers>,
    /// Receives issuer-creation and save requests from the driver side.
    #[inspect(skip)]
    recv: mesh::Receiver<NvmeWorkerRequest>,
}
88
/// Mutable state inserted into the worker task when the device is enabled or
/// restored.
#[derive(Inspect)]
struct WorkerState {
    /// Maximum number of IO queues that may be created.
    max_io_queues: u16,
    /// Entry count used for each IO submission/completion queue.
    qsize: u16,
    /// Task servicing asynchronous event notifications from the controller.
    #[inspect(skip)]
    async_event_task: Task<()>,
}
96
/// An error restoring from saved state.
#[derive(Debug, Error)]
pub enum RestoreError {
    /// The saved state could not be parsed (e.g. a malformed Identify
    /// Controller buffer).
    #[error("invalid data")]
    InvalidData,
}
103
/// An IO queue pair together with the interrupt vector and target CPU it was
/// created for.
#[derive(Inspect)]
struct IoQueue {
    queue: QueuePair,
    /// Interrupt vector index assigned to this queue.
    iv: u16,
    /// CPU this queue's interrupt is targeted at.
    cpu: u32,
}
110
111impl IoQueue {
112    pub async fn save(&self) -> anyhow::Result<IoQueueSavedState> {
113        Ok(IoQueueSavedState {
114            cpu: self.cpu,
115            iv: self.iv as u32,
116            queue_data: self.queue.save().await?,
117        })
118    }
119
120    pub fn restore(
121        spawner: VmTaskDriver,
122        interrupt: DeviceInterrupt,
123        registers: Arc<DeviceRegisters<impl DeviceBacking>>,
124        mem_block: MemoryBlock,
125        saved_state: &IoQueueSavedState,
126    ) -> anyhow::Result<Self> {
127        let IoQueueSavedState {
128            cpu,
129            iv,
130            queue_data,
131        } = saved_state;
132        let queue =
133            QueuePair::restore(spawner, interrupt, registers.clone(), mem_block, queue_data)?;
134
135        Ok(Self {
136            queue,
137            iv: *iv as u16,
138            cpu: *cpu,
139        })
140    }
141}
142
/// Per-CPU IO issuer slots plus the channel used to ask the worker task to
/// create a queue for a CPU on first use.
#[derive(Debug, Inspect)]
#[inspect(transparent)]
pub(crate) struct IoIssuers {
    /// Indexed by CPU number; a slot stays empty until an issuer is created
    /// (or a fallback issuer is assigned) for that CPU.
    #[inspect(iter_by_index)]
    per_cpu: Vec<OnceLock<IoIssuer>>,
    #[inspect(skip)]
    send: mesh::Sender<NvmeWorkerRequest>,
}
151
/// An issuer paired with the CPU that owns the underlying queue (which may
/// differ from the slot's CPU when in fallback mode).
#[derive(Debug, Clone, Inspect)]
struct IoIssuer {
    #[inspect(flatten)]
    issuer: Arc<Issuer>,
    /// CPU whose queue backs this issuer.
    cpu: u32,
}
158
/// Requests serviced by the worker task's run loop.
#[derive(Debug)]
enum NvmeWorkerRequest {
    /// Create an IO issuer for the given CPU.
    CreateIssuer(Rpc<u32, ()>),
    /// Save worker state.
    Save(Rpc<(), anyhow::Result<NvmeDriverWorkerSavedState>>),
}
165
166impl<T: DeviceBacking> NvmeDriver<T> {
167    /// Initializes the driver.
168    pub async fn new(
169        driver_source: &VmTaskDriverSource,
170        cpu_count: u32,
171        device: T,
172    ) -> anyhow::Result<Self> {
173        let pci_id = device.id().to_owned();
174        let mut this = Self::new_disabled(driver_source, cpu_count, device)
175            .instrument(tracing::info_span!("nvme_new_disabled", pci_id))
176            .await?;
177        match this
178            .enable(cpu_count as u16)
179            .instrument(tracing::info_span!("nvme_enable", pci_id))
180            .await
181        {
182            Ok(()) => Ok(this),
183            Err(err) => {
184                tracing::error!(
185                    error = err.as_ref() as &dyn std::error::Error,
186                    "device initialization failed, shutting down"
187                );
188                this.shutdown().await;
189                Err(err)
190            }
191        }
192    }
193
    /// Initializes but does not enable the device. DMA memory
    /// is preallocated from backing device.
    async fn new_disabled(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        mut device: T,
    ) -> anyhow::Result<Self> {
        let driver = driver_source.simple();
        let bar0 = Bar0(
            device
                .map_bar(0)
                .context("failed to map device registers")?,
        );

        // If the controller is already enabled (or still reporting ready),
        // reset it so the driver starts from a clean, disabled state.
        let cc = bar0.cc();
        if cc.en() || bar0.csts().rdy() {
            if !bar0
                .reset(&driver)
                .instrument(tracing::info_span!(
                    "nvme_already_enabled",
                    pci_id = device.id().to_owned()
                ))
                .await
            {
                // `reset` returning false indicates the device stopped
                // responding (surprise removal).
                anyhow::bail!("device is gone");
            }
        }

        let registers = Arc::new(DeviceRegisters::new(bar0));
        let cap = registers.cap;

        // Only controllers whose minimum page size is the NVMe base page
        // (CAP.MPSMIN == 0) are supported.
        if cap.mpsmin() != 0 {
            anyhow::bail!(
                "unsupported minimum page size: {}",
                cap.mpsmin() + NVME_PAGE_SHIFT
            );
        }

        // Channel through which `IoIssuers` asks the worker task to create
        // per-CPU IO queues and to save state.
        let (send, recv) = mesh::channel();
        let io_issuers = Arc::new(IoIssuers {
            per_cpu: (0..cpu_count).map(|_| OnceLock::new()).collect(),
            send,
        });

        Ok(Self {
            device_id: device.id().to_owned(),
            task: Some(TaskControl::new(DriverWorkerTask {
                device,
                driver: driver.clone(),
                registers,
                admin: None,
                io: Vec::new(),
                io_issuers: io_issuers.clone(),
                recv,
            })),
            admin: None,
            identify: None,
            driver,
            io_issuers,
            rescan_event: Default::default(),
            namespaces: vec![],
            nvme_keepalive: false,
        })
    }
258
    /// Enables the device, aliasing the admin queue memory and adding IO queues.
    async fn enable(&mut self, requested_io_queue_count: u16) -> anyhow::Result<()> {
        const ADMIN_QID: u16 = 0;

        // NOTE(review): `&mut` around `as_mut().unwrap()` yields `&mut &mut _`;
        // auto-deref makes it work, but the outer borrow is redundant.
        let task = &mut self.task.as_mut().unwrap();
        let worker = task.task_mut();

        // Request the admin queue pair be the same size to avoid potential
        // device bugs where differing sizes might be a less common scenario
        //
        // Namely: using differing sizes revealed a bug in the initial NvmeDirectV2 implementation
        let admin_len = std::cmp::min(QueuePair::MAX_SQ_ENTRIES, QueuePair::MAX_CQ_ENTRIES);
        let admin_sqes = admin_len;
        let admin_cqes = admin_len;

        // Interrupt vector 0 serves the admin queue (and later IO queue 1).
        let interrupt0 = worker
            .device
            .map_interrupt(0, 0)
            .context("failed to map interrupt 0")?;

        // Start the admin queue pair.
        let admin = QueuePair::new(
            self.driver.clone(),
            &worker.device,
            ADMIN_QID,
            admin_sqes,
            admin_cqes,
            interrupt0,
            worker.registers.clone(),
        )
        .context("failed to create admin queue pair")?;

        let admin = worker.admin.insert(admin);

        // Register the admin queue with the controller.
        // AQA takes zero-based queue sizes, hence the `- 1`.
        worker.registers.bar0.set_aqa(
            spec::Aqa::new()
                .with_acqs_z(admin_cqes - 1)
                .with_asqs_z(admin_sqes - 1),
        );
        worker.registers.bar0.set_asq(admin.sq_addr());
        worker.registers.bar0.set_acq(admin.cq_addr());

        // Enable the controller.
        let span = tracing::info_span!("nvme_ctrl_enable", pci_id = worker.device.id().to_owned());
        let ctrl_enable_span = span.enter();
        // IOSQES/IOCQES are log2 entry sizes (2^6=64-byte SQEs, 2^4=16-byte
        // CQEs); MPS of 0 selects the controller's minimum page size.
        worker.registers.bar0.set_cc(
            spec::Cc::new()
                .with_iocqes(4)
                .with_iosqes(6)
                .with_en(true)
                .with_mps(0),
        );

        // Wait for the controller to be ready.
        let mut backoff = Backoff::new(&self.driver);
        loop {
            let csts = worker.registers.bar0.csts();
            // All-ones register reads indicate surprise removal.
            if u32::from(csts) == !0 {
                anyhow::bail!("device is gone");
            }
            // Controller fatal status: reset so it is left disabled, then fail.
            if csts.cfs() {
                worker.registers.bar0.reset(&self.driver).await;
                anyhow::bail!("device had fatal error");
            }
            if csts.rdy() {
                break;
            }
            backoff.back_off().await;
        }
        drop(ctrl_enable_span);

        // Get the controller identify structure.
        let identify = self
            .identify
            .insert(Arc::new(spec::IdentifyController::new_zeroed()));

        admin
            .issuer()
            .issue_out(
                spec::Command {
                    cdw10: spec::Cdw10Identify::new()
                        .with_cns(spec::Cns::CONTROLLER.0)
                        .into(),
                    ..admin_cmd(spec::AdminOpcode::IDENTIFY)
                },
                Arc::get_mut(identify).unwrap().as_mut_bytes(),
            )
            .await
            .context("failed to identify controller")?;

        // Configure the number of IO queues.
        //
        // Note that interrupt zero is shared between IO queue 1 and the admin queue.
        let max_interrupt_count = worker.device.max_interrupt_count();
        if max_interrupt_count == 0 {
            anyhow::bail!("bad device behavior: max_interrupt_count == 0");
        }

        // Clamp the queue count to the number of available interrupt vectors.
        let requested_io_queue_count = if max_interrupt_count < requested_io_queue_count as u32 {
            tracing::warn!(
                max_interrupt_count,
                requested_io_queue_count,
                "queue count constrained by msi count"
            );
            max_interrupt_count as u16
        } else {
            requested_io_queue_count
        };

        // Set Features / NUMBER OF QUEUES takes zero-based counts; the
        // completion's dw0 reports the zero-based counts actually allocated.
        let completion = admin
            .issuer()
            .issue_neither(spec::Command {
                cdw10: spec::Cdw10SetFeatures::new()
                    .with_fid(spec::Feature::NUMBER_OF_QUEUES.0)
                    .into(),
                cdw11: spec::Cdw11FeatureNumberOfQueues::new()
                    .with_ncq_z(requested_io_queue_count - 1)
                    .with_nsq_z(requested_io_queue_count - 1)
                    .into(),
                ..admin_cmd(spec::AdminOpcode::SET_FEATURES)
            })
            .await
            .context("failed to set number of queues")?;

        // See how many queues are actually available.
        let dw0 = spec::Cdw11FeatureNumberOfQueues::from(completion.dw0);
        let sq_count = dw0.nsq_z() + 1;
        let cq_count = dw0.ncq_z() + 1;
        let allocated_io_queue_count = sq_count.min(cq_count);
        if allocated_io_queue_count < requested_io_queue_count {
            tracing::warn!(
                sq_count,
                cq_count,
                requested_io_queue_count,
                "queue count constrained by hardware queue count"
            );
        }

        let max_io_queues = allocated_io_queue_count.min(requested_io_queue_count);

        // Size each IO queue to the smaller of the driver's limits and the
        // controller's CAP.MQES (a zero-based maximum).
        let qsize = {
            if worker.registers.cap.mqes_z() < 1 {
                anyhow::bail!("bad device behavior. mqes cannot be 0");
            }

            let io_cqsize = (QueuePair::MAX_CQ_ENTRIES - 1).min(worker.registers.cap.mqes_z()) + 1;
            let io_sqsize = (QueuePair::MAX_SQ_ENTRIES - 1).min(worker.registers.cap.mqes_z()) + 1;

            // Some hardware (such as ASAP) require that the sq and cq have the same size.
            io_cqsize.min(io_sqsize)
        };

        // Spawn a task to handle asynchronous events.
        let async_event_task = self.driver.spawn("nvme_async_event", {
            let admin = admin.issuer().clone();
            let rescan_event = self.rescan_event.clone();
            async move {
                if let Err(err) = handle_asynchronous_events(&admin, &rescan_event).await {
                    tracing::error!(
                        error = err.as_ref() as &dyn std::error::Error,
                        "asynchronous event failure, not processing any more"
                    );
                }
            }
        });

        let mut state = WorkerState {
            qsize,
            async_event_task,
            max_io_queues,
        };

        self.admin = Some(admin.issuer().clone());

        // Pre-create the IO queue 1 for CPU 0. The other queues will be created
        // lazily. Numbering for I/O queues starts with 1 (0 is Admin).
        let issuer = worker
            .create_io_queue(&mut state, 0)
            .await
            .context("failed to create io queue 1")?;

        self.io_issuers.per_cpu[0].set(issuer).unwrap();
        task.insert(&self.driver, "nvme_worker", state);
        task.start();
        Ok(())
    }
446
447    /// Shuts the device down.
448    pub async fn shutdown(mut self) {
449        // If nvme_keepalive was requested, return early.
450        // The memory is still aliased as we don't flush pending IOs.
451        if self.nvme_keepalive {
452            return;
453        }
454        self.reset().await;
455        drop(self);
456    }
457
    /// Stops the worker and resets the controller, returning a future that
    /// performs the teardown.
    ///
    /// Takes `self.task`, so the driver can no longer service requests after
    /// this is called.
    fn reset(&mut self) -> impl Send + std::future::Future<Output = ()> + use<T> {
        let driver = self.driver.clone();
        let mut task = std::mem::take(&mut self.task).unwrap();
        async move {
            task.stop().await;
            let (worker, state) = task.into_inner();
            if let Some(state) = state {
                state.async_event_task.cancel().await;
            }
            // Hold onto responses until the reset completes so that waiting IOs do
            // not think the memory is unaliased by the device.
            let _io_responses = join_all(worker.io.into_iter().map(|io| io.queue.shutdown())).await;
            let _admin_responses;
            if let Some(admin) = worker.admin {
                _admin_responses = admin.shutdown().await;
            }
            // Only now is it safe to disable the controller.
            worker.registers.bar0.reset(&driver).await;
        }
    }
477
478    /// Gets the namespace with namespace ID `nsid`.
479    pub async fn namespace(&self, nsid: u32) -> Result<Namespace, NamespaceError> {
480        Namespace::new(
481            &self.driver,
482            self.admin.as_ref().unwrap().clone(),
483            self.rescan_event.clone(),
484            self.identify.clone().unwrap(),
485            &self.io_issuers,
486            nsid,
487        )
488        .await
489    }
490
491    /// Returns the number of CPUs that are in fallback mode (that are using a
492    /// remote CPU's queue due to a failure or resource limitation).
493    pub fn fallback_cpu_count(&self) -> usize {
494        self.io_issuers
495            .per_cpu
496            .iter()
497            .enumerate()
498            .filter(|&(cpu, c)| c.get().is_some_and(|c| c.cpu != cpu as u32))
499            .count()
500    }
501
    /// Saves the NVMe driver state during servicing.
    pub async fn save(&mut self) -> anyhow::Result<NvmeDriverSavedState> {
        // Nothing to save if Identify Controller was never queried.
        if self.identify.is_none() {
            return Err(save_restore::Error::InvalidState.into());
        }
        // Set keepalive before asking the worker to save so a subsequent
        // shutdown/drop will not reset the controller and unalias the queue
        // memory being preserved.
        self.nvme_keepalive = true;
        match self
            .io_issuers
            .send
            .call(NvmeWorkerRequest::Save, ())
            .await?
        {
            Ok(s) => {
                // TODO: The decision is to re-query namespace data after the restore.
                // Leaving the code in place so it can be restored in future.
                // The reason is uncertainty about namespace change during servicing.
                // ------
                // for ns in &self.namespaces {
                //     s.namespaces.push(ns.save()?);
                // }
                Ok(NvmeDriverSavedState {
                    identify_ctrl: spec::IdentifyController::read_from_bytes(
                        self.identify.as_ref().unwrap().as_bytes(),
                    )
                    .unwrap(),
                    device_id: self.device_id.clone(),
                    // TODO: See the description above, save the vector once resolved.
                    namespaces: vec![],
                    worker_data: s,
                })
            }
            Err(e) => Err(e),
        }
    }
537
    /// Restores NVMe driver state after servicing.
    pub async fn restore(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        mut device: T,
        saved_state: &NvmeDriverSavedState,
    ) -> anyhow::Result<Self> {
        let driver = driver_source.simple();
        let bar0_mapping = device
            .map_bar(0)
            .context("failed to map device registers")?;
        let bar0 = Bar0(bar0_mapping);

        // It is expected the device to be alive when restoring.
        // (Keepalive means the controller was never disabled.)
        if !bar0.csts().rdy() {
            anyhow::bail!("device is gone");
        }

        let registers = Arc::new(DeviceRegisters::new(bar0));

        let (send, recv) = mesh::channel();
        let io_issuers = Arc::new(IoIssuers {
            per_cpu: (0..cpu_count).map(|_| OnceLock::new()).collect(),
            send,
        });

        let mut this = Self {
            device_id: device.id().to_owned(),
            task: Some(TaskControl::new(DriverWorkerTask {
                device,
                driver: driver.clone(),
                registers: registers.clone(),
                admin: None, // Updated below.
                io: Vec::new(),
                io_issuers: io_issuers.clone(),
                recv,
            })),
            admin: None, // Updated below.
            identify: Some(Arc::new(
                spec::IdentifyController::read_from_bytes(saved_state.identify_ctrl.as_bytes())
                    .map_err(|_| RestoreError::InvalidData)?, // TODO: zerocopy: map_err (https://github.com/microsoft/openvmm/issues/759)
            )),
            driver: driver.clone(),
            io_issuers,
            rescan_event: Default::default(),
            namespaces: vec![],
            nvme_keepalive: true,
        };

        let task = &mut this.task.as_mut().unwrap();
        let worker = task.task_mut();

        // Interrupt 0 is shared between admin queue and I/O queue 1.
        let interrupt0 = worker
            .device
            .map_interrupt(0, 0)
            .context("failed to map interrupt 0")?;

        // Reattach the DMA buffers that were kept alive across servicing.
        let dma_client = worker.device.dma_client();
        let restored_memory = dma_client
            .attach_pending_buffers()
            .context("failed to restore allocations")?;

        // Restore the admin queue pair.
        let admin = saved_state
            .worker_data
            .admin
            .as_ref()
            .map(|a| {
                // Restore memory block for admin queue pair, matched by
                // length and first PFN.
                let mem_block = restored_memory
                    .iter()
                    .find(|mem| mem.len() == a.mem_len && a.base_pfn == mem.pfns()[0])
                    .expect("unable to find restored mem block")
                    .to_owned();
                QueuePair::restore(driver.clone(), interrupt0, registers.clone(), mem_block, a)
                    .unwrap()
            })
            .unwrap();

        let admin = worker.admin.insert(admin);

        // Spawn a task to handle asynchronous events.
        let async_event_task = this.driver.spawn("nvme_async_event", {
            let admin = admin.issuer().clone();
            let rescan_event = this.rescan_event.clone();
            async move {
                if let Err(err) = handle_asynchronous_events(&admin, &rescan_event).await {
                    tracing::error!(
                        error = err.as_ref() as &dyn std::error::Error,
                        "asynchronous event failure, not processing any more"
                    );
                }
            }
        });

        let state = WorkerState {
            qsize: saved_state.worker_data.qsize,
            async_event_task,
            max_io_queues: saved_state.worker_data.max_io_queues,
        };

        this.admin = Some(admin.issuer().clone());

        // Restore I/O queues.
        // Interrupt vector 0 is shared between Admin queue and I/O queue #1.
        // NOTE(review): `flat_map` over a `Result` silently discards any queue
        // whose restore fails — confirm partial restore is intentional.
        worker.io = saved_state
            .worker_data
            .io
            .iter()
            .flat_map(|q| -> Result<IoQueue, anyhow::Error> {
                let interrupt = worker
                    .device
                    .map_interrupt(q.iv, q.cpu)
                    .context("failed to map interrupt")?;
                let mem_block = restored_memory
                    .iter()
                    .find(|mem| {
                        mem.len() == q.queue_data.mem_len && q.queue_data.base_pfn == mem.pfns()[0]
                    })
                    .expect("unable to find restored mem block")
                    .to_owned();
                let q =
                    IoQueue::restore(driver.clone(), interrupt, registers.clone(), mem_block, q)?;
                let issuer = IoIssuer {
                    issuer: q.queue.issuer().clone(),
                    cpu: q.cpu,
                };
                this.io_issuers.per_cpu[q.cpu as usize].set(issuer).unwrap();
                Ok(q)
            })
            .collect();

        // Restore namespace(s).
        for ns in &saved_state.namespaces {
            // TODO: Current approach is to re-query namespace data after servicing
            // and this array will be empty. Once we confirm that we can process
            // namespace change notification AEN, the restore code will be re-added.
            this.namespaces.push(Arc::new(Namespace::restore(
                &driver,
                admin.issuer().clone(),
                this.rescan_event.clone(),
                this.identify.clone().unwrap(),
                &this.io_issuers,
                ns,
            )?));
        }

        task.insert(&this.driver, "nvme_worker", state);
        task.start();

        Ok(this)
    }
691
    /// Change device's behavior when servicing.
    /// When `nvme_keepalive` is true, shutdown/drop leave the controller
    /// enabled (CC.EN==1) with its queue memory still aliased.
    pub fn update_servicing_flags(&mut self, nvme_keepalive: bool) {
        self.nvme_keepalive = nvme_keepalive;
    }
696}
697
/// Services controller Asynchronous Event Requests in a loop, signaling
/// `rescan_event` when the controller reports changed namespaces.
///
/// Loops forever on success; returns an error if an admin command fails.
async fn handle_asynchronous_events(
    admin: &Issuer,
    rescan_event: &event_listener::Event,
) -> anyhow::Result<()> {
    loop {
        let completion = admin
            .issue_neither(admin_cmd(spec::AdminOpcode::ASYNCHRONOUS_EVENT_REQUEST))
            .await
            .context("asynchronous event request failed")?;

        let dw0 = spec::AsynchronousEventRequestDw0::from(completion.dw0);
        match spec::AsynchronousEventType(dw0.event_type()) {
            spec::AsynchronousEventType::NOTICE => {
                tracing::info!("namespace attribute change event");

                // Clear the namespace list.
                // 1024 dwords covers the full log page; NUMDL is a zero-based
                // dword count, hence 1023.
                let mut list = [0u32; 1024];
                admin
                    .issue_out(
                        spec::Command {
                            cdw10: spec::Cdw10GetLogPage::new()
                                .with_lid(spec::LogPageIdentifier::CHANGED_NAMESPACE_LIST.0)
                                .with_numdl_z(1023)
                                .into(),
                            ..admin_cmd(spec::AdminOpcode::GET_LOG_PAGE)
                        },
                        list.as_mut_bytes(),
                    )
                    .await
                    .context("failed to query changed namespace list")?;

                // A zero first entry means no changed namespaces were listed.
                if list[0] != 0 {
                    // For simplicity, tell all namespaces to rescan.
                    rescan_event.notify(usize::MAX);
                }
            }
            event_type => {
                tracing::info!(
                    ?event_type,
                    information = dw0.information(),
                    log_page_identifier = dw0.log_page_identifier(),
                    "unhandled asynchronous event"
                );
            }
        }
    }
}
745
746impl<T: DeviceBacking> Drop for NvmeDriver<T> {
747    fn drop(&mut self) {
748        if self.task.is_some() {
749            // Do not reset NVMe device when nvme_keepalive is requested.
750            if !self.nvme_keepalive {
751                // Reset the device asynchronously so that pending IOs are not
752                // dropped while their memory is aliased.
753                let reset = self.reset();
754                self.driver.spawn("nvme_drop", reset).detach();
755            }
756        }
757    }
758}
759
760impl IoIssuers {
761    pub async fn get(&self, cpu: u32) -> Result<&Issuer, RequestError> {
762        if let Some(v) = self.per_cpu[cpu as usize].get() {
763            return Ok(&v.issuer);
764        }
765
766        self.send
767            .call(NvmeWorkerRequest::CreateIssuer, cpu)
768            .await
769            .map_err(RequestError::Gone)?;
770
771        Ok(self.per_cpu[cpu as usize]
772            .get()
773            .expect("issuer was set by rpc")
774            .issuer
775            .as_ref())
776    }
777}
778
impl<T: DeviceBacking> AsyncRun<WorkerState> for DriverWorkerTask<T> {
    /// Services worker requests until the request channel closes or the task
    /// is asked to stop.
    async fn run(
        &mut self,
        stop: &mut task_control::StopTask<'_>,
        state: &mut WorkerState,
    ) -> Result<(), task_control::Cancelled> {
        stop.until_stopped(async {
            loop {
                match self.recv.next().await {
                    Some(NvmeWorkerRequest::CreateIssuer(rpc)) => {
                        rpc.handle(async |cpu| self.create_io_issuer(state, cpu).await)
                            .await
                    }
                    Some(NvmeWorkerRequest::Save(rpc)) => {
                        rpc.handle(async |_| self.save(state).await).await
                    }
                    // All senders dropped; nothing left to service.
                    None => break,
                }
            }
        })
        .await
    }
}
802
803impl<T: DeviceBacking> DriverWorkerTask<T> {
    /// Populates the per-CPU issuer slot for `cpu`, creating a new IO queue
    /// or falling back to a lower-numbered CPU's queue on failure.
    async fn create_io_issuer(&mut self, state: &mut WorkerState, cpu: u32) {
        tracing::debug!(cpu, "issuer request");
        // A concurrent/earlier request may have already populated this slot.
        if self.io_issuers.per_cpu[cpu as usize].get().is_some() {
            return;
        }

        let issuer = match self
            .create_io_queue(state, cpu)
            .instrument(info_span!("create_nvme_io_queue", cpu))
            .await
        {
            Ok(issuer) => issuer,
            Err(err) => {
                // Find a fallback queue close in index to the failed queue.
                // CPU 0's queue is created eagerly during `enable`, so the
                // search over lower slots finds at least one populated entry.
                let (fallback_cpu, fallback) = self.io_issuers.per_cpu[..cpu as usize]
                    .iter()
                    .enumerate()
                    .rev()
                    .find_map(|(i, issuer)| issuer.get().map(|issuer| (i, issuer)))
                    .unwrap();

                tracing::error!(
                    cpu,
                    fallback_cpu,
                    error = err.as_ref() as &dyn std::error::Error,
                    "failed to create io queue, falling back"
                );
                fallback.clone()
            }
        };

        self.io_issuers.per_cpu[cpu as usize]
            .set(issuer)
            .ok()
            .unwrap();
    }
840
    /// Creates a new IO queue pair for `cpu` and registers it with the
    /// controller, returning an [`IoIssuer`] bound to that queue.
    ///
    /// On failure after the local queue pair has been allocated, the pair is
    /// torn down — including deleting the completion queue on the controller
    /// if it was already created — before the error is returned.
    async fn create_io_queue(
        &mut self,
        state: &mut WorkerState,
        cpu: u32,
    ) -> anyhow::Result<IoIssuer> {
        // Respect the controller-negotiated limit on IO queue pairs.
        if self.io.len() >= state.max_io_queues as usize {
            anyhow::bail!("no more io queues available");
        }

        // Queue IDs are 1-based; QID 0 is the admin queue.
        let qid = self.io.len() as u16 + 1;

        tracing::debug!(cpu, qid, "creating io queue");

        // Share IO queue 1's interrupt with the admin queue.
        let iv = self.io.len() as u16;
        let interrupt = self
            .device
            .map_interrupt(iv.into(), cpu)
            .context("failed to map interrupt")?;

        let queue = QueuePair::new(
            self.driver.clone(),
            &self.device,
            qid,
            state.qsize,
            state.qsize,
            interrupt,
            self.registers.clone(),
        )
        .with_context(|| format!("failed to create io queue pair {qid}"))?;

        let io_sq_addr = queue.sq_addr();
        let io_cq_addr = queue.cq_addr();

        // Add the queue pair before aliasing its memory with the device so
        // that it can be torn down correctly on failure.
        self.io.push(IoQueue { queue, iv, cpu });
        let io_queue = self.io.last_mut().unwrap();

        let admin = self.admin.as_ref().unwrap().issuer().as_ref();

        // Issue the admin commands that make the queue pair live. The
        // completion queue must exist before the submission queue that
        // references it, so remember whether the CQ was created in order to
        // delete it if the subsequent SQ creation fails.
        let mut created_completion_queue = false;
        let r = async {
            admin
                .issue_raw(spec::Command {
                    // qsize_z is zero-based per the NVMe spec, hence - 1.
                    cdw10: spec::Cdw10CreateIoQueue::new()
                        .with_qid(qid)
                        .with_qsize_z(state.qsize - 1)
                        .into(),
                    cdw11: spec::Cdw11CreateIoCompletionQueue::new()
                        .with_ien(true)
                        .with_iv(iv)
                        .with_pc(true)
                        .into(),
                    dptr: [io_cq_addr, 0],
                    ..admin_cmd(spec::AdminOpcode::CREATE_IO_COMPLETION_QUEUE)
                })
                .await
                .with_context(|| format!("failed to create io completion queue {qid}"))?;

            created_completion_queue = true;

            admin
                .issue_raw(spec::Command {
                    cdw10: spec::Cdw10CreateIoQueue::new()
                        .with_qid(qid)
                        .with_qsize_z(state.qsize - 1)
                        .into(),
                    cdw11: spec::Cdw11CreateIoSubmissionQueue::new()
                        .with_cqid(qid)
                        .with_pc(true)
                        .into(),
                    dptr: [io_sq_addr, 0],
                    ..admin_cmd(spec::AdminOpcode::CREATE_IO_SUBMISSION_QUEUE)
                })
                .await
                .with_context(|| format!("failed to create io submission queue {qid}"))?;

            Ok(())
        };

        if let Err(err) = r.await {
            // Teardown path: best-effort delete of the completion queue (a
            // failure here is logged, not propagated), then release the
            // locally allocated queue pair.
            if created_completion_queue {
                if let Err(err) = admin
                    .issue_raw(spec::Command {
                        cdw10: spec::Cdw10DeleteIoQueue::new().with_qid(qid).into(),
                        ..admin_cmd(spec::AdminOpcode::DELETE_IO_COMPLETION_QUEUE)
                    })
                    .await
                {
                    tracing::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to delete completion queue in teardown path"
                    );
                }
            }
            let io = self.io.pop().unwrap();
            io.queue.shutdown().await;
            return Err(err);
        }

        Ok(IoIssuer {
            issuer: io_queue.queue.issuer().clone(),
            cpu,
        })
    }
947
948    /// Save NVMe driver state for servicing.
949    pub async fn save(
950        &mut self,
951        worker_state: &mut WorkerState,
952    ) -> anyhow::Result<NvmeDriverWorkerSavedState> {
953        let admin = match self.admin.as_ref() {
954            Some(a) => Some(a.save().await?),
955            None => None,
956        };
957
958        let io = join_all(self.io.drain(..).map(async |q| q.save().await))
959            .await
960            .into_iter()
961            .flatten()
962            .collect();
963
964        Ok(NvmeDriverWorkerSavedState {
965            admin,
966            io,
967            qsize: worker_state.qsize,
968            max_io_queues: worker_state.max_io_queues,
969        })
970    }
971}
972
// Inspect support for the worker task: responds to an inspect request by
// merging the worker's own fields with the current worker state (when the
// task is running and state is available).
impl<T: DeviceBacking> InspectTask<WorkerState> for DriverWorkerTask<T> {
    fn inspect(&self, req: inspect::Request<'_>, state: Option<&WorkerState>) {
        req.respond().merge(self).merge(state);
    }
}
978
pub mod save_restore {
    //! Saved-state types used to persist NVMe driver state across servicing
    //! operations.

    use super::*;

    /// Save and Restore errors for this module.
    #[derive(Debug, Error)]
    pub enum Error {
        /// No data to save.
        #[error("invalid object state")]
        InvalidState,
    }

    /// Save/restore state for NVMe driver.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct NvmeDriverSavedState {
        /// Copy of the controller's IDENTIFY structure.
        /// It is defined as Option<> in original structure.
        #[mesh(1, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub identify_ctrl: spec::IdentifyController,
        /// Device ID string.
        #[mesh(2)]
        pub device_id: String,
        /// Namespace data.
        #[mesh(3)]
        pub namespaces: Vec<SavedNamespaceData>,
        /// NVMe driver worker task data.
        #[mesh(4)]
        pub worker_data: NvmeDriverWorkerSavedState,
    }

    /// Save/restore state for NVMe driver worker task.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct NvmeDriverWorkerSavedState {
        /// Admin queue state.
        #[mesh(1)]
        pub admin: Option<QueuePairSavedState>,
        /// IO queue states.
        #[mesh(2)]
        pub io: Vec<IoQueueSavedState>,
        /// Queue size as determined by CAP.MQES.
        #[mesh(3)]
        pub qsize: u16,
        /// Max number of IO queue pairs.
        #[mesh(4)]
        pub max_io_queues: u16,
    }

    /// Save/restore state for QueuePair.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct QueuePairSavedState {
        /// Allocated memory size in bytes.
        #[mesh(1)]
        pub mem_len: usize,
        /// First PFN of the physically contiguous block.
        #[mesh(2)]
        pub base_pfn: u64,
        /// Queue ID used when creating the pair
        /// (SQ and CQ IDs are using same number).
        #[mesh(3)]
        pub qid: u16,
        /// Submission queue entries.
        #[mesh(4)]
        pub sq_entries: u16,
        /// Completion queue entries.
        #[mesh(5)]
        pub cq_entries: u16,
        /// QueueHandler task data.
        #[mesh(6)]
        pub handler_data: QueueHandlerSavedState,
    }

    /// Save/restore state for IoQueue.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct IoQueueSavedState {
        #[mesh(1)]
        /// Which CPU handles requests.
        pub cpu: u32,
        #[mesh(2)]
        /// Interrupt vector (MSI-X)
        pub iv: u32,
        /// Saved state of the underlying queue pair.
        #[mesh(3)]
        pub queue_data: QueuePairSavedState,
    }

    /// Save/restore state for QueueHandler task.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct QueueHandlerSavedState {
        /// Submission queue state.
        #[mesh(1)]
        pub sq_state: SubmissionQueueSavedState,
        /// Completion queue state.
        #[mesh(2)]
        pub cq_state: CompletionQueueSavedState,
        /// Commands that were submitted but not yet completed.
        #[mesh(3)]
        pub pending_cmds: PendingCommandsSavedState,
    }

    /// Save/restore state for a submission queue.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct SubmissionQueueSavedState {
        /// Submission queue ID.
        #[mesh(1)]
        pub sqid: u16,
        /// Queue head index.
        #[mesh(2)]
        pub head: u32,
        /// Queue tail index.
        #[mesh(3)]
        pub tail: u32,
        /// Tail index most recently committed to the device
        /// (presumably via a doorbell write — verify against the queue code).
        #[mesh(4)]
        pub committed_tail: u32,
        /// Number of entries in the queue.
        #[mesh(5)]
        pub len: u32,
    }

    /// Save/restore state for a completion queue.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct CompletionQueueSavedState {
        /// Completion queue ID.
        #[mesh(1)]
        pub cqid: u16,
        /// Queue head index.
        #[mesh(2)]
        pub head: u32,
        /// Head index most recently committed to the device
        /// (presumably via a doorbell write — verify against the queue code).
        #[mesh(3)]
        pub committed_head: u32,
        /// Number of entries in the queue.
        #[mesh(4)]
        pub len: u32,
        #[mesh(5)]
        /// NVMe completion phase tag.
        pub phase: bool,
    }

    /// Save/restore state for a single pending (in-flight) command.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct PendingCommandSavedState {
        /// The submitted command.
        #[mesh(1, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub command: spec::Command,
    }

    /// Save/restore state for the set of pending commands.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct PendingCommandsSavedState {
        /// The pending commands.
        #[mesh(1)]
        pub commands: Vec<PendingCommandSavedState>,
        /// High bits of the next command identifier (CID) to allocate
        /// (assumption from the name — verify against the queue code).
        #[mesh(2)]
        pub next_cid_high_bits: u16,
        /// Number of CID bits used as the pending-command lookup key
        /// (assumption from the name — verify against the queue code).
        #[mesh(3)]
        pub cid_key_bits: u32,
    }

    /// NVMe namespace data.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct SavedNamespaceData {
        /// Namespace ID (NSID).
        #[mesh(1)]
        pub nsid: u32,
        /// Copy of the namespace's IDENTIFY structure.
        #[mesh(2, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub identify_ns: nvme_spec::nvm::IdentifyNamespace,
    }
}