profiler_worker/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! A worker for profiling on VTL2.

#![cfg(target_os = "linux")]
#![forbid(unsafe_code)]

use anyhow::Context;
use futures::FutureExt;
use mesh::MeshPayload;
use mesh::error::RemoteError;
use mesh_worker::Worker;
use mesh_worker::WorkerId;
use mesh_worker::WorkerRpc;
use pal_async::DefaultPool;
use pal_async::driver::Driver;
use pal_async::timer::PolledTimer;
use socket2::Socket;
use std::io::Read;
use std::os::fd::AsRawFd;
use std::pin::pin;
use std::process::Command;
use std::process::Stdio;
use std::time::Duration;

/// Minimum amount of free memory, in MB, required for the profiler to start.
///
/// This is to make sure VTL2 doesn't OOM before the profiler memory check can
/// take effect.
const MIN_MEMORY_PROFILER_MB: u64 = 10;

/// A single profiling request, carried to the profiler worker in its
/// parameters.
#[derive(Debug, MeshPayload)]
pub struct ProfilerRequest {
    /// Profiling duration in seconds
    pub duration: u64,
    /// Additional arguments passed to the profiler binary, after the duration
    /// and the socket file descriptor
    pub profiler_args: Vec<String>,
    /// Socket connection where the bin file will be written to; its raw file
    /// descriptor is handed to the profiler binary
    pub conn: Socket,
}

/// The worker ID of the profiler worker, typed with the parameters
/// ([`ProfilerWorkerParameters`]) the worker is created from.
pub const PROFILER_WORKER: WorkerId<ProfilerWorkerParameters> = WorkerId::new("ProfilerWorker");

/// Parameters used to construct a [`ProfilerWorker`].
#[derive(MeshPayload)]
pub struct ProfilerWorkerParameters {
    /// The profiling request the worker will execute when it is run
    pub profiler_request: ProfilerRequest,
}

/// Worker that executes one profiling request.
pub struct ProfilerWorker {
    // The request received at construction time; consumed when the worker runs.
    profiler_request: ProfilerRequest,
}

impl Worker for ProfilerWorker {
    type Parameters = ProfilerWorkerParameters;
    type State = ();
    const ID: WorkerId<Self::Parameters> = WorkerId::new("ProfilerWorker");

    /// Create new worker and store the request
    fn new(parameters: Self::Parameters) -> anyhow::Result<Self> {
        Ok(Self {
            profiler_request: parameters.profiler_request,
        })
    }

    /// Profiler worker is run per Profile request so there is no need for restart
    fn restart(_state: Self::State) -> anyhow::Result<Self> {
        unimplemented!()
    }

    /// Run profiler worker and start a profiling session
    fn run(self, mut recv: mesh::Receiver<WorkerRpc<Self::State>>) -> anyhow::Result<()> {
        DefaultPool::run_with(async |driver| {
            let mut profiling = pin!(profile(self.profiler_request, &driver).fuse());
            loop {
                let msg = futures::select! { // merge semantics
                    msg = recv.recv().fuse() => {
                        msg
                    },
                    r = profiling => {
                        match r {
                            Ok(_) => {
                                break
                            },
                            Err(err) => {
                                anyhow::bail!("Profiling failed - Error {}", err.to_string());
                            }
                        }
                    }
                };
                match msg {
                    Ok(message) => match message {
                        WorkerRpc::Stop => {
                            break;
                        }
                        WorkerRpc::Restart(rpc) => {
                            rpc.complete(Err(RemoteError::new(anyhow::anyhow!("not supported"))));
                        }
                        WorkerRpc::Inspect(_deferred) => {}
                    },
                    Err(err) => {
                        anyhow::bail!("ProfilerWorker::Run - Error {}", err.to_string());
                    }
                }
            }
            Ok(())
        })
    }
}

/// Read `/proc/meminfo` and report the `MemFree` value in MB.
///
/// Fails if the file cannot be read or if no usable `MemFree` entry is found.
fn get_free_mem_mb() -> anyhow::Result<u64> {
    let meminfo = fs_err::read_to_string("/proc/meminfo")?;
    parse_meminfo_free(&meminfo)
}

fn parse_meminfo_free(contents: &str) -> anyhow::Result<u64> {
    const KBYTES_PER_MBYTES: u64 = 1024;
    for line in contents.lines() {
        let Some((name, rest)) = line.split_once(':') else {
            continue;
        };
        if name == "MemFree" {
            let value = rest
                .split_ascii_whitespace()
                .next()
                .context("line had no value")?;
            let value = value.parse::<u64>().context("value failed to parse")?;
            return Ok(value / KBYTES_PER_MBYTES);
        }
    }

    Err(anyhow::anyhow!("no memfree line found"))
}

/// Profiling function for the worker
pub async fn profile(request: ProfilerRequest, driver: &impl Driver) -> anyhow::Result<()> {
    let mut timer = PolledTimer::new(driver);
    let ProfilerRequest {
        conn,
        duration,
        mut profiler_args,
    } = request;

    // Set CLOEXEC to false because we need to share FD with child process
    conn.set_cloexec(false)
        .map_err(anyhow::Error::from)
        .context("Failed to set CLO_EXEC to Socket")?;

    let socket_fd = conn.as_raw_fd();
    let free_mem_mb = match get_free_mem_mb() {
        Ok(m) => m,
        Err(e) => {
            tracing::error!("Error when getting memory {}", e.to_string());
            0
        }
    };

    if free_mem_mb < MIN_MEMORY_PROFILER_MB {
        anyhow::bail!("Not enough memory to start profiler {} MB", free_mem_mb);
    }

    // Limit memory to 75% of free memory
    profiler_args.push(format!("LimitMB:{}", free_mem_mb * 75 / 100));

    let mut process = Command::new("/usr/bin/underhill_profiler_binary")
        .arg(duration.to_string())
        .arg(socket_fd.to_string())
        .args(profiler_args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .context("failed to execute process")?;

    let mut process_success = false;

    // Sleep for duration+1s so the child process can finish naturally
    timer.sleep(Duration::from_secs(duration + 1)).await;

    // Checking if child process finished every 1s for 15s
    // This is a failsafe in case child process doesn't exit and run
    // forever (which shouldn't happen unless something went wrong)
    for wait_time in 1..=15 {
        match process.try_wait() {
            Ok(Some(_status)) => {
                process_success = true;
                break;
            }
            Ok(None) => {
                if wait_time == 15 {
                    tracing::error!("Running profiler binary timeout");
                    if let Err(e) = process.kill() {
                        tracing::error!(
                            e = &e as &dyn std::error::Error,
                            "Error when stopping child process"
                        );
                    }
                    process_success = false;
                }
            }
            Err(e) => {
                process_success = false;
                tracing::error!(
                    "Running profiler binary failed with error {}",
                    e.to_string()
                );
                break;
            }
        }
        timer.sleep(Duration::from_secs(1)).await;
    }

    // Get Stdout and Stderr content
    let mut child_stdout = process.stdout.take().unwrap();
    let mut child_stderr = process.stderr.take().unwrap();

    let mut buffer = Vec::new();

    let _ = child_stdout.read_to_end(&mut buffer);

    if !buffer.is_empty() {
        tracing::info!("{}", String::from_utf8(buffer).unwrap());
    }

    let mut buffer = Vec::new();

    let _ = child_stderr.read_to_end(&mut buffer);

    if !buffer.is_empty() {
        tracing::error!("{}", String::from_utf8(buffer).unwrap());
    }

    // Drop socket no matter child process succeeded or not
    drop(conn);
    if !process_success {
        anyhow::bail!("Failed while running `underhill_profiler_binary`")
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::parse_meminfo_free;

    /// Values are converted from kB to MB rounding down, regardless of the
    /// amount of whitespace after the colon.
    #[test]
    fn test_parse_meminfo_free() {
        assert_eq!(parse_meminfo_free("MemFree: 1048575 kB").unwrap(), 1023);
        assert_eq!(parse_meminfo_free("MemFree:1048576 kB").unwrap(), 1024);
        assert_eq!(parse_meminfo_free("MemFree:  1048577 kB").unwrap(), 1024);
    }

    /// The `MemFree` line is picked out of a multi-line meminfo excerpt.
    #[test]
    fn test_parse_meminfo_free_real_data() {
        let contents = "MemTotal:       32658576 kB\nMemFree:        11105884 kB\nMemAvailable:   26792856 kB\nBuffers:          828448 kB\nCached:         14789464 kB";
        assert_eq!(parse_meminfo_free(contents).unwrap(), 10845);
    }

    /// Less than one MB of free memory truncates to zero.
    #[test]
    fn test_parse_meminfo_free_below_one_mb() {
        assert_eq!(parse_meminfo_free("MemFree: 1023 kB").unwrap(), 0);
    }

    /// Input without a `MemFree` line is rejected.
    #[test]
    fn test_parse_meminfo_free_missing_line() {
        assert_eq!(
            parse_meminfo_free("").unwrap_err().to_string(),
            "no memfree line found"
        );
        assert_eq!(
            parse_meminfo_free("MemTotal: 32658576 kB\nno separator here")
                .unwrap_err()
                .to_string(),
            "no memfree line found"
        );
    }

    /// A `MemFree` line with no value, or with a non-numeric value, is rejected.
    #[test]
    fn test_parse_meminfo_free_bad_value() {
        assert_eq!(
            parse_meminfo_free("MemFree:").unwrap_err().to_string(),
            "line had no value"
        );
        assert_eq!(
            parse_meminfo_free("MemFree: lots kB")
                .unwrap_err()
                .to_string(),
            "value failed to parse"
        );
    }
}