1use super::mappable::Mappable;
9use super::object_cache::ObjectCache;
10use super::object_cache::ObjectId;
11use super::va_mapper::VaMapper;
12use super::va_mapper::VaMapperError;
13use crate::RemoteProcess;
14use crate::region_manager::MappingType;
15use futures::StreamExt;
16use futures::future::join_all;
17use guestmem::ProvideShareableRegions;
18use guestmem::ShareableRegion;
19use inspect::Inspect;
20use inspect::InspectMut;
21use memory_range::MemoryRange;
22use mesh::MeshPayload;
23use mesh::error::RemoteError;
24use mesh::rpc::FailableRpc;
25use mesh::rpc::Rpc;
26use mesh::rpc::RpcSend;
27use pal_async::task::Spawn;
28use slab::Slab;
29use std::sync::Arc;
30use thiserror::Error;
31
/// Owning handle for the mapping manager.
///
/// Holds the [`MappingManagerClient`] used to talk to the background task
/// that `MappingManager::new` spawns.
#[derive(Debug, Inspect)]
pub struct MappingManager {
    // Inspection is not answered locally: a deferred `MappingRequest::Inspect`
    // is sent over the client's request channel and the task responds.
    #[inspect(
        flatten,
        with = "|x| inspect::send(&x.req_send, MappingRequest::Inspect)"
    )]
    client: MappingManagerClient,
}
41
42impl MappingManager {
43 pub fn new(
49 spawn: impl Spawn,
50 max_addr: u64,
51 private_ranges: Vec<MemoryRange>,
52 minimum_va_alignment: Option<usize>,
53 ) -> Self {
54 let (req_send, mut req_recv) = mesh::mpsc_channel();
55 spawn
56 .spawn("mapping_manager", {
57 let mut task = MappingManagerTask::new();
58 async move {
59 task.run(&mut req_recv).await;
60 }
61 })
62 .detach();
63 Self {
64 client: MappingManagerClient {
65 id: ObjectId::new(),
66 req_send,
67 max_addr,
68 private_ranges,
69 minimum_va_alignment,
70 },
71 }
72 }
73
74 pub fn client(&self) -> &MappingManagerClient {
77 &self.client
78 }
79}
80
/// Cloneable, mesh-transferable handle for issuing requests to the mapping
/// manager task.
#[derive(Debug, MeshPayload, Clone)]
pub struct MappingManagerClient {
    /// Channel on which [`MappingRequest`]s are sent to the manager task.
    req_send: mesh::Sender<MappingRequest>,
    /// Key under which this manager's local mapper is stored in
    /// `MAPPER_CACHE`.
    id: ObjectId,
    /// Forwarded unchanged to `VaMapper::new`.
    max_addr: u64,
    /// Forwarded to `VaMapper::new` for local mappers. When non-empty, lazy
    /// local mappers and remote mappers are refused.
    private_ranges: Vec<MemoryRange>,
    /// Forwarded unchanged to `VaMapper::new`.
    minimum_va_alignment: Option<usize>,
}
90
/// Process-wide cache of local [`VaMapper`]s, looked up by the
/// [`ObjectId`] stored in each [`MappingManagerClient`].
static MAPPER_CACHE: ObjectCache<VaMapper> = ObjectCache::new();
92
93impl MappingManagerClient {
94 pub async fn new_mapper(&self, eager: bool) -> Result<Arc<VaMapper>, VaMapperError> {
112 let mapper = MAPPER_CACHE
113 .get_or_insert_with(&self.id, async {
114 assert!(
115 eager || self.private_ranges.is_empty(),
116 "lazy mappers are not supported with private memory"
117 );
118 VaMapper::new(
119 self.req_send.clone(),
120 self.max_addr,
121 None,
122 self.private_ranges.clone(),
123 self.minimum_va_alignment,
124 eager,
125 )
126 .await
127 })
128 .await?;
129
130 if eager && !mapper.is_eager() {
133 self.req_send
134 .call(MappingRequest::UpgradeToEager, mapper.mapper_id())
135 .await
136 .map_err(VaMapperError::MemoryManagerGone)?
137 .map_err(VaMapperError::Registration)?;
138 }
139
140 Ok(mapper)
141 }
142
143 pub async fn new_remote_mapper(
152 &self,
153 process: RemoteProcess,
154 ) -> Result<Arc<VaMapper>, VaMapperError> {
155 if !self.private_ranges.is_empty() {
156 return Err(VaMapperError::RemoteWithPrivateMemory);
157 }
158 Ok(Arc::new(
159 VaMapper::new(
160 self.req_send.clone(),
161 self.max_addr,
162 Some(process),
163 Vec::new(),
164 self.minimum_va_alignment,
165 true, )
167 .await?,
168 ))
169 }
170
171 pub async fn add_mapping(&self, params: MappingParams) -> anyhow::Result<()> {
180 self.req_send
181 .call_failable(MappingRequest::AddMapping, params)
182 .await?;
183 Ok(())
184 }
185
186 pub async fn remove_mappings(&self, range: MemoryRange) {
190 self.req_send
191 .call(MappingRequest::RemoveMappings, range)
192 .await
193 .unwrap();
194 }
195}
196
/// Payload of [`MappingRequest::AddMapper`].
#[derive(MeshPayload)]
pub struct AddMapperParams {
    /// Channel the manager uses to deliver [`MapperRequest`]s to the new
    /// mapper.
    pub send: mesh::Sender<MapperRequest>,
    /// When true, existing mappings are replayed to the mapper during
    /// registration and later mappings are pushed to it as they are added.
    pub eager: bool,
}
206
/// Requests handled by the mapping manager task.
#[derive(MeshPayload)]
pub enum MappingRequest {
    /// Registers a mapper and replies with its id.
    AddMapper(FailableRpc<AddMapperParams, MapperId>),
    /// Unregisters a mapper. No reply is sent.
    RemoveMapper(MapperId),
    /// Asks the manager to send the given mapper whatever mappings cover the
    /// range (as `MapLazy` / `NoMapping` messages). No reply is sent.
    SendMappings(MapperId, MemoryRange),
    /// Switches a lazy mapper to eager, replaying mappings it does not yet
    /// have.
    UpgradeToEager(FailableRpc<MapperId, ()>),
    /// Adds a mapping, pushing it to every eager mapper.
    AddMapping(FailableRpc<MappingParams, ()>),
    /// Removes all mappings contained in the range and invalidates them in
    /// the mappers that had them.
    RemoveMappings(Rpc<MemoryRange, ()>),
    /// Returns the file-backed `MappingType::Ram` mappings.
    GetDmaTargetMappings(Rpc<(), Vec<MappingParams>>),
    /// Deferred inspection of the task state.
    Inspect(inspect::Deferred),
}
226
/// State owned by the background mapping manager task.
#[derive(InspectMut)]
struct MappingManagerTask {
    /// Every mapping currently registered, in the order it was added.
    #[inspect(with = "inspect_mappings")]
    mappings: Vec<Mapping>,
    /// Registered mappers; a `MapperId` is an index into this collection.
    #[inspect(skip)]
    mappers: Mappers,
}
234
235fn inspect_mappings(mappings: &Vec<Mapping>) -> impl '_ + Inspect {
236 inspect::adhoc(move |req| {
237 let mut resp = req.respond();
238 for mapping in mappings {
239 resp.field(
240 &mapping.params.range.to_string(),
241 inspect::adhoc(|req| {
242 req.respond()
243 .field("writable", mapping.params.writable)
244 .field("mapping_type", mapping.params.mapping_type)
245 .field("backed_by_fd", mapping.params.backing.mappable().is_some())
246 .hex("file_offset", mapping.params.backing.file_offset());
247 }),
248 );
249 }
250 })
251}
252
/// A registered mapping together with the mappers known to have it.
struct Mapping {
    /// What is mapped and where.
    params: MappingParams,
    /// Mappers that have been sent this mapping and therefore must be told
    /// to unmap it when it is removed.
    active_mappers: Vec<MapperId>,
}
257
/// What backs a mapping.
#[derive(Debug, MeshPayload, Clone)]
pub enum MappingBacking {
    /// Backed by a mappable object.
    File {
        /// The object to map.
        mappable: Mappable,
        /// Offset into `mappable` at which the mapping starts.
        file_offset: u64,
    },
    /// Not backed by a mappable object.
    Private {
        // NOTE(review): presumably asks the mapper to enable transparent
        // huge pages for the private allocation; confirm in the mapper.
        transparent_hugepages: bool,
    },
}
292
293impl MappingBacking {
294 pub fn mappable(&self) -> Option<&Mappable> {
296 match self {
297 MappingBacking::File { mappable, .. } => Some(mappable),
298 MappingBacking::Private { .. } => None,
299 }
300 }
301
302 pub fn file_offset(&self) -> u64 {
304 match self {
305 MappingBacking::File { file_offset, .. } => *file_offset,
306 MappingBacking::Private { .. } => 0,
307 }
308 }
309}
310
/// Full description of one mapping.
#[derive(Debug, MeshPayload, Clone)]
pub struct MappingParams {
    /// The address range the mapping covers.
    pub range: MemoryRange,
    /// What backs the range.
    pub backing: MappingBacking,
    /// Whether the mapping is writable.
    pub writable: bool,
    /// Kind of mapping; only `MappingType::Ram` mappings are reported as DMA
    /// targets.
    pub mapping_type: MappingType,
    // NOTE(review): presumably the NUMA node to place the memory on; the
    // manager only stores and forwards it. Confirm in the mapper.
    pub numa_node: Option<u32>,
}
329
/// Failure to establish a mapping in a mapper.
#[derive(Debug, Error)]
#[error("failed to map {range}")]
pub struct MappingError {
    /// The range that could not be mapped. `MemoryRange::EMPTY` is used when
    /// the mapper disappeared rather than rejecting a specific range.
    pub range: MemoryRange,
    /// The underlying cause.
    #[source]
    pub error: std::io::Error,
}
340
341impl MappingError {
342 pub(crate) fn new(range: MemoryRange, error: std::io::Error) -> Self {
343 Self { range, error }
344 }
345}
346
/// The set of registered mappers.
struct Mappers {
    /// Slab of mappers; the slab key is the value wrapped by `MapperId`.
    mappers: Slab<MapperComm>,
}
350
/// The manager's view of a single registered mapper.
struct MapperComm {
    /// Channel for sending requests to the mapper.
    req_send: mesh::Sender<MapperRequest>,
    /// Whether new mappings are pushed to this mapper as they are added.
    eager: bool,
}
355
/// Identifies a registered mapper; wraps the mapper's key in the manager's
/// slab.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, MeshPayload)]
pub struct MapperId(pub(crate) usize);
358
/// Requests the manager sends to a mapper.
#[derive(Debug, MeshPayload)]
pub enum MapperRequest {
    /// Map this mapping now and report whether that succeeded. Sent to eager
    /// mappers on registration, on upgrade and when a mapping is added.
    MapEager(Rpc<MappingParams, Result<(), RemoteError>>),
    /// A mapping covering (part of) a range the mapper asked about via
    /// `MappingRequest::SendMappings`. No reply is expected.
    MapLazy(MappingParams),
    /// The given part of a range the mapper asked about has no mapping.
    NoMapping(MemoryRange),
    /// Unmap the range and reply when done.
    Unmap(Rpc<MemoryRange, ()>),
    /// Sent after a successful upgrade, once every mapping has been
    /// replayed, to tell the mapper it is now eager.
    SetEager(Rpc<(), ()>),
}
379
380impl MappingManagerTask {
381 fn new() -> Self {
382 Self {
383 mappers: Mappers {
384 mappers: Slab::new(),
385 },
386 mappings: Vec::new(),
387 }
388 }
389
390 async fn run(&mut self, req_recv: &mut mesh::Receiver<MappingRequest>) {
391 while let Some(req) = req_recv.next().await {
392 match req {
393 MappingRequest::AddMapper(rpc) => {
394 rpc.handle_failable(async |params: AddMapperParams| {
395 self.add_mapper(params.send, params.eager).await
396 })
397 .await
398 }
399 MappingRequest::RemoveMapper(id) => {
400 self.remove_mapper(id);
401 }
402 MappingRequest::SendMappings(id, range) => {
403 self.send_mappings(id, range);
404 }
405 MappingRequest::UpgradeToEager(rpc) => {
406 rpc.handle_failable(async |id| self.upgrade_to_eager(id).await)
407 .await
408 }
409 MappingRequest::AddMapping(rpc) => {
410 rpc.handle(async |params| self.add_mapping(params).await)
411 .await
412 }
413 MappingRequest::RemoveMappings(rpc) => {
414 rpc.handle(async |range| self.remove_mappings(range).await)
415 .await
416 }
417 MappingRequest::GetDmaTargetMappings(rpc) => {
418 rpc.handle_sync(|()| self.get_dma_target_mappings())
419 }
420 MappingRequest::Inspect(deferred) => deferred.inspect(&mut *self),
421 }
422 }
423 }
424
425 async fn add_mapper(
426 &mut self,
427 req_send: mesh::Sender<MapperRequest>,
428 eager: bool,
429 ) -> Result<MapperId, MappingError> {
430 let id = self.mappers.mappers.insert(MapperComm { req_send, eager });
431 let mapper_id = MapperId(id);
432 tracing::debug!(?id, eager, "adding mapper");
433
434 if eager {
435 let mut failed = None;
437 for mapping in &mut self.mappings {
438 match self.mappers.mappers[id]
439 .req_send
440 .call(MapperRequest::MapEager, mapping.params.clone())
441 .await
442 {
443 Ok(Ok(())) => {
444 mapping.active_mappers.push(mapper_id);
445 }
446 Ok(Err(e)) => {
447 failed = Some(MappingError::new(
448 mapping.params.range,
449 std::io::Error::other(e),
450 ));
451 break;
452 }
453 Err(_) => {
454 failed = Some(MappingError::new(
455 MemoryRange::EMPTY,
456 std::io::Error::other("mapper gone during replay"),
457 ));
458 break;
459 }
460 }
461 }
462 if let Some(err) = failed {
463 self.remove_mapper(mapper_id);
464 return Err(err);
465 }
466 }
467
468 Ok(mapper_id)
469 }
470
471 fn remove_mapper(&mut self, id: MapperId) {
472 tracing::debug!(?id, "removing mapper");
473 self.mappers.mappers.remove(id.0);
474 for mapping in &mut self.mappings {
475 mapping.active_mappers.retain(|m| m != &id);
476 }
477 }
478
479 async fn upgrade_to_eager(&mut self, id: MapperId) -> Result<(), MappingError> {
482 let mapper = &mut self.mappers.mappers[id.0];
483 if mapper.eager {
484 return Ok(()); }
486 mapper.eager = true;
495 tracing::debug!(?id, "upgrading mapper to eager");
496
497 let mut failed = None;
498 for mapping in &mut self.mappings {
499 if mapping.active_mappers.contains(&id) {
501 continue;
502 }
503 match self.mappers.mappers[id.0]
504 .req_send
505 .call(MapperRequest::MapEager, mapping.params.clone())
506 .await
507 {
508 Ok(Ok(())) => {
509 mapping.active_mappers.push(id);
510 }
511 Ok(Err(e)) => {
512 failed = Some(MappingError::new(
513 mapping.params.range,
514 std::io::Error::other(e),
515 ));
516 break;
517 }
518 Err(_) => {
519 failed = Some(MappingError::new(
520 MemoryRange::EMPTY,
521 std::io::Error::other("mapper gone during eager upgrade"),
522 ));
523 break;
524 }
525 }
526 }
527
528 if let Some(err) = failed {
529 self.mappers.mappers[id.0].eager = false;
534 return Err(err);
535 }
536
537 self.mappers.mappers[id.0]
541 .req_send
542 .call(MapperRequest::SetEager, ())
543 .await
544 .ok();
545
546 Ok(())
547 }
548
549 fn send_mappings(&mut self, id: MapperId, mut range: MemoryRange) {
555 while !range.is_empty() {
556 let (this_end, params) = if let Some(mapping) = self
558 .mappings
559 .iter_mut()
560 .filter(|mapping| mapping.params.range.overlaps(&range))
561 .min_by_key(|mapping| mapping.params.range.start())
562 {
563 if mapping.params.range.start() <= range.start() {
564 if !mapping.active_mappers.contains(&id) {
565 mapping.active_mappers.push(id);
566 }
567 (
569 mapping.params.range.end().min(range.end()),
570 Some(mapping.params.clone()),
571 )
572 } else {
573 (mapping.params.range.start(), None)
575 }
576 } else {
577 (range.end(), None)
579 };
580 let this_range = MemoryRange::new(range.start()..this_end);
581 let req = if let Some(params) = params {
582 tracing::debug!(range = %this_range, full_range = %params.range, "sending lazy mapping");
583 MapperRequest::MapLazy(params)
584 } else {
585 tracing::debug!(range = %this_range, "no mapping for range");
586 MapperRequest::NoMapping(this_range)
587 };
588 self.mappers.mappers[id.0].req_send.send(req);
589 range = MemoryRange::new(this_end..range.end());
590 }
591 }
592
593 async fn add_mapping(&mut self, params: MappingParams) -> Result<(), RemoteError> {
594 tracing::debug!(range = %params.range, "adding mapping");
595
596 assert!(!self.mappings.iter().any(|m| m.params.range == params.range));
597
598 let mut active_mappers = Vec::new();
600 for (i, mapper) in self.mappers.mappers.iter() {
601 if !mapper.eager {
602 continue;
603 }
604 let id = MapperId(i);
605 match mapper
606 .req_send
607 .call(MapperRequest::MapEager, params.clone())
608 .await
609 {
610 Ok(Ok(())) => {
611 active_mappers.push(id);
612 }
613 Ok(Err(e)) => {
614 for &rollback_id in &active_mappers {
617 if let Err(err) = self.mappers.mappers[rollback_id.0]
618 .req_send
619 .call(MapperRequest::Unmap, params.range)
620 .await
621 {
622 tracing::warn!(
623 error = &err as &dyn std::error::Error,
624 "mapper dropped unmap during rollback"
625 );
626 }
627 }
628 return Err(e);
629 }
630 Err(_) => {
631 tracing::debug!(?id, "mapper gone during add_mapping");
634 }
635 }
636 }
637
638 self.mappings.push(Mapping {
639 params,
640 active_mappers,
641 });
642 Ok(())
643 }
644
645 fn get_dma_target_mappings(&self) -> Vec<MappingParams> {
646 self.mappings
647 .iter()
648 .filter(|m| {
651 m.params.mapping_type == MappingType::Ram && m.params.backing.mappable().is_some()
652 })
653 .map(|m| m.params.clone())
654 .collect()
655 }
656
657 async fn remove_mappings(&mut self, range: MemoryRange) {
658 let mut mappers = Vec::new();
659 self.mappings.retain_mut(|mapping| {
660 if !range.contains(&mapping.params.range) {
661 assert!(
662 !range.overlaps(&mapping.params.range),
663 "no partial unmappings allowed"
664 );
665 return true;
666 }
667 tracing::debug!(range = %mapping.params.range, "removing mapping");
668 mappers.append(&mut mapping.active_mappers);
669 false
670 });
671 mappers.sort();
672 mappers.dedup();
673 self.mappers.invalidate(&mappers, range).await;
674 }
675}
676
677impl Mappers {
678 async fn invalidate(&self, ids: &[MapperId], range: MemoryRange) {
679 tracing::debug!(mapper_count = ids.len(), %range, "sending invalidations");
680 join_all(ids.iter().map(async |&MapperId(i)| {
681 if let Err(err) = self.mappers[i]
682 .req_send
683 .call(MapperRequest::Unmap, range)
684 .await
685 {
686 tracing::warn!(
687 error = &err as &dyn std::error::Error,
688 "mapper dropped invalidate request"
689 );
690 }
691 }))
692 .await;
693 }
694}
695
/// [`ProvideShareableRegions`] implementation that asks the mapping manager
/// task for its DMA target mappings.
pub(crate) struct DmaRegionProvider {
    /// Channel to the mapping manager task.
    pub req_send: mesh::Sender<MappingRequest>,
}
702
703impl ProvideShareableRegions for DmaRegionProvider {
704 async fn get_regions(&self) -> Result<Vec<ShareableRegion>, guestmem::ShareableRegionError> {
705 let mappings = self
706 .req_send
707 .call(MappingRequest::GetDmaTargetMappings, ())
708 .await?;
709
710 Ok(mappings
711 .into_iter()
712 .filter_map(|m| {
713 let mappable = m.backing.mappable()?;
714 Some(ShareableRegion {
715 guest_address: m.range.start(),
716 size: m.range.len(),
717 file: mappable.inner_arc(),
718 file_offset: m.backing.file_offset(),
719 })
720 })
721 .collect())
722 }
723}
724
725#[cfg(test)]
726mod tests {
727 use super::*;
728 use crate::region_manager::MappingType;
729 use guestmem::GuestMemoryAccess;
730 use guestmem::ProvideShareableRegions;
731 use memory_range::MemoryRange;
732
/// With one RAM mapping and one device mapping registered, the DMA region
/// provider reports only the RAM mapping.
#[pal_async::async_test]
async fn test_dma_target_regions_returned(spawn: impl Spawn) {
    let mm = MappingManager::new(&spawn, 0x200000, Vec::new(), None);
    let client = mm.client().clone();

    let ram: Mappable = sparse_mmap::alloc_shared_memory(0x100000, "test-ram")
        .unwrap()
        .into();
    let device: Mappable = sparse_mmap::alloc_shared_memory(0x1000, "test-dev")
        .unwrap()
        .into();

    // File-backed RAM: expected to be reported.
    client
        .add_mapping(MappingParams {
            range: MemoryRange::new(0..0x100000),
            backing: MappingBacking::File {
                mappable: ram,
                file_offset: 0,
            },
            writable: true,
            mapping_type: MappingType::Ram,
            numa_node: None,
        })
        .await
        .unwrap();

    // File-backed device memory: expected to be filtered out.
    client
        .add_mapping(MappingParams {
            range: MemoryRange::new(0x100000..0x101000),
            backing: MappingBacking::File {
                mappable: device,
                file_offset: 0,
            },
            writable: true,
            mapping_type: MappingType::Device,
            numa_node: None,
        })
        .await
        .unwrap();

    let provider = DmaRegionProvider {
        req_send: client.req_send.clone(),
    };
    let regions = provider.get_regions().await.unwrap();

    // Only the RAM mapping comes back, with its range and offset intact.
    assert_eq!(regions.len(), 1);
    assert_eq!(regions[0].guest_address, 0);
    assert_eq!(regions[0].size, 0x100000);
    assert_eq!(regions[0].file_offset, 0);
}
784
/// With only a device mapping registered, the DMA region provider reports
/// nothing.
#[pal_async::async_test]
async fn test_no_dma_targets_returns_empty(spawn: impl Spawn) {
    let mm = MappingManager::new(&spawn, 0x100000, Vec::new(), None);
    let client = mm.client().clone();

    let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x1000, "test")
        .unwrap()
        .into();

    client
        .add_mapping(MappingParams {
            range: MemoryRange::new(0..0x1000),
            backing: MappingBacking::File {
                mappable,
                file_offset: 0,
            },
            writable: true,
            mapping_type: MappingType::Device,
            numa_node: None,
        })
        .await
        .unwrap();

    let provider = DmaRegionProvider {
        req_send: client.req_send.clone(),
    };
    let regions = provider.get_regions().await.unwrap();
    assert!(regions.is_empty());
}
814
815 async fn task_with_mapping() -> (MappingManagerTask, MappingParams) {
817 let mut task = MappingManagerTask::new();
818 let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x10000, "test")
819 .unwrap()
820 .into();
821 let params = MappingParams {
822 range: MemoryRange::new(0..0x10000),
823 backing: MappingBacking::File {
824 mappable,
825 file_offset: 0,
826 },
827 writable: true,
828 mapping_type: MappingType::Ram,
829 numa_node: None,
830 };
831 task.add_mapping(params.clone()).await.unwrap();
832 (task, params)
833 }
834
835 async fn add_mapper_and_drain(
838 task: &mut MappingManagerTask,
839 eager: bool,
840 ) -> (MapperId, Vec<MapperRequest>) {
841 let (send, mut recv) = mesh::channel();
842 let id = task.add_mapper(send, eager).await.unwrap();
843 let mut msgs = Vec::new();
845 while let Ok(msg) = recv.try_recv() {
846 msgs.push(msg);
847 }
848 (id, msgs)
849 }
850
/// Registering an eager mapper replays the existing mapping to it as a
/// `MapEager` request.
#[pal_async::async_test]
async fn test_eager_mapper_gets_replay(_spawn: impl Spawn) {
    let (mut task, _params) = task_with_mapping().await;

    let (send, mut recv) = mesh::channel();
    // `add_mapper` waits for the mapper's reply, so the mapper side has to
    // run concurrently and answer the RPC.
    let (id, _) = futures::join!(task.add_mapper(send, true), async {
        let msg = recv.recv().await.unwrap();
        match msg {
            MapperRequest::MapEager(rpc) => {
                let (params, rpc) = rpc.split();
                assert_eq!(params.range, MemoryRange::new(0..0x10000));
                rpc.complete(Ok(()));
            }
            other => panic!("expected MapEager, got {other:?}"),
        }
    });
    let _ = id;
}
871
/// Registering a lazy mapper sends it nothing, even when mappings exist.
#[pal_async::async_test]
async fn test_lazy_mapper_no_replay(_spawn: impl Spawn) {
    let (mut task, _params) = task_with_mapping().await;

    let (_id, msgs) = add_mapper_and_drain(&mut task, false).await;

    assert!(msgs.is_empty(), "lazy mapper should not get replay");
}
881
/// Adding a mapping pushes `MapEager` to the eager mapper and sends nothing
/// to the lazy one.
#[pal_async::async_test]
async fn test_add_mapping_pushes_only_to_eager(_spawn: impl Spawn) {
    let mut task = MappingManagerTask::new();

    // No mappings exist yet, so neither registration needs a reply.
    let (eager_send, mut eager_recv) = mesh::channel();
    let _eager_id = task.add_mapper(eager_send, true).await.unwrap();

    let (lazy_send, mut lazy_recv) = mesh::channel();
    let _lazy_id = task.add_mapper(lazy_send, false).await.unwrap();

    let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x1000, "test")
        .unwrap()
        .into();
    let params = MappingParams {
        range: MemoryRange::new(0..0x1000),
        backing: MappingBacking::File {
            mappable,
            file_offset: 0,
        },
        writable: true,
        mapping_type: MappingType::Device,
        numa_node: None,
    };

    let add_future = task.add_mapping(params);
    // Answer the eager mapper's RPC while `add_mapping` is waiting on it.
    let (add_result, _) = futures::join!(add_future, async {
        let msg = eager_recv.recv().await.unwrap();
        match msg {
            MapperRequest::MapEager(rpc) => rpc.complete(Ok(())),
            other => panic!("expected MapEager, got {other:?}"),
        }
    });
    add_result.unwrap();

    assert!(
        lazy_recv.try_recv().is_err(),
        "lazy mapper should not be notified on add_mapping"
    );
}
926
/// Upgrading a lazy mapper replays the existing mapping, then sends
/// `SetEager`, and leaves the mapper flagged eager.
#[pal_async::async_test]
async fn test_upgrade_to_eager_replays(_spawn: impl Spawn) {
    let (mut task, _params) = task_with_mapping().await;

    let (send, mut recv) = mesh::channel();
    let id = task.add_mapper(send, false).await.unwrap();
    assert!(
        recv.try_recv().is_err(),
        "lazy mapper should not get replay"
    );

    let upgrade_future = task.upgrade_to_eager(id);
    let (result, _) = futures::join!(upgrade_future, async {
        // First the replay of the one existing mapping...
        let msg = recv.recv().await.unwrap();
        match msg {
            MapperRequest::MapEager(rpc) => {
                let (params, rpc) = rpc.split();
                assert_eq!(params.range, MemoryRange::new(0..0x10000));
                rpc.complete(Ok(()));
            }
            other => panic!("expected MapEager during upgrade, got {other:?}"),
        }
        // ...then the notification that the mapper is now eager.
        let msg = recv.recv().await.unwrap();
        match msg {
            MapperRequest::SetEager(rpc) => rpc.complete(()),
            other => panic!("expected SetEager, got {other:?}"),
        }
    });
    result.unwrap();

    assert!(task.mappers.mappers[id.0].eager);
}
963
/// Upgrading a mapper that is already eager succeeds and sends it nothing.
#[pal_async::async_test]
async fn test_upgrade_already_eager_is_noop(_spawn: impl Spawn) {
    let (mut task, _params) = task_with_mapping().await;

    let (send, mut recv) = mesh::channel();
    // Register as eager, answering the replay of the existing mapping.
    let upgrade_future = task.add_mapper(send, true);
    let (id, _) = futures::join!(upgrade_future, async {
        let msg = recv.recv().await.unwrap();
        match msg {
            MapperRequest::MapEager(rpc) => rpc.complete(Ok(())),
            other => panic!("expected MapEager, got {other:?}"),
        }
    });
    let id = id.unwrap();

    // Nobody is answering RPCs here, so this only completes if the upgrade
    // issues none.
    task.upgrade_to_eager(id).await.unwrap();
    assert!(
        recv.try_recv().is_err(),
        "upgrade of already-eager mapper should send nothing"
    );
}
987
/// After a lazy mapper is upgraded, mappings added later are pushed to it
/// as `MapEager`.
#[pal_async::async_test]
async fn test_after_upgrade_new_mappings_are_pushed(_spawn: impl Spawn) {
    let mut task = MappingManagerTask::new();

    let (send, mut recv) = mesh::channel();
    let id = task.add_mapper(send, false).await.unwrap();

    let upgrade_future = task.upgrade_to_eager(id);
    // There are no mappings to replay, so `SetEager` is the only message.
    let (result, _) = futures::join!(upgrade_future, async {
        let msg = recv.recv().await.unwrap();
        match msg {
            MapperRequest::SetEager(rpc) => rpc.complete(()),
            other => panic!("expected SetEager, got {other:?}"),
        }
    });
    result.unwrap();

    let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x1000, "test")
        .unwrap()
        .into();
    let params = MappingParams {
        range: MemoryRange::new(0..0x1000),
        backing: MappingBacking::File {
            mappable,
            file_offset: 0,
        },
        writable: true,
        mapping_type: MappingType::Device,
        numa_node: None,
    };

    let add_future = task.add_mapping(params);
    let (result, _) = futures::join!(add_future, async {
        let msg = recv.recv().await.unwrap();
        match msg {
            MapperRequest::MapEager(rpc) => rpc.complete(Ok(())),
            other => panic!("expected MapEager after upgrade, got {other:?}"),
        }
    });
    result.unwrap();
}
1033
/// `send_mappings` over a mapped range delivers a `MapLazy` carrying the
/// mapping's range.
#[pal_async::async_test]
async fn test_send_mappings_for_lazy(_spawn: impl Spawn) {
    let (mut task, _params) = task_with_mapping().await;

    let (send, mut recv) = mesh::channel();
    let id = task.add_mapper(send, false).await.unwrap();

    task.send_mappings(id, MemoryRange::new(0..0x10000));

    let msg = recv.recv().await.unwrap();
    match msg {
        MapperRequest::MapLazy(params) => {
            assert_eq!(params.range, MemoryRange::new(0..0x10000));
        }
        other => panic!("expected MapLazy, got {other:?}"),
    }
}
1054
/// `send_mappings` over a range with no mapping delivers `NoMapping` for
/// that whole range.
#[pal_async::async_test]
async fn test_send_mappings_gap_sends_no_mapping(_spawn: impl Spawn) {
    let (mut task, _params) = task_with_mapping().await;

    let (send, mut recv) = mesh::channel();
    let id = task.add_mapper(send, false).await.unwrap();

    // The only mapping is 0..0x10000; this range starts right after it.
    task.send_mappings(id, MemoryRange::new(0x10000..0x20000));

    let msg = recv.recv().await.unwrap();
    match msg {
        MapperRequest::NoMapping(range) => {
            assert_eq!(range, MemoryRange::new(0x10000..0x20000));
        }
        other => panic!("expected NoMapping, got {other:?}"),
    }
}
1073
/// Removing a mapping sends `Unmap` both to the eager mapper that got it by
/// replay and to the lazy mapper that requested it via `send_mappings`.
#[pal_async::async_test]
async fn test_remove_mapping_invalidates_both_eager_and_lazy(_spawn: impl Spawn) {
    let (mut task, _params) = task_with_mapping().await;

    let (eager_send, mut eager_recv) = mesh::channel();
    // Register the eager mapper, answering the replay.
    let add_future = task.add_mapper(eager_send, true);
    let (_eager_id, _) = futures::join!(add_future, async {
        let msg = eager_recv.recv().await.unwrap();
        match msg {
            MapperRequest::MapEager(rpc) => rpc.complete(Ok(())),
            other => panic!("expected MapEager, got {other:?}"),
        }
    });

    let (lazy_send, mut lazy_recv) = mesh::channel();
    // Make the lazy mapper active on the mapping and discard the message
    // `send_mappings` sent it.
    let lazy_id = task.add_mapper(lazy_send, false).await.unwrap();
    task.send_mappings(lazy_id, MemoryRange::new(0..0x10000));
    let _ = lazy_recv.recv().await.unwrap();

    let remove_future = task.remove_mappings(MemoryRange::new(0..0x10000));
    // Both mappers must acknowledge the unmap for the removal to finish.
    let ((), _, _) = futures::join!(
        remove_future,
        async {
            let msg = eager_recv.recv().await.unwrap();
            match msg {
                MapperRequest::Unmap(rpc) => {
                    let (range, rpc) = rpc.split();
                    assert_eq!(range, MemoryRange::new(0..0x10000));
                    rpc.complete(());
                }
                other => panic!("expected Unmap for eager, got {other:?}"),
            }
        },
        async {
            let msg = lazy_recv.recv().await.unwrap();
            match msg {
                MapperRequest::Unmap(rpc) => {
                    let (range, rpc) = rpc.split();
                    assert_eq!(range, MemoryRange::new(0..0x10000));
                    rpc.complete(());
                }
                other => panic!("expected Unmap for lazy, got {other:?}"),
            }
        }
    );
}
1124
1125 async fn task_with_two_mappings() -> MappingManagerTask {
1127 let mut task = MappingManagerTask::new();
1128 for (start, end) in [(0u64, 0x10000u64), (0x10000, 0x20000)] {
1129 let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x10000, "test")
1130 .unwrap()
1131 .into();
1132 task.add_mapping(MappingParams {
1133 range: MemoryRange::new(start..end),
1134 backing: MappingBacking::File {
1135 mappable,
1136 file_offset: 0,
1137 },
1138 writable: true,
1139 mapping_type: MappingType::Ram,
1140 numa_node: None,
1141 })
1142 .await
1143 .unwrap();
1144 }
1145 task
1146 }
1147
/// If replay fails part-way while registering an eager mapper, the mapper
/// is removed, no mapping still lists it as active, and a later
/// registration works normally.
#[pal_async::async_test]
async fn test_add_eager_mapper_rollback_on_replay_failure(_spawn: impl Spawn) {
    let mut task = task_with_two_mappings().await;

    let (send, mut recv) = mesh::channel();
    let add_future = task.add_mapper(send, true);
    let (result, _) = futures::join!(add_future, async {
        // Accept the first mapping...
        let msg = recv.recv().await.unwrap();
        match msg {
            MapperRequest::MapEager(rpc) => rpc.complete(Ok(())),
            other => panic!("expected MapEager #1, got {other:?}"),
        }
        // ...and reject the second.
        let msg = recv.recv().await.unwrap();
        match msg {
            MapperRequest::MapEager(rpc) => {
                rpc.complete(Err(RemoteError::new(std::io::Error::other(
                    "simulated failure",
                ))));
            }
            other => panic!("expected MapEager #2, got {other:?}"),
        }
    });

    assert!(result.is_err());

    // The failed mapper is no longer registered.
    assert_eq!(task.mappers.mappers.len(), 0);

    // Including on the first mapping, which it had accepted.
    for mapping in &task.mappings {
        assert!(
            mapping.active_mappers.is_empty(),
            "active_mappers should be empty after rollback, got {:?} for {}",
            mapping.active_mappers,
            mapping.params.range
        );
    }

    // A new eager mapper gets both mappings replayed from scratch.
    let (send2, mut recv2) = mesh::channel();
    let add_future2 = task.add_mapper(send2, true);
    let (result2, _) = futures::join!(add_future2, async {
        for _ in 0..2 {
            let msg = recv2.recv().await.unwrap();
            match msg {
                MapperRequest::MapEager(rpc) => rpc.complete(Ok(())),
                other => panic!("expected MapEager during second add, got {other:?}"),
            }
        }
    });
    assert!(result2.is_ok());
}
1206
/// If one eager mapper rejects a new mapping, the mapper that had already
/// accepted it is sent `Unmap` and the mapping is not registered.
#[pal_async::async_test]
async fn test_add_mapping_rollback_on_eager_failure(_spawn: impl Spawn) {
    let mut task = MappingManagerTask::new();

    // No mappings exist yet, so neither registration needs a reply.
    let (send1, mut recv1) = mesh::channel();
    let _id1 = task.add_mapper(send1, true).await.unwrap();

    let (send2, mut recv2) = mesh::channel();
    let _id2 = task.add_mapper(send2, true).await.unwrap();

    let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x1000, "test")
        .unwrap()
        .into();
    let params = MappingParams {
        range: MemoryRange::new(0..0x1000),
        backing: MappingBacking::File {
            mappable,
            file_offset: 0,
        },
        writable: true,
        mapping_type: MappingType::Device,
        numa_node: None,
    };

    let add_future = task.add_mapping(params);
    let (result, _, _) = futures::join!(
        add_future,
        async {
            // Mapper 1 accepts the mapping...
            let msg = recv1.recv().await.unwrap();
            match msg {
                MapperRequest::MapEager(rpc) => rpc.complete(Ok(())),
                other => panic!("expected MapEager, got {other:?}"),
            }
            // ...and is then asked to undo it.
            let msg = recv1.recv().await.unwrap();
            match msg {
                MapperRequest::Unmap(rpc) => {
                    let (range, rpc) = rpc.split();
                    assert_eq!(range, MemoryRange::new(0..0x1000));
                    rpc.complete(());
                }
                other => panic!("expected Unmap rollback, got {other:?}"),
            }
        },
        async {
            // Mapper 2 rejects the mapping.
            let msg = recv2.recv().await.unwrap();
            match msg {
                MapperRequest::MapEager(rpc) => {
                    rpc.complete(Err(RemoteError::new(std::io::Error::other(
                        "simulated failure",
                    ))));
                }
                other => panic!("expected MapEager, got {other:?}"),
            }
        }
    );

    assert!(result.is_err());

    // The rejected mapping was never registered.
    assert!(task.mappings.is_empty());
}
1274
/// If replay fails part-way through an upgrade, the mapper goes back to
/// lazy and stays registered, keeps the mapping it accepted, and is not
/// pushed later mappings.
#[pal_async::async_test]
async fn test_upgrade_to_eager_rollback_on_failure(_spawn: impl Spawn) {
    let mut task = task_with_two_mappings().await;

    let (send, mut recv) = mesh::channel();
    let id = task.add_mapper(send, false).await.unwrap();
    assert!(!task.mappers.mappers[id.0].eager);

    let upgrade_future = task.upgrade_to_eager(id);
    let (result, _) = futures::join!(upgrade_future, async {
        // Accept the first mapping...
        let msg = recv.recv().await.unwrap();
        match msg {
            MapperRequest::MapEager(rpc) => rpc.complete(Ok(())),
            other => panic!("expected MapEager #1, got {other:?}"),
        }
        // ...and reject the second.
        let msg = recv.recv().await.unwrap();
        match msg {
            MapperRequest::MapEager(rpc) => {
                rpc.complete(Err(RemoteError::new(std::io::Error::other(
                    "simulated failure",
                ))));
            }
            other => panic!("expected MapEager #2, got {other:?}"),
        }
    });

    assert!(result.is_err());

    // The eager flag was reverted.
    assert!(!task.mappers.mappers[id.0].eager);

    // The accepted mapping is still tracked so it can be invalidated later;
    // the rejected one is not.
    assert!(
        task.mappings[0].active_mappers.contains(&id),
        "first mapping should retain mapper in active_mappers"
    );
    assert!(
        !task.mappings[1].active_mappers.contains(&id),
        "second mapping should not have mapper (replay failed)"
    );

    // Unlike a failed registration, the mapper itself is kept.
    assert!(task.mappers.mappers.contains(id.0));

    // With the mapper lazy again, adding a mapping needs no reply from it,
    // so this completes without anyone servicing `recv`.
    let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x1000, "test")
        .unwrap()
        .into();
    task.add_mapping(MappingParams {
        range: MemoryRange::new(0x20000..0x21000),
        backing: MappingBacking::File {
            mappable,
            file_offset: 0,
        },
        writable: true,
        mapping_type: MappingType::Device,
        numa_node: None,
    })
    .await
    .unwrap();

    assert!(
        recv.try_recv().is_err(),
        "lazy mapper should not receive add_mapping push after failed upgrade"
    );
}
1350
/// An eager `VaMapper` that has been sent no mappings answers a page fault
/// with `PageFaultAction::Fail`.
#[pal_async::async_test]
async fn test_eager_page_fault_fails_immediately(_spawn: impl Spawn) {
    use super::super::va_mapper::VaMapper;

    // Stand in for the manager: the test owns the receiving end itself.
    let (req_send, mut req_recv) = mesh::channel::<MappingRequest>();
    let mapper_future = VaMapper::new(
        req_send,
        0x10000,
        None,
        Vec::new(),
        None,
        true,
    );
    // Answer the mapper's registration request by hand.
    let (mapper, _) = futures::join!(mapper_future, async {
        let msg = req_recv.recv().await.unwrap();
        match msg {
            MappingRequest::AddMapper(rpc) => {
                rpc.handle_failable_sync(|params| {
                    assert!(params.eager);
                    Ok::<_, MappingError>(MapperId(0))
                });
            }
            _ => panic!("expected AddMapper"),
        }
    });
    let mapper = mapper.unwrap();
    assert!(mapper.is_eager());

    let action = mapper.page_fault(0x1000, 0x1000, false, false);
    assert!(
        matches!(action, guestmem::PageFaultAction::Fail(_)),
        "eager mapper should fail page faults on unmapped file-backed ranges"
    );
}
1389
1390 #[pal_async::async_test]
1391 async fn test_va_mapper_drop_removes_mapper(_spawn: impl Spawn) {
1392 use super::super::va_mapper::VaMapper;
1393
1394 let (req_send, mut req_recv) = mesh::channel::<MappingRequest>();
1395 let mapper_future = VaMapper::new(
1396 req_send,
1397 0x10000,
1398 None,
1399 Vec::new(),
1400 None,
1401 true, );
1403 let (mapper, mapper_req_send) = futures::join!(mapper_future, async {
1404 let msg = req_recv.recv().await.unwrap();
1405 match msg {
1406 MappingRequest::AddMapper(rpc) => {
1407 let (params, rpc) = rpc.split();
1408 assert!(params.eager);
1409 rpc.complete(Ok(MapperId(7)));
1410 params.send
1411 }
1412 _ => panic!("expected AddMapper"),
1413 }
1414 });
1415 drop(mapper.unwrap());
1416
1417 match req_recv.recv().await.unwrap() {
1418 MappingRequest::RemoveMapper(id) => assert_eq!(id, MapperId(7)),
1419 _ => panic!("expected RemoveMapper"),
1420 }
1421
1422 drop(mapper_req_send);
1425 }
1426
1427 #[pal_async::async_test]
1428 async fn test_lazy_page_fault_requests_mapping(spawn: impl Spawn) {
1429 let _ = spawn;
1430 let (manager_thread, manager_driver) =
1431 pal_async::DefaultPool::spawn_on_thread("mapping-manager-test");
1432 let mm = MappingManager::new(&manager_driver, 0x10000, Vec::new(), None);
1433 let client = mm.client().clone();
1434
1435 let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x10000, "test")
1437 .unwrap()
1438 .into();
1439 client
1440 .add_mapping(MappingParams {
1441 range: MemoryRange::new(0..0x10000),
1442 backing: MappingBacking::File {
1443 mappable,
1444 file_offset: 0,
1445 },
1446 writable: true,
1447 mapping_type: MappingType::Device,
1448 numa_node: None,
1449 })
1450 .await
1451 .unwrap();
1452
1453 let mapper = client.new_mapper(false).await.unwrap();
1455 assert!(!mapper.is_eager());
1456
1457 let action = mapper.page_fault(0x1000, 0x1000, false, false);
1462 assert!(
1463 matches!(action, guestmem::PageFaultAction::Retry),
1464 "lazy mapper should request mapping on page fault and succeed"
1465 );
1466
1467 drop(mapper);
1468 drop(client);
1469 drop(mm);
1470 drop(manager_driver);
1471 manager_thread.join().unwrap();
1472 }
1473
1474 #[pal_async::async_test]
1478 async fn test_eager_mapper_with_existing_mappings(spawn: impl Spawn) {
1479 let _ = spawn;
1480 let (manager_thread, manager_driver) =
1481 pal_async::DefaultPool::spawn_on_thread("mapping-manager-test");
1482 let mm = MappingManager::new(&manager_driver, 0x10000, Vec::new(), None);
1483 let client = mm.client().clone();
1484
1485 let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x10000, "test")
1487 .unwrap()
1488 .into();
1489 client
1490 .add_mapping(MappingParams {
1491 range: MemoryRange::new(0..0x10000),
1492 backing: MappingBacking::File {
1493 mappable,
1494 file_offset: 0,
1495 },
1496 writable: true,
1497 mapping_type: MappingType::Ram,
1498 numa_node: None,
1499 })
1500 .await
1501 .unwrap();
1502
1503 let mapper = client.new_mapper(true).await.unwrap();
1506 assert!(mapper.is_eager());
1507
1508 drop(mapper);
1509 drop(client);
1510 drop(mm);
1511 drop(manager_driver);
1512 manager_thread.join().unwrap();
1513 }
1514
1515 #[pal_async::async_test]
1516 async fn test_new_mapper_upgrades_cached_lazy_to_eager(spawn: impl Spawn) {
1517 let _ = spawn;
1518 let (manager_thread, manager_driver) =
1519 pal_async::DefaultPool::spawn_on_thread("mapping-manager-test");
1520 let mm = MappingManager::new(&manager_driver, 0x20000, Vec::new(), None);
1521 let client = mm.client().clone();
1522
1523 let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x10000, "test")
1525 .unwrap()
1526 .into();
1527 client
1528 .add_mapping(MappingParams {
1529 range: MemoryRange::new(0..0x10000),
1530 backing: MappingBacking::File {
1531 mappable,
1532 file_offset: 0,
1533 },
1534 writable: true,
1535 mapping_type: MappingType::Device,
1536 numa_node: None,
1537 })
1538 .await
1539 .unwrap();
1540
1541 let lazy = client.new_mapper(false).await.unwrap();
1543 assert!(!lazy.is_eager());
1544
1545 let eager = client.new_mapper(true).await.unwrap();
1547
1548 assert!(Arc::ptr_eq(&lazy, &eager));
1550
1551 let mappable: Mappable = sparse_mmap::alloc_shared_memory(0x1000, "test")
1556 .unwrap()
1557 .into();
1558 client
1559 .add_mapping(MappingParams {
1560 range: MemoryRange::new(0x10000..0x11000),
1561 backing: MappingBacking::File {
1562 mappable,
1563 file_offset: 0,
1564 },
1565 writable: true,
1566 mapping_type: MappingType::Device,
1567 numa_node: None,
1568 })
1569 .await
1570 .unwrap();
1571
1572 assert!(eager.is_eager());
1574
1575 drop(eager);
1576 drop(lazy);
1577 drop(client);
1578 drop(mm);
1579 drop(manager_driver);
1580 manager_thread.join().unwrap();
1581 }
1582}