1#![cfg(target_os = "linux")]
7#![forbid(unsafe_code)]
8
9use anyhow::Context;
10use futures::FutureExt;
11use mesh::MeshPayload;
12use mesh::error::RemoteError;
13use mesh_worker::Worker;
14use mesh_worker::WorkerId;
15use mesh_worker::WorkerRpc;
16use pal_async::DefaultPool;
17use pal_async::driver::Driver;
18use pal_async::timer::PolledTimer;
19use socket2::Socket;
20use std::io::Read;
21use std::os::fd::AsRawFd;
22use std::pin::pin;
23use std::process::Command;
24use std::process::Stdio;
25use std::time::Duration;
26
/// Minimum free memory, in MB, required before the profiler binary is started.
const MIN_MEMORY_PROFILER_MB: u64 = 10;
30
/// A request to run a single profiling session (see [`profile`]).
#[derive(Debug, MeshPayload)]
pub struct ProfilerRequest {
    /// Profiling duration in seconds; also passed to the profiler binary as its
    /// first argument.
    pub duration: u64,
    /// Additional arguments appended to the profiler binary's command line,
    /// after the duration and socket file descriptor.
    pub profiler_args: Vec<String>,
    /// Socket inherited by the profiler binary; its file descriptor number is
    /// passed as the binary's second argument.
    pub conn: Socket,
}
41
/// Identifier for [`ProfilerWorker`], with the same name ("ProfilerWorker") as
/// `ProfilerWorker::ID`.
pub const PROFILER_WORKER: WorkerId<ProfilerWorkerParameters> = WorkerId::new("ProfilerWorker");
44
/// Parameters used to construct a [`ProfilerWorker`] (see `ProfilerWorker::new`).
#[derive(MeshPayload)]
pub struct ProfilerWorkerParameters {
    /// The profiling request the worker carries out.
    pub profiler_request: ProfilerRequest,
}
51
/// Mesh worker that runs one profiling session via [`profile`].
pub struct ProfilerWorker {
    /// The request handed to [`profile`] when the worker runs.
    profiler_request: ProfilerRequest,
}
56
57impl Worker for ProfilerWorker {
58 type Parameters = ProfilerWorkerParameters;
59 type State = ();
60 const ID: WorkerId<Self::Parameters> = WorkerId::new("ProfilerWorker");
61
62 fn new(parameters: Self::Parameters) -> anyhow::Result<Self> {
64 Ok(Self {
65 profiler_request: parameters.profiler_request,
66 })
67 }
68
69 fn restart(_state: Self::State) -> anyhow::Result<Self> {
71 unimplemented!()
72 }
73
74 fn run(self, mut recv: mesh::Receiver<WorkerRpc<Self::State>>) -> anyhow::Result<()> {
76 DefaultPool::run_with(async |driver| {
77 let mut profiling = pin!(profile(self.profiler_request, &driver).fuse());
78 loop {
79 let msg = futures::select! { msg = recv.recv().fuse() => {
81 msg
82 },
83 r = profiling => {
84 match r {
85 Ok(_) => {
86 break
87 },
88 Err(err) => {
89 anyhow::bail!("Profiling failed - Error {}", err.to_string());
90 }
91 }
92 }
93 };
94 match msg {
95 Ok(message) => match message {
96 WorkerRpc::Stop => {
97 break;
98 }
99 WorkerRpc::Restart(rpc) => {
100 rpc.complete(Err(RemoteError::new(anyhow::anyhow!("not supported"))));
101 }
102 WorkerRpc::Inspect(_deferred) => {}
103 },
104 Err(err) => {
105 anyhow::bail!("ProfilerWorker::Run - Error {}", err.to_string());
106 }
107 }
108 }
109 Ok(())
110 })
111 }
112}
113
114fn get_free_mem_mb() -> anyhow::Result<u64> {
116 parse_meminfo_free(&fs_err::read_to_string("/proc/meminfo")?)
117}
118
119fn parse_meminfo_free(contents: &str) -> anyhow::Result<u64> {
120 const KBYTES_PER_MBYTES: u64 = 1024;
121 for line in contents.lines() {
122 let Some((name, rest)) = line.split_once(':') else {
123 continue;
124 };
125 if name == "MemFree" {
126 let value = rest
127 .split_ascii_whitespace()
128 .next()
129 .context("line had no value")?;
130 let value = value.parse::<u64>().context("value failed to parse")?;
131 return Ok(value / KBYTES_PER_MBYTES);
132 }
133 }
134
135 Err(anyhow::anyhow!("no memfree line found"))
136}
137
138pub async fn profile(request: ProfilerRequest, driver: &impl Driver) -> anyhow::Result<()> {
140 let mut timer = PolledTimer::new(driver);
141 let ProfilerRequest {
142 conn,
143 duration,
144 mut profiler_args,
145 } = request;
146
147 conn.set_cloexec(false)
149 .map_err(anyhow::Error::from)
150 .context("Failed to set CLO_EXEC to Socket")?;
151
152 let socket_fd = conn.as_raw_fd();
153 let free_mem_mb = match get_free_mem_mb() {
154 Ok(m) => m,
155 Err(e) => {
156 tracing::error!(
157 e = e.as_ref() as &dyn std::error::Error,
158 "Error when getting memory"
159 );
160 0
161 }
162 };
163
164 if free_mem_mb < MIN_MEMORY_PROFILER_MB {
165 anyhow::bail!("Not enough memory to start profiler {} MB", free_mem_mb);
166 }
167
168 profiler_args.push(format!("LimitMB:{}", free_mem_mb * 75 / 100));
170
171 let mut process = Command::new("/usr/bin/underhill_profiler_binary")
172 .arg(duration.to_string())
173 .arg(socket_fd.to_string())
174 .args(profiler_args)
175 .stdout(Stdio::piped())
176 .stderr(Stdio::piped())
177 .spawn()
178 .context("failed to execute process")?;
179
180 let mut process_success = false;
181
182 timer.sleep(Duration::from_secs(duration + 1)).await;
184
185 for wait_time in 1..=15 {
189 match process.try_wait() {
190 Ok(Some(_status)) => {
191 process_success = true;
192 break;
193 }
194 Ok(None) => {
195 if wait_time == 15 {
196 tracing::error!("Running profiler binary timeout");
197 if let Err(e) = process.kill() {
198 tracing::error!(
199 e = &e as &dyn std::error::Error,
200 "Error when stopping child process"
201 );
202 }
203 process_success = false;
204 }
205 }
206 Err(e) => {
207 process_success = false;
208 tracing::error!(
209 e = &e as &dyn std::error::Error,
210 "Running profiler binary failed",
211 );
212 break;
213 }
214 }
215 timer.sleep(Duration::from_secs(1)).await;
216 }
217
218 let mut child_stdout = process.stdout.take().unwrap();
220 let mut child_stderr = process.stderr.take().unwrap();
221
222 let mut buffer = Vec::new();
223
224 let _ = child_stdout.read_to_end(&mut buffer);
225
226 if !buffer.is_empty() {
227 tracing::info!("{}", String::from_utf8(buffer).unwrap());
228 }
229
230 let mut buffer = Vec::new();
231
232 let _ = child_stderr.read_to_end(&mut buffer);
233
234 if !buffer.is_empty() {
235 tracing::error!("{}", String::from_utf8(buffer).unwrap());
236 }
237
238 drop(conn);
240 if !process_success {
241 anyhow::bail!("Failed while running `underhill_profiler_binary`")
242 }
243
244 Ok(())
245}
246
#[cfg(test)]
mod tests {
    use super::parse_meminfo_free;

    /// Values are in kB and truncated to whole MB; whitespace after `:` is optional.
    #[test]
    fn test_parse_meminfo_free() {
        assert_eq!(parse_meminfo_free("MemFree: 1048575 kB").unwrap(), 1023);
        assert_eq!(parse_meminfo_free("MemFree:1048576 kB").unwrap(), 1024);
        assert_eq!(parse_meminfo_free("MemFree: 1048577 kB").unwrap(), 1024);
    }

    /// The `MemFree` line is found among other `/proc/meminfo` entries.
    #[test]
    fn test_parse_meminfo_free_real_data() {
        let contents = "MemTotal: 32658576 kB\nMemFree: 11105884 kB\nMemAvailable: 26792856 kB\nBuffers: 828448 kB\nCached: 14789464 kB";
        assert_eq!(parse_meminfo_free(contents).unwrap(), 10845);
    }

    /// Missing, empty, or malformed `MemFree` entries are reported as errors.
    #[test]
    fn test_parse_meminfo_free_errors() {
        // No MemFree line at all.
        assert!(parse_meminfo_free("").is_err());
        assert!(parse_meminfo_free("MemTotal: 32658576 kB").is_err());
        // Only an exact `MemFree` key matches.
        assert!(parse_meminfo_free("MemFreeish: 1024 kB").is_err());
        // MemFree present but with no value, or a value that is not a number.
        assert!(parse_meminfo_free("MemFree:").is_err());
        assert!(parse_meminfo_free("MemFree: lots kB").is_err());
    }
}