nvme_driver/driver.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Implementation of the device driver core.

use super::spec;
use crate::NVME_PAGE_SHIFT;
use crate::Namespace;
use crate::NamespaceError;
use crate::NvmeDriverSavedState;
use crate::RequestError;
use crate::driver::save_restore::IoQueueSavedState;
use crate::queue_pair::Issuer;
use crate::queue_pair::QueuePair;
use crate::queue_pair::admin_cmd;
use crate::registers::Bar0;
use crate::registers::DeviceRegisters;
use anyhow::Context as _;
use futures::StreamExt;
use futures::future::join_all;
use inspect::Inspect;
use mesh::payload::Protobuf;
use mesh::rpc::Rpc;
use mesh::rpc::RpcSend;
use pal_async::task::Spawn;
use pal_async::task::Task;
use save_restore::NvmeDriverWorkerSavedState;
use std::sync::Arc;
use std::sync::OnceLock;
use task_control::AsyncRun;
use task_control::InspectTask;
use task_control::TaskControl;
use thiserror::Error;
use tracing::Instrument;
use tracing::info_span;
use user_driver::DeviceBacking;
use user_driver::backoff::Backoff;
use user_driver::interrupt::DeviceInterrupt;
use user_driver::memory::MemoryBlock;
use vmcore::vm_task::VmTaskDriver;
use vmcore::vm_task::VmTaskDriverSource;
use zerocopy::FromBytes;
use zerocopy::FromZeros;
use zerocopy::IntoBytes;

/// An NVMe driver.
///
/// Note that if this is dropped, the process will abort. Call
/// [`NvmeDriver::shutdown`] to drop this.
///
/// Further, note that this is an internal interface to be used
/// only by `NvmeDisk`! Remove any sanitization in `fuzz_nvme_driver.rs`
/// if this struct is used anywhere else.
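///
/// # Example
///
/// A minimal lifecycle sketch; the `driver_source`, `cpu_count`, and `device`
/// bindings are assumed to come from the caller (not a drop-in test):
///
/// ```ignore
/// let driver = NvmeDriver::new(&driver_source, cpu_count, device, false).await?;
/// let ns = driver.namespace(1).await?;
/// // ... issue IO through `ns` ...
/// driver.shutdown().await;
/// ```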
#[derive(Inspect)]
pub struct NvmeDriver<T: DeviceBacking> {
    #[inspect(flatten)]
    task: Option<TaskControl<DriverWorkerTask<T>, WorkerState>>,
    device_id: String,
    identify: Option<Arc<spec::IdentifyController>>,
    #[inspect(skip)]
    driver: VmTaskDriver,
    #[inspect(skip)]
    admin: Option<Arc<Issuer>>,
    #[inspect(skip)]
    io_issuers: Arc<IoIssuers>,
    #[inspect(skip)]
    rescan_event: Arc<event_listener::Event>,
    /// NVMe namespaces associated with this driver.
    #[inspect(skip)]
    namespaces: Vec<Arc<Namespace>>,
    /// Keeps the controller connected (CC.EN==1) while servicing.
    nvme_keepalive: bool,
    bounce_buffer: bool,
}

#[derive(Inspect)]
struct DriverWorkerTask<T: DeviceBacking> {
    device: T,
    #[inspect(skip)]
    driver: VmTaskDriver,
    registers: Arc<DeviceRegisters<T>>,
    admin: Option<QueuePair>,
    #[inspect(iter_by_index)]
    io: Vec<IoQueue>,
    io_issuers: Arc<IoIssuers>,
    #[inspect(skip)]
    recv: mesh::Receiver<NvmeWorkerRequest>,
    bounce_buffer: bool,
}

#[derive(Inspect)]
struct WorkerState {
    max_io_queues: u16,
    qsize: u16,
    #[inspect(skip)]
    async_event_task: Task<()>,
}

/// An error restoring from saved state.
#[derive(Debug, Error)]
pub enum RestoreError {
    #[error("invalid data")]
    InvalidData,
}

#[derive(Inspect)]
struct IoQueue {
    queue: QueuePair,
    iv: u16,
    cpu: u32,
}

impl IoQueue {
    pub async fn save(&self) -> anyhow::Result<IoQueueSavedState> {
        Ok(IoQueueSavedState {
            cpu: self.cpu,
            iv: self.iv as u32,
            queue_data: self.queue.save().await?,
        })
    }

    pub fn restore(
        spawner: VmTaskDriver,
        interrupt: DeviceInterrupt,
        registers: Arc<DeviceRegisters<impl DeviceBacking>>,
        mem_block: MemoryBlock,
        saved_state: &IoQueueSavedState,
        bounce_buffer: bool,
    ) -> anyhow::Result<Self> {
        let IoQueueSavedState {
            cpu,
            iv,
            queue_data,
        } = saved_state;
        let queue = QueuePair::restore(
            spawner,
            interrupt,
            registers.clone(),
            mem_block,
            queue_data,
            bounce_buffer,
        )?;

        Ok(Self {
            queue,
            iv: *iv as u16,
            cpu: *cpu,
        })
    }
}

#[derive(Debug, Inspect)]
#[inspect(transparent)]
pub(crate) struct IoIssuers {
    #[inspect(iter_by_index)]
    per_cpu: Vec<OnceLock<IoIssuer>>,
    #[inspect(skip)]
    send: mesh::Sender<NvmeWorkerRequest>,
}

#[derive(Debug, Clone, Inspect)]
struct IoIssuer {
    #[inspect(flatten)]
    issuer: Arc<Issuer>,
    cpu: u32,
}

#[derive(Debug)]
enum NvmeWorkerRequest {
    CreateIssuer(Rpc<u32, ()>),
    /// Save worker state.
    Save(Rpc<(), anyhow::Result<NvmeDriverWorkerSavedState>>),
}

impl<T: DeviceBacking> NvmeDriver<T> {
    /// Initializes the driver.
    pub async fn new(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        device: T,
        bounce_buffer: bool,
    ) -> anyhow::Result<Self> {
        let pci_id = device.id().to_owned();
        let mut this = Self::new_disabled(driver_source, cpu_count, device, bounce_buffer)
            .instrument(tracing::info_span!("nvme_new_disabled", pci_id))
            .await?;
        match this
            .enable(cpu_count as u16)
            .instrument(tracing::info_span!("nvme_enable", pci_id))
            .await
        {
            Ok(()) => Ok(this),
            Err(err) => {
                tracing::error!(
                    error = err.as_ref() as &dyn std::error::Error,
                    "device initialization failed, shutting down"
                );
                this.shutdown().await;
                Err(err)
            }
        }
    }
    /// Initializes but does not enable the device. DMA memory
    /// is preallocated from the backing device.
    async fn new_disabled(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        mut device: T,
        bounce_buffer: bool,
    ) -> anyhow::Result<Self> {
        let driver = driver_source.simple();
        let bar0 = Bar0(
            device
                .map_bar(0)
                .context("failed to map device registers")?,
        );

        let cc = bar0.cc();
        if cc.en() || bar0.csts().rdy() {
            if let Err(e) = bar0
                .reset(&driver)
                .instrument(tracing::info_span!(
                    "nvme_already_enabled",
                    pci_id = device.id().to_owned()
                ))
                .await
            {
                anyhow::bail!("device is gone, csts: {:#x}", e);
            }
        }

        let registers = Arc::new(DeviceRegisters::new(bar0));
        let cap = registers.cap;

        if cap.mpsmin() != 0 {
            anyhow::bail!(
                "unsupported minimum page size: {}",
                cap.mpsmin() + NVME_PAGE_SHIFT
            );
        }

        let (send, recv) = mesh::channel();
        let io_issuers = Arc::new(IoIssuers {
            per_cpu: (0..cpu_count).map(|_| OnceLock::new()).collect(),
            send,
        });

        Ok(Self {
            device_id: device.id().to_owned(),
            task: Some(TaskControl::new(DriverWorkerTask {
                device,
                driver: driver.clone(),
                registers,
                admin: None,
                io: Vec::new(),
                io_issuers: io_issuers.clone(),
                recv,
                bounce_buffer,
            })),
            admin: None,
            identify: None,
            driver,
            io_issuers,
            rescan_event: Default::default(),
            namespaces: vec![],
            nvme_keepalive: false,
            bounce_buffer,
        })
    }

    /// Enables the device, aliasing the admin queue memory and adding IO queues.
    async fn enable(&mut self, requested_io_queue_count: u16) -> anyhow::Result<()> {
        const ADMIN_QID: u16 = 0;

        let task = &mut self.task.as_mut().unwrap();
        let worker = task.task_mut();
        // Request that the admin submission and completion queues be the same
        // size, to avoid potential device bugs: differing sizes are a less
        // common (and thus less tested) scenario.
        //
        // Namely: using differing sizes revealed a bug in the initial
        // NvmeDirectV2 implementation.
        let admin_len = std::cmp::min(QueuePair::MAX_SQ_ENTRIES, QueuePair::MAX_CQ_ENTRIES);
        let admin_sqes = admin_len;
        let admin_cqes = admin_len;

        let interrupt0 = worker
            .device
            .map_interrupt(0, 0)
            .context("failed to map interrupt 0")?;

        // Start the admin queue pair.
        let admin = QueuePair::new(
            self.driver.clone(),
            &worker.device,
            ADMIN_QID,
            admin_sqes,
            admin_cqes,
            interrupt0,
            worker.registers.clone(),
            self.bounce_buffer,
        )
        .context("failed to create admin queue pair")?;

        let admin = worker.admin.insert(admin);

        // Register the admin queue with the controller.
        worker.registers.bar0.set_aqa(
            spec::Aqa::new()
                .with_acqs_z(admin_cqes - 1)
                .with_asqs_z(admin_sqes - 1),
        );
        worker.registers.bar0.set_asq(admin.sq_addr());
        worker.registers.bar0.set_acq(admin.cq_addr());

        // Enable the controller.
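        // IOSQES/IOCQES are log2 entry sizes (2^6 = 64-byte submission
        // entries, 2^4 = 16-byte completion entries), and MPS = 0 selects a
        // 4KiB memory page size.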
        let span = tracing::info_span!("nvme_ctrl_enable", pci_id = worker.device.id().to_owned());
        let ctrl_enable_span = span.enter();
        worker.registers.bar0.set_cc(
            spec::Cc::new()
                .with_iocqes(4)
                .with_iosqes(6)
                .with_en(true)
                .with_mps(0),
        );

        // Wait for the controller to be ready.
        let mut backoff = Backoff::new(&self.driver);
        loop {
            let csts = worker.registers.bar0.csts();
            let csts_val: u32 = csts.into();
            if csts_val == !0 {
                anyhow::bail!("device is gone, csts: {:#x}", csts_val);
            }
            if csts.cfs() {
                // Attempt to leave the device in reset state CC.EN 1 -> 0.
                let after_reset = if let Err(e) = worker.registers.bar0.reset(&self.driver).await {
                    e
                } else {
                    0
                };
                anyhow::bail!(
                    "device had fatal error, csts: {:#x}, after reset: {:#}",
                    csts_val,
                    after_reset
                );
            }
            if csts.rdy() {
                break;
            }
            backoff.back_off().await;
        }
        drop(ctrl_enable_span);

        // Get the controller identify structure.
        let identify = self
            .identify
            .insert(Arc::new(spec::IdentifyController::new_zeroed()));

        admin
            .issuer()
            .issue_out(
                spec::Command {
                    cdw10: spec::Cdw10Identify::new()
                        .with_cns(spec::Cns::CONTROLLER.0)
                        .into(),
                    ..admin_cmd(spec::AdminOpcode::IDENTIFY)
                },
                Arc::get_mut(identify).unwrap().as_mut_bytes(),
            )
            .await
            .context("failed to identify controller")?;

        // Configure the number of IO queues.
        //
        // Note that interrupt zero is shared between IO queue 1 and the admin queue.
        let max_interrupt_count = worker.device.max_interrupt_count();
        if max_interrupt_count == 0 {
            anyhow::bail!("bad device behavior: max_interrupt_count == 0");
        }

        let requested_io_queue_count = if max_interrupt_count < requested_io_queue_count as u32 {
            tracing::warn!(
                max_interrupt_count,
                requested_io_queue_count,
                "queue count constrained by msi count"
            );
            max_interrupt_count as u16
        } else {
            requested_io_queue_count
        };

        let completion = admin
            .issuer()
            .issue_neither(spec::Command {
                cdw10: spec::Cdw10SetFeatures::new()
                    .with_fid(spec::Feature::NUMBER_OF_QUEUES.0)
                    .into(),
                cdw11: spec::Cdw11FeatureNumberOfQueues::new()
                    .with_ncq_z(requested_io_queue_count - 1)
                    .with_nsq_z(requested_io_queue_count - 1)
                    .into(),
                ..admin_cmd(spec::AdminOpcode::SET_FEATURES)
            })
            .await
            .context("failed to set number of queues")?;

        // See how many queues are actually available.
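        // The returned NCQ/NSQ values are 0's-based, hence the +1 below; the
        // controller may grant fewer (or more) queues than requested.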
        let dw0 = spec::Cdw11FeatureNumberOfQueues::from(completion.dw0);
        let sq_count = dw0.nsq_z() + 1;
        let cq_count = dw0.ncq_z() + 1;
        let allocated_io_queue_count = sq_count.min(cq_count);
        if allocated_io_queue_count < requested_io_queue_count {
            tracing::warn!(
                sq_count,
                cq_count,
                requested_io_queue_count,
                "queue count constrained by hardware queue count"
            );
        }

        let max_io_queues = allocated_io_queue_count.min(requested_io_queue_count);

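        // Compute the IO queue size from CAP.MQES (a 0's-based maximum entry
        // count per queue), clamped to the driver's own queue size limits.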
        let qsize = {
            if worker.registers.cap.mqes_z() < 1 {
                anyhow::bail!("bad device behavior. mqes cannot be 0");
            }

            let io_cqsize = (QueuePair::MAX_CQ_ENTRIES - 1).min(worker.registers.cap.mqes_z()) + 1;
            let io_sqsize = (QueuePair::MAX_SQ_ENTRIES - 1).min(worker.registers.cap.mqes_z()) + 1;

            // Some hardware (such as ASAP) requires that the SQ and CQ have the same size.
            io_cqsize.min(io_sqsize)
        };

        // Spawn a task to handle asynchronous events.
        let async_event_task = self.driver.spawn("nvme_async_event", {
            let admin = admin.issuer().clone();
            let rescan_event = self.rescan_event.clone();
            async move {
                if let Err(err) = handle_asynchronous_events(&admin, &rescan_event).await {
                    tracing::error!(
                        error = err.as_ref() as &dyn std::error::Error,
                        "asynchronous event failure, not processing any more"
                    );
                }
            }
        });

        let mut state = WorkerState {
            qsize,
            async_event_task,
            max_io_queues,
        };

        self.admin = Some(admin.issuer().clone());

        // Pre-create IO queue 1 for CPU 0. The other queues will be created
        // lazily. Numbering for I/O queues starts at 1 (0 is the admin queue).
        let issuer = worker
            .create_io_queue(&mut state, 0)
            .await
            .context("failed to create io queue 1")?;

        self.io_issuers.per_cpu[0].set(issuer).unwrap();
        task.insert(&self.driver, "nvme_worker", state);
        task.start();
        Ok(())
    }

    /// Shuts the device down.
    pub async fn shutdown(mut self) {
        // If nvme_keepalive was requested, return early.
        // The memory is still aliased as we don't flush pending IOs.
        if self.nvme_keepalive {
            return;
        }
        self.reset().await;
        drop(self);
    }

    fn reset(&mut self) -> impl Send + std::future::Future<Output = ()> + use<T> {
        let driver = self.driver.clone();
        let mut task = std::mem::take(&mut self.task).unwrap();
        async move {
            task.stop().await;
            let (worker, state) = task.into_inner();
            if let Some(state) = state {
                state.async_event_task.cancel().await;
            }
            // Hold onto responses until the reset completes so that waiting IOs do
            // not think the memory is unaliased by the device.
            let _io_responses = join_all(worker.io.into_iter().map(|io| io.queue.shutdown())).await;
            let _admin_responses;
            if let Some(admin) = worker.admin {
                _admin_responses = admin.shutdown().await;
            }
            if let Err(e) = worker.registers.bar0.reset(&driver).await {
                tracing::info!(csts = e, "device reset failed");
            }
        }
    }

    /// Gets the namespace with namespace ID `nsid`.
    pub async fn namespace(&self, nsid: u32) -> Result<Namespace, NamespaceError> {
        Namespace::new(
            &self.driver,
            self.admin.as_ref().unwrap().clone(),
            self.rescan_event.clone(),
            self.identify.clone().unwrap(),
            &self.io_issuers,
            nsid,
        )
        .await
    }

    /// Returns the number of CPUs that are in fallback mode (that are using a
    /// remote CPU's queue due to a failure or resource limitation).
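    ///
    /// A CPU enters fallback mode when creating its own IO queue fails and
    /// the worker hands it a nearby CPU's issuer instead; see
    /// `create_io_issuer`.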
    pub fn fallback_cpu_count(&self) -> usize {
        self.io_issuers
            .per_cpu
            .iter()
            .enumerate()
            .filter(|&(cpu, c)| c.get().is_some_and(|c| c.cpu != cpu as u32))
            .count()
    }

    /// Saves the NVMe driver state during servicing.
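    ///
    /// A sketch of the servicing flow, assuming the caller later restores via
    /// [`NvmeDriver::restore`] (bindings here are illustrative):
    ///
    /// ```ignore
    /// let saved = driver.save().await?; // also sets nvme_keepalive
    /// driver.shutdown().await; // returns early, leaving the controller enabled
    /// // ... later, in the new instance ...
    /// let driver = NvmeDriver::restore(&driver_source, cpu_count, device, &saved, false).await?;
    /// ```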
    pub async fn save(&mut self) -> anyhow::Result<NvmeDriverSavedState> {
        // Nothing to save if Identify Controller was never queried.
        if self.identify.is_none() {
            return Err(save_restore::Error::InvalidState.into());
        }
        self.nvme_keepalive = true;
        match self
            .io_issuers
            .send
            .call(NvmeWorkerRequest::Save, ())
            .await?
        {
            Ok(s) => {
                // TODO: The current decision is to re-query namespace data
                // after restore, because of uncertainty about namespace
                // changes during servicing. The code below is left in place
                // so it can be re-enabled in the future.
                // ------
                // for ns in &self.namespaces {
                //     s.namespaces.push(ns.save()?);
                // }
                Ok(NvmeDriverSavedState {
                    identify_ctrl: spec::IdentifyController::read_from_bytes(
                        self.identify.as_ref().unwrap().as_bytes(),
                    )
                    .unwrap(),
                    device_id: self.device_id.clone(),
                    // TODO: See the description above, save the vector once resolved.
                    namespaces: vec![],
                    worker_data: s,
                })
            }
            Err(e) => Err(e),
        }
    }

    /// Restores NVMe driver state after servicing.
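    ///
    /// The controller is expected to still be enabled (CSTS.RDY set) from
    /// before servicing; restore fails rather than reinitializing the device
    /// otherwise.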
    pub async fn restore(
        driver_source: &VmTaskDriverSource,
        cpu_count: u32,
        mut device: T,
        saved_state: &NvmeDriverSavedState,
        bounce_buffer: bool,
    ) -> anyhow::Result<Self> {
        let driver = driver_source.simple();
        let bar0_mapping = device
            .map_bar(0)
            .context("failed to map device registers")?;
        let bar0 = Bar0(bar0_mapping);

        // The device is expected to be alive when restoring.
        let csts = bar0.csts();
        if !csts.rdy() {
            anyhow::bail!(
                "device is not ready during restore, csts: {:#x}",
                u32::from(csts)
            );
        }

        let registers = Arc::new(DeviceRegisters::new(bar0));

        let (send, recv) = mesh::channel();
        let io_issuers = Arc::new(IoIssuers {
            per_cpu: (0..cpu_count).map(|_| OnceLock::new()).collect(),
            send,
        });

        let mut this = Self {
            device_id: device.id().to_owned(),
            task: Some(TaskControl::new(DriverWorkerTask {
                device,
                driver: driver.clone(),
                registers: registers.clone(),
                admin: None, // Updated below.
                io: Vec::new(),
                io_issuers: io_issuers.clone(),
                recv,
                bounce_buffer,
            })),
            admin: None, // Updated below.
            identify: Some(Arc::new(
                spec::IdentifyController::read_from_bytes(saved_state.identify_ctrl.as_bytes())
                    .map_err(|_| RestoreError::InvalidData)?, // TODO: zerocopy: map_err (https://github.com/microsoft/openvmm/issues/759)
            )),
            driver: driver.clone(),
            io_issuers,
            rescan_event: Default::default(),
            namespaces: vec![],
            nvme_keepalive: true,
            bounce_buffer,
        };

        let task = &mut this.task.as_mut().unwrap();
        let worker = task.task_mut();

        // Interrupt 0 is shared between admin queue and I/O queue 1.
        let interrupt0 = worker
            .device
            .map_interrupt(0, 0)
            .context("failed to map interrupt 0")?;

        let dma_client = worker.device.dma_client();
        let restored_memory = dma_client
            .attach_pending_buffers()
            .context("failed to restore allocations")?;

        // Restore the admin queue pair.
        let admin = saved_state
            .worker_data
            .admin
            .as_ref()
            .map(|a| {
                // Restore memory block for admin queue pair.
                let mem_block = restored_memory
                    .iter()
                    .find(|mem| mem.len() == a.mem_len && a.base_pfn == mem.pfns()[0])
                    .expect("unable to find restored mem block")
                    .to_owned();
                QueuePair::restore(
                    driver.clone(),
                    interrupt0,
                    registers.clone(),
                    mem_block,
                    a,
                    bounce_buffer,
                )
                .unwrap()
            })
            .unwrap();

        let admin = worker.admin.insert(admin);

        // Spawn a task to handle asynchronous events.
        let async_event_task = this.driver.spawn("nvme_async_event", {
            let admin = admin.issuer().clone();
            let rescan_event = this.rescan_event.clone();
            async move {
                if let Err(err) = handle_asynchronous_events(&admin, &rescan_event).await {
                    tracing::error!(
                        error = err.as_ref() as &dyn std::error::Error,
                        "asynchronous event failure, not processing any more"
                    );
                }
            }
        });

        let state = WorkerState {
            qsize: saved_state.worker_data.qsize,
            async_event_task,
            max_io_queues: saved_state.worker_data.max_io_queues,
        };

        this.admin = Some(admin.issuer().clone());

        // Restore I/O queues.
        // Interrupt vector 0 is shared between Admin queue and I/O queue #1.
        worker.io = saved_state
            .worker_data
            .io
            .iter()
            .flat_map(|q| -> Result<IoQueue, anyhow::Error> {
                let interrupt = worker
                    .device
                    .map_interrupt(q.iv, q.cpu)
                    .context("failed to map interrupt")?;
                let mem_block = restored_memory
                    .iter()
                    .find(|mem| {
                        mem.len() == q.queue_data.mem_len && q.queue_data.base_pfn == mem.pfns()[0]
                    })
                    .expect("unable to find restored mem block")
                    .to_owned();
                let q = IoQueue::restore(
                    driver.clone(),
                    interrupt,
                    registers.clone(),
                    mem_block,
                    q,
                    bounce_buffer,
                )?;
                let issuer = IoIssuer {
                    issuer: q.queue.issuer().clone(),
                    cpu: q.cpu,
                };
                this.io_issuers.per_cpu[q.cpu as usize].set(issuer).unwrap();
                Ok(q)
            })
            .collect();

        // Restore namespace(s).
        for ns in &saved_state.namespaces {
            // TODO: Current approach is to re-query namespace data after servicing
            // and this array will be empty. Once we confirm that we can process
            // namespace change notification AEN, the restore code will be re-added.
            this.namespaces.push(Arc::new(Namespace::restore(
                &driver,
                admin.issuer().clone(),
                this.rescan_event.clone(),
                this.identify.clone().unwrap(),
                &this.io_issuers,
                ns,
            )?));
        }

        task.insert(&this.driver, "nvme_worker", state);
        task.start();

        Ok(this)
    }

    /// Changes the device's behavior during servicing.
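    ///
    /// When `nvme_keepalive` is true, [`NvmeDriver::shutdown`] and drop skip
    /// the controller reset, leaving the device enabled (CC.EN == 1) and its
    /// queue memory aliased across servicing.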
    pub fn update_servicing_flags(&mut self, nvme_keepalive: bool) {
        self.nvme_keepalive = nvme_keepalive;
    }
}

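/// Services the controller's asynchronous event requests in a loop, notifying
/// `rescan_event` when a namespace attribute change notice arrives.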
async fn handle_asynchronous_events(
    admin: &Issuer,
    rescan_event: &event_listener::Event,
) -> anyhow::Result<()> {
    loop {
        let completion = admin
            .issue_neither(admin_cmd(spec::AdminOpcode::ASYNCHRONOUS_EVENT_REQUEST))
            .await
            .context("asynchronous event request failed")?;

        let dw0 = spec::AsynchronousEventRequestDw0::from(completion.dw0);
        match spec::AsynchronousEventType(dw0.event_type()) {
            spec::AsynchronousEventType::NOTICE => {
                tracing::info!("namespace attribute change event");

                // Clear the namespace list.
                let mut list = [0u32; 1024];
                admin
                    .issue_out(
                        spec::Command {
                            cdw10: spec::Cdw10GetLogPage::new()
                                .with_lid(spec::LogPageIdentifier::CHANGED_NAMESPACE_LIST.0)
                                .with_numdl_z(1023)
                                .into(),
                            ..admin_cmd(spec::AdminOpcode::GET_LOG_PAGE)
                        },
                        list.as_mut_bytes(),
                    )
                    .await
                    .context("failed to query changed namespace list")?;

                if list[0] != 0 {
                    // For simplicity, tell all namespaces to rescan.
                    rescan_event.notify(usize::MAX);
                }
            }
            event_type => {
                tracing::info!(
                    ?event_type,
                    information = dw0.information(),
                    log_page_identifier = dw0.log_page_identifier(),
                    "unhandled asynchronous event"
                );
            }
        }
    }
}

impl<T: DeviceBacking> Drop for NvmeDriver<T> {
    fn drop(&mut self) {
        if self.task.is_some() {
            // Do not reset NVMe device when nvme_keepalive is requested.
            if !self.nvme_keepalive {
                // Reset the device asynchronously so that pending IOs are not
                // dropped while their memory is aliased.
                let reset = self.reset();
                self.driver.spawn("nvme_drop", reset).detach();
            }
        }
    }
}

impl IoIssuers {
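    /// Gets the IO issuer for `cpu`, asking the worker task to create the
    /// per-CPU IO queue on first use.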
    pub async fn get(&self, cpu: u32) -> Result<&Issuer, RequestError> {
        if let Some(v) = self.per_cpu[cpu as usize].get() {
            return Ok(&v.issuer);
        }

        self.send
            .call(NvmeWorkerRequest::CreateIssuer, cpu)
            .await
            .map_err(RequestError::Gone)?;

        Ok(self.per_cpu[cpu as usize]
            .get()
            .expect("issuer was set by rpc")
            .issuer
            .as_ref())
    }
}

impl<T: DeviceBacking> AsyncRun<WorkerState> for DriverWorkerTask<T> {
    async fn run(
        &mut self,
        stop: &mut task_control::StopTask<'_>,
        state: &mut WorkerState,
    ) -> Result<(), task_control::Cancelled> {
        stop.until_stopped(async {
            loop {
                match self.recv.next().await {
                    Some(NvmeWorkerRequest::CreateIssuer(rpc)) => {
                        rpc.handle(async |cpu| self.create_io_issuer(state, cpu).await)
                            .await
                    }
                    Some(NvmeWorkerRequest::Save(rpc)) => {
                        rpc.handle(async |_| self.save(state).await).await
                    }
                    None => break,
                }
            }
        })
        .await
    }
}

impl<T: DeviceBacking> DriverWorkerTask<T> {
    async fn create_io_issuer(&mut self, state: &mut WorkerState, cpu: u32) {
        tracing::debug!(cpu, "issuer request");
        if self.io_issuers.per_cpu[cpu as usize].get().is_some() {
            return;
        }

        let issuer = match self
            .create_io_queue(state, cpu)
            .instrument(info_span!("create_nvme_io_queue", cpu))
            .await
        {
            Ok(issuer) => issuer,
            Err(err) => {
                // Find a fallback queue close in index to the failed queue.
                let (fallback_cpu, fallback) = self.io_issuers.per_cpu[..cpu as usize]
                    .iter()
                    .enumerate()
                    .rev()
                    .find_map(|(i, issuer)| issuer.get().map(|issuer| (i, issuer)))
                    .unwrap();

                tracing::error!(
                    cpu,
                    fallback_cpu,
                    error = err.as_ref() as &dyn std::error::Error,
                    "failed to create io queue, falling back"
                );
                fallback.clone()
            }
        };

        self.io_issuers.per_cpu[cpu as usize]
            .set(issuer)
            .ok()
            .unwrap();
    }

    async fn create_io_queue(
        &mut self,
        state: &mut WorkerState,
        cpu: u32,
    ) -> anyhow::Result<IoIssuer> {
        if self.io.len() >= state.max_io_queues as usize {
            anyhow::bail!("no more io queues available");
        }

        let qid = self.io.len() as u16 + 1;

        tracing::debug!(cpu, qid, "creating io queue");

        // Share IO queue 1's interrupt with the admin queue.
        let iv = self.io.len() as u16;
        let interrupt = self
            .device
            .map_interrupt(iv.into(), cpu)
            .context("failed to map interrupt")?;

        let queue = QueuePair::new(
            self.driver.clone(),
            &self.device,
            qid,
            state.qsize,
            state.qsize,
            interrupt,
            self.registers.clone(),
            self.bounce_buffer,
        )
        .with_context(|| format!("failed to create io queue pair {qid}"))?;

        let io_sq_addr = queue.sq_addr();
        let io_cq_addr = queue.cq_addr();

        // Add the queue pair before aliasing its memory with the device so
        // that it can be torn down correctly on failure.
        self.io.push(IoQueue { queue, iv, cpu });
        let io_queue = self.io.last_mut().unwrap();

        let admin = self.admin.as_ref().unwrap().issuer().as_ref();

        let mut created_completion_queue = false;
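        // The completion queue must exist before a submission queue can
        // reference it, so create the CQ first and track whether it needs to
        // be torn down if SQ creation fails.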
        let r = async {
            admin
                .issue_raw(spec::Command {
                    cdw10: spec::Cdw10CreateIoQueue::new()
                        .with_qid(qid)
                        .with_qsize_z(state.qsize - 1)
                        .into(),
                    cdw11: spec::Cdw11CreateIoCompletionQueue::new()
                        .with_ien(true)
                        .with_iv(iv)
                        .with_pc(true)
                        .into(),
                    dptr: [io_cq_addr, 0],
                    ..admin_cmd(spec::AdminOpcode::CREATE_IO_COMPLETION_QUEUE)
                })
                .await
                .with_context(|| format!("failed to create io completion queue {qid}"))?;

            created_completion_queue = true;

            admin
                .issue_raw(spec::Command {
                    cdw10: spec::Cdw10CreateIoQueue::new()
                        .with_qid(qid)
                        .with_qsize_z(state.qsize - 1)
                        .into(),
                    cdw11: spec::Cdw11CreateIoSubmissionQueue::new()
                        .with_cqid(qid)
                        .with_pc(true)
                        .into(),
                    dptr: [io_sq_addr, 0],
                    ..admin_cmd(spec::AdminOpcode::CREATE_IO_SUBMISSION_QUEUE)
                })
                .await
                .with_context(|| format!("failed to create io submission queue {qid}"))?;

            Ok(())
        };

        if let Err(err) = r.await {
            if created_completion_queue {
                if let Err(err) = admin
                    .issue_raw(spec::Command {
                        cdw10: spec::Cdw10DeleteIoQueue::new().with_qid(qid).into(),
                        ..admin_cmd(spec::AdminOpcode::DELETE_IO_COMPLETION_QUEUE)
                    })
                    .await
                {
                    tracing::error!(
                        error = &err as &dyn std::error::Error,
                        "failed to delete completion queue in teardown path"
                    );
                }
            }
            let io = self.io.pop().unwrap();
            io.queue.shutdown().await;
            return Err(err);
        }

        Ok(IoIssuer {
            issuer: io_queue.queue.issuer().clone(),
            cpu,
        })
    }

    /// Saves NVMe driver state for servicing.
    pub async fn save(
        &mut self,
        worker_state: &mut WorkerState,
    ) -> anyhow::Result<NvmeDriverWorkerSavedState> {
        let admin = match self.admin.as_ref() {
            Some(a) => Some(a.save().await?),
            None => None,
        };

        let io = join_all(self.io.drain(..).map(async |q| q.save().await))
            .await
            .into_iter()
            .flatten()
            .collect();

        Ok(NvmeDriverWorkerSavedState {
            admin,
            io,
            qsize: worker_state.qsize,
            max_io_queues: worker_state.max_io_queues,
        })
    }
}

impl<T: DeviceBacking> InspectTask<WorkerState> for DriverWorkerTask<T> {
    fn inspect(&self, req: inspect::Request<'_>, state: Option<&WorkerState>) {
        req.respond().merge(self).merge(state);
    }
}

pub mod save_restore {
    use super::*;

    /// Save and Restore errors for this module.
    #[derive(Debug, Error)]
    pub enum Error {
        /// No data to save.
        #[error("invalid object state")]
        InvalidState,
    }

    /// Save/restore state for NVMe driver.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct NvmeDriverSavedState {
        /// Copy of the controller's IDENTIFY structure.
        /// (It is defined as an `Option<>` in the original structure.)
        #[mesh(1, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub identify_ctrl: spec::IdentifyController,
        /// Device ID string.
        #[mesh(2)]
        pub device_id: String,
        /// Namespace data.
        #[mesh(3)]
        pub namespaces: Vec<SavedNamespaceData>,
        /// NVMe driver worker task data.
        #[mesh(4)]
        pub worker_data: NvmeDriverWorkerSavedState,
    }

    /// Save/restore state for NVMe driver worker task.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct NvmeDriverWorkerSavedState {
        /// Admin queue state.
        #[mesh(1)]
        pub admin: Option<QueuePairSavedState>,
        /// IO queue states.
        #[mesh(2)]
        pub io: Vec<IoQueueSavedState>,
        /// Queue size as determined by CAP.MQES.
        #[mesh(3)]
        pub qsize: u16,
        /// Max number of IO queue pairs.
        #[mesh(4)]
        pub max_io_queues: u16,
    }

    /// Save/restore state for QueuePair.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct QueuePairSavedState {
        /// Allocated memory size in bytes.
        #[mesh(1)]
        pub mem_len: usize,
        /// First PFN of the physically contiguous block.
        #[mesh(2)]
        pub base_pfn: u64,
        /// Queue ID used when creating the pair
        /// (the SQ and CQ use the same ID).
        #[mesh(3)]
        pub qid: u16,
        /// Submission queue entries.
        #[mesh(4)]
        pub sq_entries: u16,
        /// Completion queue entries.
        #[mesh(5)]
        pub cq_entries: u16,
        /// QueueHandler task data.
        #[mesh(6)]
        pub handler_data: QueueHandlerSavedState,
    }

    /// Save/restore state for IoQueue.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct IoQueueSavedState {
        #[mesh(1)]
        /// Which CPU handles requests.
        pub cpu: u32,
        #[mesh(2)]
        /// Interrupt vector (MSI-X)
        pub iv: u32,
        #[mesh(3)]
        pub queue_data: QueuePairSavedState,
    }

    /// Save/restore state for QueueHandler task.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct QueueHandlerSavedState {
        #[mesh(1)]
        pub sq_state: SubmissionQueueSavedState,
        #[mesh(2)]
        pub cq_state: CompletionQueueSavedState,
        #[mesh(3)]
        pub pending_cmds: PendingCommandsSavedState,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct SubmissionQueueSavedState {
        #[mesh(1)]
        pub sqid: u16,
        #[mesh(2)]
        pub head: u32,
        #[mesh(3)]
        pub tail: u32,
        #[mesh(4)]
        pub committed_tail: u32,
        #[mesh(5)]
        pub len: u32,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct CompletionQueueSavedState {
        #[mesh(1)]
        pub cqid: u16,
        #[mesh(2)]
        pub head: u32,
        #[mesh(3)]
        pub committed_head: u32,
        #[mesh(4)]
        pub len: u32,
        #[mesh(5)]
        /// NVMe completion queue phase tag.
        pub phase: bool,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct PendingCommandSavedState {
        #[mesh(1, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub command: spec::Command,
    }

    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct PendingCommandsSavedState {
        #[mesh(1)]
        pub commands: Vec<PendingCommandSavedState>,
        #[mesh(2)]
        pub next_cid_high_bits: u16,
        #[mesh(3)]
        pub cid_key_bits: u32,
    }

    /// NVMe namespace data.
    #[derive(Protobuf, Clone, Debug)]
    #[mesh(package = "nvme_driver")]
    pub struct SavedNamespaceData {
        #[mesh(1)]
        pub nsid: u32,
        #[mesh(2, encoding = "mesh::payload::encoding::ZeroCopyEncoding")]
        pub identify_ns: nvme_spec::nvm::IdentifyNamespace,
    }
}