membacking/mapping_manager/
manager.rs1use super::mappable::Mappable;
9use super::object_cache::ObjectCache;
10use super::object_cache::ObjectId;
11use super::va_mapper::VaMapper;
12use super::va_mapper::VaMapperError;
13use crate::RemoteProcess;
14use futures::StreamExt;
15use futures::future::join_all;
16use inspect::Inspect;
17use inspect::InspectMut;
18use memory_range::MemoryRange;
19use mesh::MeshPayload;
20use mesh::rpc::Rpc;
21use mesh::rpc::RpcSend;
22use pal_async::task::Spawn;
23use slab::Slab;
24use std::sync::Arc;
25
26#[derive(Debug)]
28pub struct MappingManager {
29 client: MappingManagerClient,
30}
31
32impl Inspect for MappingManager {
33 fn inspect(&self, req: inspect::Request<'_>) {
34 self.client
35 .req_send
36 .send(MappingRequest::Inspect(req.defer()));
37 }
38}
39
40impl MappingManager {
41 pub fn new(spawn: impl Spawn, max_addr: u64) -> Self {
43 let (req_send, mut req_recv) = mesh::mpsc_channel();
44 spawn
45 .spawn("mapping_manager", {
46 let mut task = MappingManagerTask::new();
47 async move {
48 task.run(&mut req_recv).await;
49 }
50 })
51 .detach();
52 Self {
53 client: MappingManagerClient {
54 id: ObjectId::new(),
55 req_send,
56 max_addr,
57 },
58 }
59 }
60
61 pub fn client(&self) -> &MappingManagerClient {
64 &self.client
65 }
66}
67
68#[derive(Debug, MeshPayload, Clone)]
70pub struct MappingManagerClient {
71 req_send: mesh::Sender<MappingRequest>,
72 id: ObjectId,
73 max_addr: u64,
74}
75
76static MAPPER_CACHE: ObjectCache<VaMapper> = ObjectCache::new();
77
78impl MappingManagerClient {
79 pub async fn new_mapper(&self) -> Result<Arc<VaMapper>, VaMapperError> {
83 MAPPER_CACHE
86 .get_or_insert_with(&self.id, async {
87 VaMapper::new(self.req_send.clone(), self.max_addr, None).await
88 })
89 .await
90 }
91
92 pub async fn new_remote_mapper(
97 &self,
98 process: RemoteProcess,
99 ) -> Result<Arc<VaMapper>, VaMapperError> {
100 Ok(Arc::new(
101 VaMapper::new(self.req_send.clone(), self.max_addr, Some(process)).await?,
102 ))
103 }
104
105 pub async fn add_mapping(
111 &self,
112 range: MemoryRange,
113 mappable: Mappable,
114 file_offset: u64,
115 writable: bool,
116 ) {
117 let params = MappingParams {
118 range,
119 mappable,
120 file_offset,
121 writable,
122 };
123
124 self.req_send
125 .call(MappingRequest::AddMapping, params)
126 .await
127 .unwrap();
128 }
129
130 pub async fn remove_mappings(&self, range: MemoryRange) {
134 self.req_send
135 .call(MappingRequest::RemoveMappings, range)
136 .await
137 .unwrap();
138 }
139}
140
141#[derive(MeshPayload)]
143pub enum MappingRequest {
144 AddMapper(Rpc<mesh::Sender<MapperRequest>, MapperId>),
145 RemoveMapper(MapperId),
146 SendMappings(MapperId, MemoryRange),
147 AddMapping(Rpc<MappingParams, ()>),
148 RemoveMappings(Rpc<MemoryRange, ()>),
149 Inspect(inspect::Deferred),
150}
151
152#[derive(InspectMut)]
153struct MappingManagerTask {
154 #[inspect(with = "inspect_mappings")]
155 mappings: Vec<Mapping>,
156 #[inspect(skip)]
157 mappers: Mappers,
158}
159
160fn inspect_mappings(mappings: &Vec<Mapping>) -> impl '_ + Inspect {
161 inspect::adhoc(move |req| {
162 let mut resp = req.respond();
163 for mapping in mappings {
164 resp.field(
165 &mapping.params.range.to_string(),
166 inspect::adhoc(|req| {
167 req.respond()
168 .field("writable", mapping.params.writable)
169 .hex("file_offset", mapping.params.file_offset);
170 }),
171 );
172 }
173 })
174}
175
176struct Mapping {
177 params: MappingParams,
178 active_mappers: Vec<MapperId>,
179}
180
181#[derive(MeshPayload, Clone)]
183pub struct MappingParams {
184 pub range: MemoryRange,
186 pub mappable: Mappable,
188 pub file_offset: u64,
190 pub writable: bool,
192}
193
194struct Mappers {
195 mappers: Slab<MapperComm>,
196}
197
198struct MapperComm {
199 req_send: mesh::Sender<MapperRequest>,
200}
201
202#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, MeshPayload)]
203pub struct MapperId(usize);
204
205#[derive(MeshPayload)]
207pub enum MapperRequest {
208 Map(MappingParams),
210 NoMapping(MemoryRange),
213 Unmap(Rpc<MemoryRange, ()>),
215}
216
217impl MappingManagerTask {
218 fn new() -> Self {
219 Self {
220 mappers: Mappers {
221 mappers: Slab::new(),
222 },
223 mappings: Vec::new(),
224 }
225 }
226
227 async fn run(&mut self, req_recv: &mut mesh::Receiver<MappingRequest>) {
228 while let Some(req) = req_recv.next().await {
229 match req {
230 MappingRequest::AddMapper(rpc) => rpc.handle_sync(|send| self.add_mapper(send)),
231 MappingRequest::RemoveMapper(id) => {
232 self.remove_mapper(id);
233 }
234 MappingRequest::SendMappings(id, range) => {
235 self.send_mappings(id, range);
236 }
237 MappingRequest::AddMapping(rpc) => {
238 rpc.handle_sync(|params| self.add_mapping(params))
239 }
240 MappingRequest::RemoveMappings(rpc) => {
241 rpc.handle(async |range| self.remove_mappings(range).await)
242 .await
243 }
244 MappingRequest::Inspect(deferred) => deferred.inspect(&mut *self),
245 }
246 }
247 }
248
249 fn add_mapper(&mut self, req_send: mesh::Sender<MapperRequest>) -> MapperId {
250 let id = self.mappers.mappers.insert(MapperComm { req_send });
251 tracing::debug!(?id, "adding mapper");
252 MapperId(id)
253 }
254
255 fn remove_mapper(&mut self, id: MapperId) {
256 tracing::debug!(?id, "removing mapper");
257 self.mappers.mappers.remove(id.0);
258 for mapping in &mut self.mappings {
259 mapping.active_mappers.retain(|m| m != &id);
260 }
261 }
262
263 fn send_mappings(&mut self, id: MapperId, mut range: MemoryRange) {
264 while !range.is_empty() {
265 let (this_end, params) = if let Some(mapping) = self
267 .mappings
268 .iter_mut()
269 .filter(|mapping| mapping.params.range.overlaps(&range))
270 .min_by_key(|mapping| mapping.params.range.start())
271 {
272 if mapping.params.range.start() <= range.start() {
273 if !mapping.active_mappers.contains(&id) {
274 mapping.active_mappers.push(id);
275 }
276 (
278 mapping.params.range.end().min(range.end()),
279 Some(mapping.params.clone()),
280 )
281 } else {
282 (mapping.params.range.start(), None)
284 }
285 } else {
286 (range.end(), None)
288 };
289 let this_range = MemoryRange::new(range.start()..this_end);
290 let req = if let Some(params) = params {
291 tracing::debug!(range = %this_range, full_range = %params.range, "sending mapping for range");
292 MapperRequest::Map(params)
293 } else {
294 tracing::debug!(range = %this_range, "no mapping for range");
295 MapperRequest::NoMapping(this_range)
296 };
297 self.mappers.mappers[id.0].req_send.send(req);
298 range = MemoryRange::new(this_end..range.end());
299 }
300 }
301
302 fn add_mapping(&mut self, params: MappingParams) {
303 tracing::debug!(range = %params.range, "adding mapping");
304
305 assert!(!self.mappings.iter().any(|m| m.params.range == params.range));
306
307 self.mappings.push(Mapping {
308 params,
309 active_mappers: Vec::new(),
310 });
311 }
312
313 async fn remove_mappings(&mut self, range: MemoryRange) {
314 let mut mappers = Vec::new();
315 self.mappings.retain_mut(|mapping| {
316 if !range.contains(&mapping.params.range) {
317 assert!(
318 !range.overlaps(&mapping.params.range),
319 "no partial unmappings allowed"
320 );
321 return true;
322 }
323 tracing::debug!(range = %mapping.params.range, "removing mapping");
324 mappers.append(&mut mapping.active_mappers);
325 false
326 });
327 mappers.sort();
328 mappers.dedup();
329 self.mappers.invalidate(&mappers, range).await;
330 }
331}
332
333impl Mappers {
334 async fn invalidate(&self, ids: &[MapperId], range: MemoryRange) {
335 tracing::debug!(mapper_count = ids.len(), %range, "sending invalidations");
336 join_all(ids.iter().map(async |&MapperId(i)| {
337 if let Err(err) = self.mappers[i]
338 .req_send
339 .call(MapperRequest::Unmap, range)
340 .await
341 {
342 tracing::warn!(
343 error = &err as &dyn std::error::Error,
344 "mapper dropped invalidate request"
345 );
346 }
347 }))
348 .await;
349 }
350}