1use fs_err::File;
5use fs_err::PathExt;
6use futures::AsyncBufReadExt;
7use futures::AsyncRead;
8use futures::AsyncReadExt;
9use futures::StreamExt;
10use futures::io::BufReader;
11use jiff::Timestamp;
12use kmsg::KmsgParsedEntry;
13use parking_lot::Mutex;
14use std::collections::HashMap;
15use std::io::Read;
16use std::io::Write as _;
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::Arc;
20use tracing::Level;
21use tracing::level_filters::LevelFilter;
22use tracing_subscriber::filter::Targets;
23use tracing_subscriber::fmt::MakeWriter;
24use tracing_subscriber::fmt::format::FmtSpan;
25use tracing_subscriber::layer::SubscriberExt;
26use tracing_subscriber::util::SubscriberInitExt;
27
28#[derive(Clone, Debug)]
30pub struct PetriLogSource(Arc<LogSourceInner>);
31
32#[derive(Debug)]
33struct LogSourceInner {
34 root_path: PathBuf,
35 json_log: JsonLog,
36 log_files: Mutex<HashMap<String, PetriLogFile>>,
37 attachments: Mutex<HashMap<String, u64>>,
38}
39
40impl PetriLogSource {
41 pub fn log_file(&self, name: &str) -> anyhow::Result<PetriLogFile> {
46 use std::collections::hash_map::Entry;
47
48 let mut log_files = self.0.log_files.lock();
49 let log_file = match log_files.entry(name.to_owned()) {
50 Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
51 Entry::Vacant(vacant_entry) => {
52 let mut path = self.0.root_path.join(name);
53 path.set_extension("log");
57 let file = File::create(&path)?;
58 println!("[[ATTACHMENT|{}]]", path.display());
61 vacant_entry
62 .insert(PetriLogFile(Arc::new(LogFileInner {
63 file,
64 json_log: self.0.json_log.clone(),
65 source: name.to_owned(),
66 })))
67 .clone()
68 }
69 };
70 Ok(log_file)
71 }
72
73 fn attachment_path(&self, name: &str) -> PathBuf {
74 let mut attachments = self.0.attachments.lock();
75 let next = attachments.entry(name.to_owned()).or_default();
76 let name = Path::new(name);
77 let name = if *next == 0 {
78 name
79 } else {
80 let base = name.file_stem().unwrap().to_str().unwrap();
81 let extension = name.extension().unwrap_or_default();
82 &Path::new(&format!("{}_{}", base, *next)).with_extension(extension)
83 };
84 *next += 1;
85 self.0.root_path.join(name)
86 }
87
88 pub fn create_attachment(&self, filename: &str) -> anyhow::Result<File> {
93 let path = self.attachment_path(filename);
94 let file = File::create(&path)?;
95 self.trace_attachment(&path);
96 Ok(file)
97 }
98
99 pub fn write_attachment(&self, filename: &str, mut data: impl Read) -> anyhow::Result<PathBuf> {
104 let path = self.attachment_path(filename);
105 let mut file = File::create(&path)?;
106 std::io::copy(&mut data, &mut file)?;
107 self.trace_attachment(&path);
108 Ok(path)
109 }
110
111 pub fn copy_attachment(
116 &self,
117 attachment_filename: &str,
118 source_path: &Path,
119 ) -> anyhow::Result<()> {
120 let dest_path = self.attachment_path(attachment_filename);
121 fs_err::copy(source_path, &dest_path)?;
122 self.trace_attachment(&dest_path);
123 Ok(())
124 }
125
126 fn trace_attachment(&self, path: &Path) {
127 self.0
129 .json_log
130 .write_attachment(path.file_name().unwrap().as_ref());
131 println!("[[ATTACHMENT|{}]]", path.display());
132 }
133
134 pub fn log_test_result(&self, name: &str, r: &anyhow::Result<()>) {
136 let result_path = match &r {
137 Ok(()) => {
138 tracing::info!("test passed");
139 "petri.passed"
140 }
141 Err(err) => {
142 tracing::error!(
143 error = err.as_ref() as &dyn std::error::Error,
144 "test failed"
145 );
146 "petri.failed"
147 }
148 };
149 fs_err::write(self.0.root_path.join(result_path), name).unwrap();
152 }
153
154 pub fn output_dir(&self) -> &Path {
156 &self.0.root_path
157 }
158}
159
160#[derive(Clone, Debug)]
161struct JsonLog(Arc<File>);
162
163impl JsonLog {
164 fn write_json(&self, v: &impl serde::Serialize) {
165 let v = serde_json::to_vec(v);
166 if let Ok(mut v) = v {
167 v.push(b'\n');
168 let _ = self.0.as_ref().write_all(&v);
170 }
171 }
172
173 fn write_entry(&self, timestamp: Option<Timestamp>, level: Level, source: &str, buf: &[u8]) {
174 #[derive(serde::Serialize)]
175 struct JsonEntry<'a> {
176 timestamp: Timestamp,
177 source: &'a str,
178 severity: &'a str,
179 message: &'a str,
180 }
181 let message = String::from_utf8_lossy(buf);
182 self.write_json(&JsonEntry {
183 timestamp: timestamp.unwrap_or_else(Timestamp::now),
184 source,
185 severity: level.as_str(),
186 message: message.trim_ascii(),
187 });
188 }
189
190 fn write_attachment(&self, path: &Path) {
191 #[derive(serde::Serialize)]
192 struct JsonEntry<'a> {
193 timestamp: Timestamp,
194 attachment: &'a Path,
195 }
196 self.write_json(&JsonEntry {
197 timestamp: Timestamp::now(),
198 attachment: path,
199 });
200 }
201}
202
203#[derive(Debug)]
204struct LogFileInner {
205 file: File,
206 json_log: JsonLog,
207 source: String,
208}
209
210impl LogFileInner {
211 fn write_stdout(&self, buf: &[u8]) {
212 let mut stdout = std::io::stdout().lock();
213 write!(stdout, "[{:>10}] ", self.source).unwrap();
214 stdout.write_all(buf).unwrap();
215 }
216}
217
218struct LogWriter<'a> {
219 inner: &'a LogFileInner,
220 level: Level,
221 timestamp: Option<Timestamp>,
222}
223
224impl std::io::Write for LogWriter<'_> {
225 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
226 self.inner
228 .json_log
229 .write_entry(self.timestamp, self.level, &self.inner.source, buf);
230 let _ = (&self.inner.file).write_all(buf);
232 self.inner.write_stdout(buf);
234 Ok(buf.len())
235 }
236
237 fn flush(&mut self) -> std::io::Result<()> {
238 Ok(())
239 }
240}
241
242#[derive(Clone, Debug)]
248pub struct PetriLogFile(Arc<LogFileInner>);
249
250impl PetriLogFile {
251 pub fn write_entry_fmt(
253 &self,
254 timestamp: Option<Timestamp>,
255 level: Level,
256 args: std::fmt::Arguments<'_>,
257 ) {
258 let _ = LogWriter {
261 inner: &self.0,
262 level,
263 timestamp,
264 }
265 .write_all(format!("{}\n", args).as_bytes());
266 }
267
268 pub fn write_entry(&self, message: impl std::fmt::Display) {
270 self.write_entry_fmt(None, Level::INFO, format_args!("{}", message));
271 }
272}
273
/// Writes a formatted message to the given `PetriLogFile`.
///
/// The first argument is the log file; the rest are `format!`-style
/// arguments. The entry is written through `PetriLogFile::write_entry`, i.e.
/// at `INFO` level with the current time.
#[macro_export]
macro_rules! log {
    ($file:expr, $($arg:tt)*) => {
        // `write_entry_fmt` also needs a timestamp and a level, so go through
        // `write_entry`, which supplies both and accepts any `Display` value.
        <$crate::PetriLogFile>::write_entry(&$file, format_args!($($arg)*))
    };
}
281
282pub fn try_init_tracing(root_path: &Path) -> anyhow::Result<PetriLogSource> {
291 let targets =
292 if let Ok(var) = std::env::var("OPENVMM_LOG").or_else(|_| std::env::var("HVLITE_LOG")) {
293 var.parse().unwrap()
294 } else {
295 Targets::new().with_default(LevelFilter::DEBUG)
296 };
297
298 let root_path = root_path.fs_err_canonicalize()?;
300 let jsonl = File::create(root_path.join("petri.jsonl"))?;
301 let logger = PetriLogSource(Arc::new(LogSourceInner {
302 json_log: JsonLog(Arc::new(jsonl)),
303 root_path,
304 log_files: Default::default(),
305 attachments: Default::default(),
306 }));
307
308 let petri_log = logger.log_file("petri")?;
309
310 tracing_subscriber::fmt()
311 .compact()
312 .with_ansi(false) .log_internal_errors(true)
314 .with_writer(PetriWriter(petri_log))
315 .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
316 .with_max_level(LevelFilter::TRACE)
317 .finish()
318 .with(targets)
319 .try_init()?;
320
321 Ok(logger)
322}
323
324struct PetriWriter(PetriLogFile);
325
326impl<'a> MakeWriter<'a> for PetriWriter {
327 type Writer = LogWriter<'a>;
328
329 fn make_writer(&'a self) -> Self::Writer {
330 LogWriter {
331 inner: &self.0.0,
332 level: Level::INFO,
333 timestamp: None,
334 }
335 }
336
337 fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer {
338 LogWriter {
339 inner: &self.0.0,
340 level: *meta.level(),
341 timestamp: None,
342 }
343 }
344}
345
346pub async fn log_task(
351 log_file: PetriLogFile,
352 reader: impl AsyncRead + Unpin + Send + 'static,
353 name: &str,
354) -> anyhow::Result<()> {
355 tracing::info!("connected to {name}");
356 let mut buf = Vec::new();
357 let mut reader = BufReader::new(reader);
358 loop {
359 buf.clear();
360 match (&mut reader).take(256).read_until(b'\n', &mut buf).await {
361 Ok(0) => {
362 tracing::info!("disconnected from {name}: EOF");
363 return Ok(());
364 }
365 Err(e) => {
366 tracing::info!("disconnected from {name}: error: {e:#}");
367 return Err(e.into());
368 }
369 _ => {}
370 }
371
372 let string_buf = String::from_utf8_lossy(&buf);
373 let string_buf_trimmed = string_buf.trim_end();
374
375 if let Some(message) = kmsg::SyslogParsedEntry::new(string_buf_trimmed) {
376 let level = kernel_level_to_tracing_level(message.level);
377 log_file.write_entry_fmt(None, level, format_args!("{}", message.display(false)));
378 } else {
379 log_file.write_entry(string_buf_trimmed);
380 }
381 }
382}
383
384fn kernel_level_to_tracing_level(kernel_level: u8) -> Level {
386 match kernel_level {
387 0..=3 => Level::ERROR,
388 4 => Level::WARN,
389 5..=6 => Level::INFO,
390 7 => Level::DEBUG,
391 _ => Level::INFO,
392 }
393}
394
395pub async fn kmsg_log_task(
397 log_file: PetriLogFile,
398 diag_client: diag_client::DiagClient,
399) -> anyhow::Result<()> {
400 loop {
401 diag_client.wait_for_server().await?;
402 let mut kmsg = diag_client.kmsg(true).await?;
403 tracing::info!("kmsg connected");
404 while let Some(data) = kmsg.next().await {
405 match data {
406 Ok(data) => {
407 let message = KmsgParsedEntry::new(&data).unwrap();
408 let level = kernel_level_to_tracing_level(message.level);
409 log_file.write_entry_fmt(
410 None,
411 level,
412 format_args!("{}", message.display(false)),
413 );
414 }
415 Err(err) => {
416 tracing::info!("kmsg disconnected: {err:#}");
417 break;
418 }
419 }
420 }
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the mapping for every kernel level from 0 to 7 and for
    /// out-of-range values.
    #[test]
    fn test_kernel_level_to_tracing_level() {
        let cases = [
            // Levels 0 through 3 map to ERROR.
            (0, Level::ERROR),
            (1, Level::ERROR),
            (2, Level::ERROR),
            (3, Level::ERROR),
            // Level 4 maps to WARN.
            (4, Level::WARN),
            // Levels 5 and 6 map to INFO.
            (5, Level::INFO),
            (6, Level::INFO),
            // Level 7 maps to DEBUG.
            (7, Level::DEBUG),
            // Values outside 0..=7 fall back to INFO.
            (8, Level::INFO),
            (255, Level::INFO),
        ];

        for (kernel_level, expected) in cases {
            assert_eq!(
                kernel_level_to_tracing_level(kernel_level),
                expected,
                "kernel level {kernel_level}"
            );
        }
    }
}