1use fs_err::File;
5use fs_err::PathExt;
6use futures::AsyncBufReadExt;
7use futures::AsyncRead;
8use futures::AsyncReadExt;
9use futures::StreamExt;
10use futures::io::BufReader;
11use jiff::Timestamp;
12use kmsg::KmsgParsedEntry;
13use parking_lot::Mutex;
14use std::collections::HashMap;
15use std::io::Write as _;
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19use tracing::Level;
20use tracing::level_filters::LevelFilter;
21use tracing_subscriber::filter::Targets;
22use tracing_subscriber::fmt::MakeWriter;
23use tracing_subscriber::fmt::format::FmtSpan;
24use tracing_subscriber::layer::SubscriberExt;
25use tracing_subscriber::util::SubscriberInitExt;
26
27#[derive(Clone)]
29pub struct PetriLogSource(Arc<LogSourceInner>);
30
31struct LogSourceInner {
32 root_path: PathBuf,
33 json_log: JsonLog,
34 log_files: Mutex<HashMap<String, PetriLogFile>>,
35 attachments: Mutex<HashMap<String, u64>>,
36}
37
38impl PetriLogSource {
39 pub fn log_file(&self, name: &str) -> anyhow::Result<PetriLogFile> {
44 use std::collections::hash_map::Entry;
45
46 let mut log_files = self.0.log_files.lock();
47 let log_file = match log_files.entry(name.to_owned()) {
48 Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
49 Entry::Vacant(vacant_entry) => {
50 let mut path = self.0.root_path.join(name);
51 path.set_extension("log");
55 let file = File::create(&path)?;
56 println!("[[ATTACHMENT|{}]]", path.display());
59 vacant_entry
60 .insert(PetriLogFile(Arc::new(LogFileInner {
61 file,
62 json_log: self.0.json_log.clone(),
63 source: name.to_owned(),
64 })))
65 .clone()
66 }
67 };
68 Ok(log_file)
69 }
70
71 fn attachment_path(&self, name: &str) -> PathBuf {
72 let mut attachments = self.0.attachments.lock();
73 let next = attachments.entry(name.to_owned()).or_default();
74 let name = Path::new(name);
75 let name = if *next == 0 {
76 name
77 } else {
78 let base = name.file_stem().unwrap().to_str().unwrap();
79 let extension = name.extension().unwrap_or_default();
80 &Path::new(&format!("{}_{}", base, *next)).with_extension(extension)
81 };
82 *next += 1;
83 self.0.root_path.join(name)
84 }
85
86 pub fn create_attachment(&self, filename: &str) -> anyhow::Result<File> {
91 let path = self.attachment_path(filename);
92 let file = File::create(&path)?;
93 self.trace_attachment(&path);
94 Ok(file)
95 }
96
97 pub fn write_attachment(
102 &self,
103 filename: &str,
104 data: impl AsRef<[u8]>,
105 ) -> anyhow::Result<PathBuf> {
106 let path = self.attachment_path(filename);
107 fs_err::write(&path, data)?;
108 self.trace_attachment(&path);
109 Ok(path)
110 }
111
112 fn trace_attachment(&self, path: &Path) {
113 self.0
115 .json_log
116 .write_attachment(path.file_name().unwrap().as_ref());
117 println!("[[ATTACHMENT|{}]]", path.display());
118 }
119}
120
121#[derive(Clone)]
122struct JsonLog(Arc<File>);
123
124impl JsonLog {
125 fn write_json(&self, v: &impl serde::Serialize) {
126 let v = serde_json::to_vec(v);
127 if let Ok(mut v) = v {
128 v.push(b'\n');
129 let _ = self.0.as_ref().write_all(&v);
131 }
132 }
133
134 fn write_entry(&self, timestamp: Option<Timestamp>, level: Level, source: &str, buf: &[u8]) {
135 #[derive(serde::Serialize)]
136 struct JsonEntry<'a> {
137 timestamp: Timestamp,
138 source: &'a str,
139 severity: &'a str,
140 message: &'a str,
141 }
142 let message = String::from_utf8_lossy(buf);
143 self.write_json(&JsonEntry {
144 timestamp: timestamp.unwrap_or_else(Timestamp::now),
145 source,
146 severity: level.as_str(),
147 message: message.trim_ascii(),
148 });
149 }
150
151 fn write_attachment(&self, path: &Path) {
152 #[derive(serde::Serialize)]
153 struct JsonEntry<'a> {
154 timestamp: Timestamp,
155 attachment: &'a Path,
156 }
157 self.write_json(&JsonEntry {
158 timestamp: Timestamp::now(),
159 attachment: path,
160 });
161 }
162}
163
164struct LogFileInner {
165 file: File,
166 json_log: JsonLog,
167 source: String,
168}
169
170impl LogFileInner {
171 fn write_stdout(&self, buf: &[u8]) {
172 let mut stdout = std::io::stdout().lock();
173 write!(stdout, "[{:>10}] ", self.source).unwrap();
174 stdout.write_all(buf).unwrap();
175 }
176}
177
178struct LogWriter<'a> {
179 inner: &'a LogFileInner,
180 level: Level,
181 timestamp: Option<Timestamp>,
182}
183
184impl std::io::Write for LogWriter<'_> {
185 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
186 self.inner
188 .json_log
189 .write_entry(self.timestamp, self.level, &self.inner.source, buf);
190 let _ = (&self.inner.file).write_all(buf);
192 self.inner.write_stdout(buf);
194 Ok(buf.len())
195 }
196
197 fn flush(&mut self) -> std::io::Result<()> {
198 Ok(())
199 }
200}
201
202#[derive(Clone)]
208pub struct PetriLogFile(Arc<LogFileInner>);
209
210impl PetriLogFile {
211 pub fn write_entry_fmt(
213 &self,
214 timestamp: Option<Timestamp>,
215 level: Level,
216 args: std::fmt::Arguments<'_>,
217 ) {
218 let _ = LogWriter {
221 inner: &self.0,
222 level,
223 timestamp,
224 }
225 .write_all(format!("{}\n", args).as_bytes());
226 }
227
228 pub fn write_entry(&self, message: impl std::fmt::Display) {
230 self.write_entry_fmt(None, Level::INFO, format_args!("{}", message));
231 }
232}
233
234#[macro_export]
236macro_rules! log {
237 ($file:expr, $($arg:tt)*) => {
238 <$crate::PetriLogFile>::write_entry_fmt(&$file, format_args!($($arg)*))
239 };
240}
241
242pub fn try_init_tracing(root_path: &Path) -> anyhow::Result<PetriLogSource> {
251 let targets =
252 if let Ok(var) = std::env::var("OPENVMM_LOG").or_else(|_| std::env::var("HVLITE_LOG")) {
253 var.parse().unwrap()
254 } else {
255 Targets::new().with_default(LevelFilter::DEBUG)
256 };
257
258 let root_path = root_path.fs_err_canonicalize()?;
260 let jsonl = File::create(root_path.join("petri.jsonl"))?;
261 let logger = PetriLogSource(Arc::new(LogSourceInner {
262 json_log: JsonLog(Arc::new(jsonl)),
263 root_path,
264 log_files: Default::default(),
265 attachments: Default::default(),
266 }));
267
268 let petri_log = logger.log_file("petri")?;
269
270 tracing_subscriber::fmt()
271 .compact()
272 .with_ansi(false) .log_internal_errors(true)
274 .with_writer(PetriWriter(petri_log))
275 .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
276 .with_max_level(LevelFilter::TRACE)
277 .finish()
278 .with(targets)
279 .try_init()?;
280
281 Ok(logger)
282}
283
284struct PetriWriter(PetriLogFile);
285
286impl<'a> MakeWriter<'a> for PetriWriter {
287 type Writer = LogWriter<'a>;
288
289 fn make_writer(&'a self) -> Self::Writer {
290 LogWriter {
291 inner: &self.0.0,
292 level: Level::INFO,
293 timestamp: None,
294 }
295 }
296
297 fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer {
298 LogWriter {
299 inner: &self.0.0,
300 level: *meta.level(),
301 timestamp: None,
302 }
303 }
304}
305
306pub async fn log_task(
311 log_file: PetriLogFile,
312 reader: impl AsyncRead + Unpin + Send + 'static,
313 name: &str,
314) -> anyhow::Result<()> {
315 tracing::info!("connected to {name}");
316 let mut buf = Vec::new();
317 let mut reader = BufReader::new(reader);
318 loop {
319 buf.clear();
320 match (&mut reader).take(256).read_until(b'\n', &mut buf).await {
321 Ok(0) => {
322 tracing::info!("disconnected from {name}: EOF");
323 return Ok(());
324 }
325 Err(e) => {
326 tracing::info!("disconnected from {name}: error: {e:#}");
327 return Err(e.into());
328 }
329 _ => {}
330 }
331
332 let string_buf = String::from_utf8_lossy(&buf);
333 let string_buf_trimmed = string_buf.trim_end();
334
335 if let Some(message) = kmsg::SyslogParsedEntry::new(string_buf_trimmed) {
336 let level = kernel_level_to_tracing_level(message.level);
337 log_file.write_entry_fmt(None, level, format_args!("{}", message.display(false)));
338 } else {
339 log_file.write_entry(string_buf_trimmed);
340 }
341 }
342}
343
344fn kernel_level_to_tracing_level(kernel_level: u8) -> Level {
346 match kernel_level {
347 0..=3 => Level::ERROR,
348 4 => Level::WARN,
349 5..=6 => Level::INFO,
350 7 => Level::DEBUG,
351 _ => Level::INFO,
352 }
353}
354
355pub async fn kmsg_log_task(
357 log_file: PetriLogFile,
358 diag_client: diag_client::DiagClient,
359) -> anyhow::Result<()> {
360 loop {
361 diag_client.wait_for_server().await?;
362 let mut kmsg = diag_client.kmsg(true).await?;
363 tracing::info!("kmsg connected");
364 while let Some(data) = kmsg.next().await {
365 match data {
366 Ok(data) => {
367 let message = KmsgParsedEntry::new(&data).unwrap();
368 let level = kernel_level_to_tracing_level(message.level);
369 log_file.write_entry_fmt(
370 None,
371 level,
372 format_args!("{}", message.display(false)),
373 );
374 }
375 Err(err) => {
376 tracing::info!("kmsg disconnected: {err:#}");
377 break;
378 }
379 }
380 }
381 }
382}
383
384#[cfg(test)]
385mod tests {
386 use super::*;
387
388 #[test]
389 fn test_kernel_level_to_tracing_level() {
390 assert_eq!(kernel_level_to_tracing_level(0), Level::ERROR);
392 assert_eq!(kernel_level_to_tracing_level(1), Level::ERROR);
393 assert_eq!(kernel_level_to_tracing_level(2), Level::ERROR);
394 assert_eq!(kernel_level_to_tracing_level(3), Level::ERROR);
395
396 assert_eq!(kernel_level_to_tracing_level(4), Level::WARN);
398
399 assert_eq!(kernel_level_to_tracing_level(5), Level::INFO);
401 assert_eq!(kernel_level_to_tracing_level(6), Level::INFO);
402
403 assert_eq!(kernel_level_to_tracing_level(7), Level::DEBUG);
405
406 assert_eq!(kernel_level_to_tracing_level(8), Level::INFO);
408 assert_eq!(kernel_level_to_tracing_level(255), Level::INFO);
409 }
410}