1use fs_err::File;
5use fs_err::PathExt;
6use futures::AsyncBufReadExt;
7use futures::AsyncRead;
8use futures::AsyncReadExt;
9use futures::StreamExt;
10use futures::io::BufReader;
11use jiff::Timestamp;
12use kmsg::KmsgParsedEntry;
13use parking_lot::Mutex;
14use std::collections::HashMap;
15use std::io::Write as _;
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19use tracing::Level;
20use tracing::level_filters::LevelFilter;
21use tracing_subscriber::filter::Targets;
22use tracing_subscriber::fmt::MakeWriter;
23use tracing_subscriber::fmt::format::FmtSpan;
24use tracing_subscriber::layer::SubscriberExt;
25use tracing_subscriber::util::SubscriberInitExt;
26
27#[derive(Clone)]
29pub struct PetriLogSource(Arc<LogSourceInner>);
30
31struct LogSourceInner {
32 root_path: PathBuf,
33 json_log: JsonLog,
34 log_files: Mutex<HashMap<String, PetriLogFile>>,
35 attachments: Mutex<HashMap<String, u64>>,
36}
37
38impl PetriLogSource {
39 pub fn log_file(&self, name: &str) -> anyhow::Result<PetriLogFile> {
44 use std::collections::hash_map::Entry;
45
46 let mut log_files = self.0.log_files.lock();
47 let log_file = match log_files.entry(name.to_owned()) {
48 Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
49 Entry::Vacant(vacant_entry) => {
50 let mut path = self.0.root_path.join(name);
51 path.set_extension("log");
55 let file = File::create(&path)?;
56 println!("[[ATTACHMENT|{}]]", path.display());
59 vacant_entry
60 .insert(PetriLogFile(Arc::new(LogFileInner {
61 file,
62 json_log: self.0.json_log.clone(),
63 source: name.to_owned(),
64 })))
65 .clone()
66 }
67 };
68 Ok(log_file)
69 }
70
71 fn attachment_path(&self, name: &str) -> PathBuf {
72 let mut attachments = self.0.attachments.lock();
73 let next = attachments.entry(name.to_owned()).or_default();
74 let name = Path::new(name);
75 let name = if *next == 0 {
76 name
77 } else {
78 let base = name.file_stem().unwrap().to_str().unwrap();
79 let extension = name.extension().unwrap_or_default();
80 &Path::new(&format!("{}_{}", base, *next)).with_extension(extension)
81 };
82 *next += 1;
83 self.0.root_path.join(name)
84 }
85
86 pub fn create_attachment(&self, filename: &str) -> anyhow::Result<File> {
91 let path = self.attachment_path(filename);
92 let file = File::create(&path)?;
93 self.trace_attachment(&path);
94 Ok(file)
95 }
96
97 pub fn write_attachment(
102 &self,
103 filename: &str,
104 data: impl AsRef<[u8]>,
105 ) -> anyhow::Result<PathBuf> {
106 let path = self.attachment_path(filename);
107 fs_err::write(&path, data)?;
108 self.trace_attachment(&path);
109 Ok(path)
110 }
111
112 fn trace_attachment(&self, path: &Path) {
113 self.0
115 .json_log
116 .write_attachment(path.file_name().unwrap().as_ref());
117 println!("[[ATTACHMENT|{}]]", path.display());
118 }
119
120 pub fn log_test_result(&self, name: &str, r: &anyhow::Result<()>) {
122 let result_path = match &r {
123 Ok(()) => {
124 tracing::info!("test passed");
125 "petri.passed"
126 }
127 Err(err) => {
128 tracing::error!(
129 error = err.as_ref() as &dyn std::error::Error,
130 "test failed"
131 );
132 "petri.failed"
133 }
134 };
135 fs_err::write(self.0.root_path.join(result_path), name).unwrap();
138 }
139
140 pub fn output_dir(&self) -> &Path {
142 &self.0.root_path
143 }
144}
145
146#[derive(Clone)]
147struct JsonLog(Arc<File>);
148
149impl JsonLog {
150 fn write_json(&self, v: &impl serde::Serialize) {
151 let v = serde_json::to_vec(v);
152 if let Ok(mut v) = v {
153 v.push(b'\n');
154 let _ = self.0.as_ref().write_all(&v);
156 }
157 }
158
159 fn write_entry(&self, timestamp: Option<Timestamp>, level: Level, source: &str, buf: &[u8]) {
160 #[derive(serde::Serialize)]
161 struct JsonEntry<'a> {
162 timestamp: Timestamp,
163 source: &'a str,
164 severity: &'a str,
165 message: &'a str,
166 }
167 let message = String::from_utf8_lossy(buf);
168 self.write_json(&JsonEntry {
169 timestamp: timestamp.unwrap_or_else(Timestamp::now),
170 source,
171 severity: level.as_str(),
172 message: message.trim_ascii(),
173 });
174 }
175
176 fn write_attachment(&self, path: &Path) {
177 #[derive(serde::Serialize)]
178 struct JsonEntry<'a> {
179 timestamp: Timestamp,
180 attachment: &'a Path,
181 }
182 self.write_json(&JsonEntry {
183 timestamp: Timestamp::now(),
184 attachment: path,
185 });
186 }
187}
188
189struct LogFileInner {
190 file: File,
191 json_log: JsonLog,
192 source: String,
193}
194
195impl LogFileInner {
196 fn write_stdout(&self, buf: &[u8]) {
197 let mut stdout = std::io::stdout().lock();
198 write!(stdout, "[{:>10}] ", self.source).unwrap();
199 stdout.write_all(buf).unwrap();
200 }
201}
202
203struct LogWriter<'a> {
204 inner: &'a LogFileInner,
205 level: Level,
206 timestamp: Option<Timestamp>,
207}
208
209impl std::io::Write for LogWriter<'_> {
210 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
211 self.inner
213 .json_log
214 .write_entry(self.timestamp, self.level, &self.inner.source, buf);
215 let _ = (&self.inner.file).write_all(buf);
217 self.inner.write_stdout(buf);
219 Ok(buf.len())
220 }
221
222 fn flush(&mut self) -> std::io::Result<()> {
223 Ok(())
224 }
225}
226
227#[derive(Clone)]
233pub struct PetriLogFile(Arc<LogFileInner>);
234
235impl PetriLogFile {
236 pub fn write_entry_fmt(
238 &self,
239 timestamp: Option<Timestamp>,
240 level: Level,
241 args: std::fmt::Arguments<'_>,
242 ) {
243 let _ = LogWriter {
246 inner: &self.0,
247 level,
248 timestamp,
249 }
250 .write_all(format!("{}\n", args).as_bytes());
251 }
252
253 pub fn write_entry(&self, message: impl std::fmt::Display) {
255 self.write_entry_fmt(None, Level::INFO, format_args!("{}", message));
256 }
257}
258
/// Writes a formatted `INFO`-level entry to the given [`PetriLogFile`].
///
/// The first argument is the log file; the remaining arguments are a format
/// string and its parameters, exactly as accepted by [`format_args!`].
#[macro_export]
macro_rules! log {
    ($file:expr, $($arg:tt)*) => {
        // `write_entry_fmt` also requires a timestamp and a level, so route
        // through `write_entry`, which supplies the defaults and accepts any
        // `Display` value (including `fmt::Arguments`).
        <$crate::PetriLogFile>::write_entry(&$file, format_args!($($arg)*))
    };
}
266
267pub fn try_init_tracing(root_path: &Path) -> anyhow::Result<PetriLogSource> {
276 let targets =
277 if let Ok(var) = std::env::var("OPENVMM_LOG").or_else(|_| std::env::var("HVLITE_LOG")) {
278 var.parse().unwrap()
279 } else {
280 Targets::new().with_default(LevelFilter::DEBUG)
281 };
282
283 let root_path = root_path.fs_err_canonicalize()?;
285 let jsonl = File::create(root_path.join("petri.jsonl"))?;
286 let logger = PetriLogSource(Arc::new(LogSourceInner {
287 json_log: JsonLog(Arc::new(jsonl)),
288 root_path,
289 log_files: Default::default(),
290 attachments: Default::default(),
291 }));
292
293 let petri_log = logger.log_file("petri")?;
294
295 tracing_subscriber::fmt()
296 .compact()
297 .with_ansi(false) .log_internal_errors(true)
299 .with_writer(PetriWriter(petri_log))
300 .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
301 .with_max_level(LevelFilter::TRACE)
302 .finish()
303 .with(targets)
304 .try_init()?;
305
306 Ok(logger)
307}
308
309struct PetriWriter(PetriLogFile);
310
311impl<'a> MakeWriter<'a> for PetriWriter {
312 type Writer = LogWriter<'a>;
313
314 fn make_writer(&'a self) -> Self::Writer {
315 LogWriter {
316 inner: &self.0.0,
317 level: Level::INFO,
318 timestamp: None,
319 }
320 }
321
322 fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer {
323 LogWriter {
324 inner: &self.0.0,
325 level: *meta.level(),
326 timestamp: None,
327 }
328 }
329}
330
331pub async fn log_task(
336 log_file: PetriLogFile,
337 reader: impl AsyncRead + Unpin + Send + 'static,
338 name: &str,
339) -> anyhow::Result<()> {
340 tracing::info!("connected to {name}");
341 let mut buf = Vec::new();
342 let mut reader = BufReader::new(reader);
343 loop {
344 buf.clear();
345 match (&mut reader).take(256).read_until(b'\n', &mut buf).await {
346 Ok(0) => {
347 tracing::info!("disconnected from {name}: EOF");
348 return Ok(());
349 }
350 Err(e) => {
351 tracing::info!("disconnected from {name}: error: {e:#}");
352 return Err(e.into());
353 }
354 _ => {}
355 }
356
357 let string_buf = String::from_utf8_lossy(&buf);
358 let string_buf_trimmed = string_buf.trim_end();
359
360 if let Some(message) = kmsg::SyslogParsedEntry::new(string_buf_trimmed) {
361 let level = kernel_level_to_tracing_level(message.level);
362 log_file.write_entry_fmt(None, level, format_args!("{}", message.display(false)));
363 } else {
364 log_file.write_entry(string_buf_trimmed);
365 }
366 }
367}
368
369fn kernel_level_to_tracing_level(kernel_level: u8) -> Level {
371 match kernel_level {
372 0..=3 => Level::ERROR,
373 4 => Level::WARN,
374 5..=6 => Level::INFO,
375 7 => Level::DEBUG,
376 _ => Level::INFO,
377 }
378}
379
380pub async fn kmsg_log_task(
382 log_file: PetriLogFile,
383 diag_client: diag_client::DiagClient,
384) -> anyhow::Result<()> {
385 loop {
386 diag_client.wait_for_server().await?;
387 let mut kmsg = diag_client.kmsg(true).await?;
388 tracing::info!("kmsg connected");
389 while let Some(data) = kmsg.next().await {
390 match data {
391 Ok(data) => {
392 let message = KmsgParsedEntry::new(&data).unwrap();
393 let level = kernel_level_to_tracing_level(message.level);
394 log_file.write_entry_fmt(
395 None,
396 level,
397 format_args!("{}", message.display(false)),
398 );
399 }
400 Err(err) => {
401 tracing::info!("kmsg disconnected: {err:#}");
402 break;
403 }
404 }
405 }
406 }
407}
408
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the mapping for every in-range kernel level plus two
    /// out-of-range values.
    #[test]
    fn test_kernel_level_to_tracing_level() {
        let cases = [
            (0, Level::ERROR),
            (1, Level::ERROR),
            (2, Level::ERROR),
            (3, Level::ERROR),
            (4, Level::WARN),
            (5, Level::INFO),
            (6, Level::INFO),
            (7, Level::DEBUG),
            // Out-of-range values fall back to INFO.
            (8, Level::INFO),
            (255, Level::INFO),
        ];

        for (kernel_level, expected) in cases {
            assert_eq!(
                kernel_level_to_tracing_level(kernel_level),
                expected,
                "kernel level {kernel_level}"
            );
        }
    }
}