profiler_worker/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! A worker for profiling on VTL2.
5
6#![cfg(target_os = "linux")]
7#![forbid(unsafe_code)]
8
9use anyhow::Context;
10use futures::FutureExt;
11use mesh::MeshPayload;
12use mesh::error::RemoteError;
13use mesh_worker::Worker;
14use mesh_worker::WorkerId;
15use mesh_worker::WorkerRpc;
16use pal_async::DefaultPool;
17use pal_async::driver::Driver;
18use pal_async::timer::PolledTimer;
19use socket2::Socket;
20use std::io::Read;
21use std::os::fd::AsRawFd;
22use std::pin::pin;
23use std::process::Command;
24use std::process::Stdio;
25use std::time::Duration;
26
/// Minimum free memory, in MB, required before the profiler is started (10 MB).
///
/// This is to make sure VTL2 doesn't run out of memory before the profiler's
/// own memory check can take effect.
const MIN_MEMORY_PROFILER_MB: u64 = 10;
30
/// A profiling request, as stored by the profiler worker.
#[derive(Debug, MeshPayload)]
pub struct ProfilerRequest {
    /// Profiling duration in seconds.
    pub duration: u64,
    /// Extra arguments for the profiler binary; they are passed after the
    /// duration and the socket file descriptor.
    pub profiler_args: Vec<String>,
    /// Socket connection where the bin file will be written to. Its file
    /// descriptor number is handed to the profiler binary on the command line.
    pub conn: Socket,
}
41
/// The worker ID for [`ProfilerWorker`].
///
/// Carries the same name string (`"ProfilerWorker"`) as the `Worker::ID`
/// constant declared in the `Worker` implementation; keep the two in sync.
pub const PROFILER_WORKER: WorkerId<ProfilerWorkerParameters> = WorkerId::new("ProfilerWorker");
44
/// Parameters used to construct a [`ProfilerWorker`].
#[derive(MeshPayload)]
pub struct ProfilerWorkerParameters {
    /// The profiling request the worker will execute when it is run.
    pub profiler_request: ProfilerRequest,
}
51
/// The profiler worker: holds one [`ProfilerRequest`] and executes it when run.
pub struct ProfilerWorker {
    /// The request captured at construction time and consumed by `run`.
    profiler_request: ProfilerRequest,
}
56
57impl Worker for ProfilerWorker {
58    type Parameters = ProfilerWorkerParameters;
59    type State = ();
60    const ID: WorkerId<Self::Parameters> = WorkerId::new("ProfilerWorker");
61
62    /// Create new worker and store the request
63    fn new(parameters: Self::Parameters) -> anyhow::Result<Self> {
64        Ok(Self {
65            profiler_request: parameters.profiler_request,
66        })
67    }
68
69    /// Profiler worker is run per Profile request so there is no need for restart
70    fn restart(_state: Self::State) -> anyhow::Result<Self> {
71        unimplemented!()
72    }
73
74    /// Run profiler worker and start a profiling session
75    fn run(self, mut recv: mesh::Receiver<WorkerRpc<Self::State>>) -> anyhow::Result<()> {
76        DefaultPool::run_with(async |driver| {
77            let mut profiling = pin!(profile(self.profiler_request, &driver).fuse());
78            loop {
79                let msg = futures::select! { // merge semantics
80                    msg = recv.recv().fuse() => {
81                        msg
82                    },
83                    r = profiling => {
84                        match r {
85                            Ok(_) => {
86                                break
87                            },
88                            Err(err) => {
89                                anyhow::bail!("Profiling failed - Error {}", err.to_string());
90                            }
91                        }
92                    }
93                };
94                match msg {
95                    Ok(message) => match message {
96                        WorkerRpc::Stop => {
97                            break;
98                        }
99                        WorkerRpc::Restart(rpc) => {
100                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!("not supported"))));
101                        }
102                        WorkerRpc::Inspect(_deferred) => {}
103                    },
104                    Err(err) => {
105                        anyhow::bail!("ProfilerWorker::Run - Error {}", err.to_string());
106                    }
107                }
108            }
109            Ok(())
110        })
111    }
112}
113
114/// Get current free memory in MB
115fn get_free_mem_mb() -> anyhow::Result<u64> {
116    parse_meminfo_free(&fs_err::read_to_string("/proc/meminfo")?)
117}
118
119fn parse_meminfo_free(contents: &str) -> anyhow::Result<u64> {
120    const KBYTES_PER_MBYTES: u64 = 1024;
121    for line in contents.lines() {
122        let Some((name, rest)) = line.split_once(':') else {
123            continue;
124        };
125        if name == "MemFree" {
126            let value = rest
127                .split_ascii_whitespace()
128                .next()
129                .context("line had no value")?;
130            let value = value.parse::<u64>().context("value failed to parse")?;
131            return Ok(value / KBYTES_PER_MBYTES);
132        }
133    }
134
135    Err(anyhow::anyhow!("no memfree line found"))
136}
137
138/// Profiling function for the worker
139pub async fn profile(request: ProfilerRequest, driver: &impl Driver) -> anyhow::Result<()> {
140    let mut timer = PolledTimer::new(driver);
141    let ProfilerRequest {
142        conn,
143        duration,
144        mut profiler_args,
145    } = request;
146
147    // Set CLOEXEC to false because we need to share FD with child process
148    conn.set_cloexec(false)
149        .map_err(anyhow::Error::from)
150        .context("Failed to set CLO_EXEC to Socket")?;
151
152    let socket_fd = conn.as_raw_fd();
153    let free_mem_mb = match get_free_mem_mb() {
154        Ok(m) => m,
155        Err(e) => {
156            tracing::error!(
157                e = e.as_ref() as &dyn std::error::Error,
158                "Error when getting memory"
159            );
160            0
161        }
162    };
163
164    if free_mem_mb < MIN_MEMORY_PROFILER_MB {
165        anyhow::bail!("Not enough memory to start profiler {} MB", free_mem_mb);
166    }
167
168    // Limit memory to 75% of free memory
169    profiler_args.push(format!("LimitMB:{}", free_mem_mb * 75 / 100));
170
171    let mut process = Command::new("/usr/bin/underhill_profiler_binary")
172        .arg(duration.to_string())
173        .arg(socket_fd.to_string())
174        .args(profiler_args)
175        .stdout(Stdio::piped())
176        .stderr(Stdio::piped())
177        .spawn()
178        .context("failed to execute process")?;
179
180    let mut process_success = false;
181
182    // Sleep for duration+1s so the child process can finish naturally
183    timer.sleep(Duration::from_secs(duration + 1)).await;
184
185    // Checking if child process finished every 1s for 15s
186    // This is a failsafe in case child process doesn't exit and run
187    // forever (which shouldn't happen unless something went wrong)
188    for wait_time in 1..=15 {
189        match process.try_wait() {
190            Ok(Some(_status)) => {
191                process_success = true;
192                break;
193            }
194            Ok(None) => {
195                if wait_time == 15 {
196                    tracing::error!("Running profiler binary timeout");
197                    if let Err(e) = process.kill() {
198                        tracing::error!(
199                            e = &e as &dyn std::error::Error,
200                            "Error when stopping child process"
201                        );
202                    }
203                    process_success = false;
204                }
205            }
206            Err(e) => {
207                process_success = false;
208                tracing::error!(
209                    e = &e as &dyn std::error::Error,
210                    "Running profiler binary failed",
211                );
212                break;
213            }
214        }
215        timer.sleep(Duration::from_secs(1)).await;
216    }
217
218    // Get Stdout and Stderr content
219    let mut child_stdout = process.stdout.take().unwrap();
220    let mut child_stderr = process.stderr.take().unwrap();
221
222    let mut buffer = Vec::new();
223
224    let _ = child_stdout.read_to_end(&mut buffer);
225
226    if !buffer.is_empty() {
227        tracing::info!("{}", String::from_utf8(buffer).unwrap());
228    }
229
230    let mut buffer = Vec::new();
231
232    let _ = child_stderr.read_to_end(&mut buffer);
233
234    if !buffer.is_empty() {
235        tracing::error!("{}", String::from_utf8(buffer).unwrap());
236    }
237
238    // Drop socket no matter child process succeeded or not
239    drop(conn);
240    if !process_success {
241        anyhow::bail!("Failed while running `underhill_profiler_binary`")
242    }
243
244    Ok(())
245}
246
#[cfg(test)]
mod tests {
    use super::parse_meminfo_free;

    /// Values around the 1024 MB boundary round down, and the amount of
    /// whitespace after the colon does not matter.
    #[test]
    fn test_parse_meminfo_free() {
        assert_eq!(parse_meminfo_free("MemFree: 1048575 kB").unwrap(), 1023);
        assert_eq!(parse_meminfo_free("MemFree:1048576 kB").unwrap(), 1024);
        assert_eq!(parse_meminfo_free("MemFree:  1048577 kB").unwrap(), 1024);
    }

    /// The `MemFree` line is found among other `/proc/meminfo` entries.
    #[test]
    fn test_parse_meminfo_free_real_data() {
        let contents = "MemTotal:       32658576 kB\nMemFree:        11105884 kB\nMemAvailable:   26792856 kB\nBuffers:          828448 kB\nCached:         14789464 kB";
        assert_eq!(parse_meminfo_free(contents).unwrap(), 10845);
    }

    /// Less than 1 MB of free memory rounds down to zero.
    #[test]
    fn test_parse_meminfo_free_rounds_down_to_zero() {
        assert_eq!(parse_meminfo_free("MemFree: 512 kB").unwrap(), 0);
    }

    /// Missing, empty, or malformed `MemFree` entries are reported as errors.
    #[test]
    fn test_parse_meminfo_free_errors() {
        // No input at all.
        assert!(parse_meminfo_free("").is_err());
        // Other entries present, but no MemFree line.
        assert!(parse_meminfo_free("MemTotal:       32658576 kB").is_err());
        // MemFree line without a value.
        assert!(parse_meminfo_free("MemFree:").is_err());
        // MemFree value that is not a number.
        assert!(parse_meminfo_free("MemFree: lots kB").is_err());
    }
}