1use fs_err::File;
5use fs_err::PathExt;
6use futures::AsyncBufReadExt;
7use futures::AsyncRead;
8use futures::AsyncReadExt;
9use futures::StreamExt;
10use futures::io::BufReader;
11use jiff::Timestamp;
12use kmsg::KmsgParsedEntry;
13use parking_lot::Mutex;
14use std::collections::HashMap;
15use std::io::Read;
16use std::io::Write as _;
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::Arc;
20use tracing::Level;
21use tracing::level_filters::LevelFilter;
22use tracing_subscriber::filter::Targets;
23use tracing_subscriber::fmt::MakeWriter;
24use tracing_subscriber::fmt::format::FmtSpan;
25use tracing_subscriber::layer::SubscriberExt;
26use tracing_subscriber::util::SubscriberInitExt;
27
28#[derive(Clone, Debug)]
30pub struct PetriLogSource(Arc<LogSourceInner>);
31
32#[derive(Debug)]
33struct LogSourceInner {
34 root_path: PathBuf,
35 json_log: JsonLog,
36 log_files: Mutex<HashMap<String, PetriLogFile>>,
37 attachments: Mutex<HashMap<String, u64>>,
38}
39
40impl PetriLogSource {
41 pub fn log_file(&self, name: &str) -> anyhow::Result<PetriLogFile> {
46 use std::collections::hash_map::Entry;
47
48 let mut log_files = self.0.log_files.lock();
49 let log_file = match log_files.entry(name.to_owned()) {
50 Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
51 Entry::Vacant(vacant_entry) => {
52 let mut path = self.0.root_path.join(name);
53 path.set_extension("log");
57 let file = File::create(&path)?;
58 println!("[[ATTACHMENT|{}]]", path.display());
61 vacant_entry
62 .insert(PetriLogFile(Arc::new(LogFileInner {
63 file,
64 json_log: self.0.json_log.clone(),
65 source: name.to_owned(),
66 })))
67 .clone()
68 }
69 };
70 Ok(log_file)
71 }
72
73 fn attachment_path(&self, name: &str) -> PathBuf {
74 let mut attachments = self.0.attachments.lock();
75 let next = attachments.entry(name.to_owned()).or_default();
76 let name = Path::new(name);
77 let name = if *next == 0 {
78 name
79 } else {
80 let base = name.file_stem().unwrap().to_str().unwrap();
81 let extension = name.extension().unwrap_or_default();
82 &Path::new(&format!("{}_{}", base, *next)).with_extension(extension)
83 };
84 *next += 1;
85 self.0.root_path.join(name)
86 }
87
88 pub fn create_attachment(&self, filename: &str) -> anyhow::Result<File> {
93 let path = self.attachment_path(filename);
94 let file = File::create(&path)?;
95 self.trace_attachment(&path);
96 Ok(file)
97 }
98
99 pub fn write_attachment(&self, filename: &str, mut data: impl Read) -> anyhow::Result<PathBuf> {
104 let path = self.attachment_path(filename);
105 let mut file = File::create(&path)?;
106 std::io::copy(&mut data, &mut file)?;
107 self.trace_attachment(&path);
108 Ok(path)
109 }
110
111 pub fn copy_attachment(
116 &self,
117 attachment_filename: &str,
118 source_path: &Path,
119 ) -> anyhow::Result<()> {
120 let dest_path = self.attachment_path(attachment_filename);
121 fs_err::copy(source_path, &dest_path)?;
122 self.trace_attachment(&dest_path);
123 Ok(())
124 }
125
126 fn trace_attachment(&self, path: &Path) {
127 self.0
129 .json_log
130 .write_attachment(path.file_name().unwrap().as_ref());
131 println!("[[ATTACHMENT|{}]]", path.display());
132 }
133
134 pub fn log_test_result(&self, name: &str, r: &anyhow::Result<()>, unstable: bool) {
136 let result_path = match &r {
137 Ok(()) => {
138 tracing::info!("test passed");
139 "petri.passed"
140 }
141 Err(err) if unstable => {
142 tracing::warn!(
143 error = err.as_ref() as &dyn std::error::Error,
144 "unstable test failed"
145 );
146 "petri.failed_unstable"
147 }
148 Err(err) => {
149 tracing::error!(
150 error = err.as_ref() as &dyn std::error::Error,
151 "test failed"
152 );
153 "petri.failed"
154 }
155 };
156 fs_err::write(self.0.root_path.join(result_path), name).unwrap();
159 }
160
161 pub fn output_dir(&self) -> &Path {
163 &self.0.root_path
164 }
165}
166
167#[derive(Clone, Debug)]
168struct JsonLog(Arc<File>);
169
170impl JsonLog {
171 fn write_json(&self, v: &impl serde::Serialize) {
172 let v = serde_json::to_vec(v);
173 if let Ok(mut v) = v {
174 v.push(b'\n');
175 let _ = self.0.as_ref().write_all(&v);
177 }
178 }
179
180 fn write_entry(&self, timestamp: Option<Timestamp>, level: Level, source: &str, buf: &[u8]) {
181 #[derive(serde::Serialize)]
182 struct JsonEntry<'a> {
183 timestamp: Timestamp,
184 source: &'a str,
185 severity: &'a str,
186 message: &'a str,
187 }
188 let message = String::from_utf8_lossy(buf);
189 self.write_json(&JsonEntry {
190 timestamp: timestamp.unwrap_or_else(Timestamp::now),
191 source,
192 severity: level.as_str(),
193 message: message.trim_ascii(),
194 });
195 }
196
197 fn write_attachment(&self, path: &Path) {
198 #[derive(serde::Serialize)]
199 struct JsonEntry<'a> {
200 timestamp: Timestamp,
201 attachment: &'a Path,
202 }
203 self.write_json(&JsonEntry {
204 timestamp: Timestamp::now(),
205 attachment: path,
206 });
207 }
208}
209
210#[derive(Debug)]
211struct LogFileInner {
212 file: File,
213 json_log: JsonLog,
214 source: String,
215}
216
217impl LogFileInner {
218 fn write_stdout(&self, buf: &[u8]) {
219 let mut stdout = std::io::stdout().lock();
220 write!(stdout, "[{:>10}] ", self.source).unwrap();
221 stdout.write_all(buf).unwrap();
222 }
223}
224
225struct LogWriter<'a> {
226 inner: &'a LogFileInner,
227 level: Level,
228 timestamp: Option<Timestamp>,
229}
230
231impl std::io::Write for LogWriter<'_> {
232 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
233 self.inner
235 .json_log
236 .write_entry(self.timestamp, self.level, &self.inner.source, buf);
237 let _ = (&self.inner.file).write_all(buf);
239 self.inner.write_stdout(buf);
241 Ok(buf.len())
242 }
243
244 fn flush(&mut self) -> std::io::Result<()> {
245 Ok(())
246 }
247}
248
249#[derive(Clone, Debug)]
255pub struct PetriLogFile(Arc<LogFileInner>);
256
257impl PetriLogFile {
258 pub fn write_entry_fmt(
260 &self,
261 timestamp: Option<Timestamp>,
262 level: Level,
263 args: std::fmt::Arguments<'_>,
264 ) {
265 let _ = LogWriter {
268 inner: &self.0,
269 level,
270 timestamp,
271 }
272 .write_all(format!("{}\n", args).as_bytes());
273 }
274
275 pub fn write_entry(&self, message: impl std::fmt::Display) {
277 self.write_entry_fmt(None, Level::INFO, format_args!("{}", message));
278 }
279}
280
281#[macro_export]
283macro_rules! log {
284 ($file:expr, $($arg:tt)*) => {
285 <$crate::PetriLogFile>::write_entry_fmt(&$file, format_args!($($arg)*))
286 };
287}
288
289pub fn try_init_tracing(
298 root_path: &Path,
299 default_level: LevelFilter,
300) -> anyhow::Result<PetriLogSource> {
301 let targets =
302 if let Ok(var) = std::env::var("OPENVMM_LOG").or_else(|_| std::env::var("HVLITE_LOG")) {
303 var.parse().unwrap()
304 } else {
305 Targets::new().with_default(default_level)
306 };
307
308 let root_path = root_path.fs_err_canonicalize()?;
310 let jsonl = File::create(root_path.join("petri.jsonl"))?;
311 let logger = PetriLogSource(Arc::new(LogSourceInner {
312 json_log: JsonLog(Arc::new(jsonl)),
313 root_path,
314 log_files: Default::default(),
315 attachments: Default::default(),
316 }));
317
318 let petri_log = logger.log_file("petri")?;
319
320 tracing_subscriber::fmt()
321 .compact()
322 .with_ansi(false) .log_internal_errors(true)
324 .with_writer(PetriWriter(petri_log))
325 .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
326 .with_max_level(LevelFilter::TRACE)
327 .finish()
328 .with(targets)
329 .try_init()?;
330
331 Ok(logger)
332}
333
334struct PetriWriter(PetriLogFile);
335
336impl<'a> MakeWriter<'a> for PetriWriter {
337 type Writer = LogWriter<'a>;
338
339 fn make_writer(&'a self) -> Self::Writer {
340 LogWriter {
341 inner: &self.0.0,
342 level: Level::INFO,
343 timestamp: None,
344 }
345 }
346
347 fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer {
348 LogWriter {
349 inner: &self.0.0,
350 level: *meta.level(),
351 timestamp: None,
352 }
353 }
354}
355
356pub async fn log_task(
361 log_file: PetriLogFile,
362 reader: impl AsyncRead + Unpin + Send + 'static,
363 name: &str,
364) -> anyhow::Result<()> {
365 tracing::info!("connected to {name}");
366 let mut buf = Vec::new();
367 let mut reader = BufReader::new(reader);
368 loop {
369 buf.clear();
370 match (&mut reader).take(256).read_until(b'\n', &mut buf).await {
371 Ok(0) => {
372 tracing::info!("disconnected from {name}: EOF");
373 return Ok(());
374 }
375 Err(e) => {
376 tracing::info!("disconnected from {name}: error: {e:#}");
377 return Err(e.into());
378 }
379 _ => {}
380 }
381
382 let string_buf = String::from_utf8_lossy(&buf);
383 let string_buf_trimmed = string_buf.trim_end();
384
385 if let Some(message) = kmsg::SyslogParsedEntry::new(string_buf_trimmed) {
386 let level = kernel_level_to_tracing_level(message.level);
387 log_file.write_entry_fmt(None, level, format_args!("{}", message.display(false)));
388 } else {
389 log_file.write_entry(string_buf_trimmed);
390 }
391 }
392}
393
394fn kernel_level_to_tracing_level(kernel_level: u8) -> Level {
396 match kernel_level {
397 0..=3 => Level::ERROR,
398 4 => Level::WARN,
399 5..=6 => Level::INFO,
400 7 => Level::DEBUG,
401 _ => Level::INFO,
402 }
403}
404
405pub async fn kmsg_log_task(
407 log_file: PetriLogFile,
408 diag_client: diag_client::DiagClient,
409) -> anyhow::Result<()> {
410 loop {
411 diag_client.wait_for_server().await?;
412 let mut kmsg = diag_client.kmsg(true).await?;
413 tracing::info!("kmsg connected");
414 while let Some(data) = kmsg.next().await {
415 match data {
416 Ok(data) => {
417 let message = KmsgParsedEntry::new(&data).unwrap();
418 let level = kernel_level_to_tracing_level(message.level);
419 log_file.write_entry_fmt(
420 None,
421 level,
422 format_args!("{}", message.display(false)),
423 );
424 }
425 Err(err) => {
426 tracing::info!("kmsg disconnected: {err:#}");
427 break;
428 }
429 }
430 }
431 }
432}
433
434#[cfg(test)]
435mod tests {
436 use super::*;
437
438 #[test]
439 fn test_kernel_level_to_tracing_level() {
440 assert_eq!(kernel_level_to_tracing_level(0), Level::ERROR);
442 assert_eq!(kernel_level_to_tracing_level(1), Level::ERROR);
443 assert_eq!(kernel_level_to_tracing_level(2), Level::ERROR);
444 assert_eq!(kernel_level_to_tracing_level(3), Level::ERROR);
445
446 assert_eq!(kernel_level_to_tracing_level(4), Level::WARN);
448
449 assert_eq!(kernel_level_to_tracing_level(5), Level::INFO);
451 assert_eq!(kernel_level_to_tracing_level(6), Level::INFO);
452
453 assert_eq!(kernel_level_to_tracing_level(7), Level::DEBUG);
455
456 assert_eq!(kernel_level_to_tracing_level(8), Level::INFO);
458 assert_eq!(kernel_level_to_tracing_level(255), Level::INFO);
459 }
460}