1use fs_err::File;
5use fs_err::PathExt;
6use futures::AsyncBufReadExt;
7use futures::AsyncRead;
8use futures::AsyncReadExt;
9use futures::StreamExt;
10use futures::io::BufReader;
11use jiff::Timestamp;
12use kmsg::KmsgParsedEntry;
13use parking_lot::Mutex;
14use std::collections::HashMap;
15use std::io::Read;
16use std::io::Write as _;
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::Arc;
20use tracing::Level;
21use tracing::level_filters::LevelFilter;
22use tracing_subscriber::filter::Targets;
23use tracing_subscriber::fmt::MakeWriter;
24use tracing_subscriber::fmt::format::FmtSpan;
25use tracing_subscriber::layer::SubscriberExt;
26use tracing_subscriber::util::SubscriberInitExt;
27
/// A cloneable handle to the shared logging state: the output directory, the
/// JSON log, and the log files and attachments created so far.
///
/// Clones share the same underlying state through an [`Arc`].
#[derive(Clone)]
pub struct PetriLogSource(Arc<LogSourceInner>);
31
/// State shared by every clone of a [`PetriLogSource`].
struct LogSourceInner {
    /// Directory under which log files, attachments, and result markers are
    /// created.
    root_path: PathBuf,
    /// JSON log that receives an entry for every log line and attachment.
    json_log: JsonLog,
    /// Log files created so far, keyed by the name passed to `log_file`.
    log_files: Mutex<HashMap<String, PetriLogFile>>,
    /// For each attachment name, how many times a path has been requested for
    /// it; used to give repeated names distinct paths.
    attachments: Mutex<HashMap<String, u64>>,
}
38
39impl PetriLogSource {
40 pub fn log_file(&self, name: &str) -> anyhow::Result<PetriLogFile> {
45 use std::collections::hash_map::Entry;
46
47 let mut log_files = self.0.log_files.lock();
48 let log_file = match log_files.entry(name.to_owned()) {
49 Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
50 Entry::Vacant(vacant_entry) => {
51 let mut path = self.0.root_path.join(name);
52 path.set_extension("log");
56 let file = File::create(&path)?;
57 println!("[[ATTACHMENT|{}]]", path.display());
60 vacant_entry
61 .insert(PetriLogFile(Arc::new(LogFileInner {
62 file,
63 json_log: self.0.json_log.clone(),
64 source: name.to_owned(),
65 })))
66 .clone()
67 }
68 };
69 Ok(log_file)
70 }
71
72 fn attachment_path(&self, name: &str) -> PathBuf {
73 let mut attachments = self.0.attachments.lock();
74 let next = attachments.entry(name.to_owned()).or_default();
75 let name = Path::new(name);
76 let name = if *next == 0 {
77 name
78 } else {
79 let base = name.file_stem().unwrap().to_str().unwrap();
80 let extension = name.extension().unwrap_or_default();
81 &Path::new(&format!("{}_{}", base, *next)).with_extension(extension)
82 };
83 *next += 1;
84 self.0.root_path.join(name)
85 }
86
87 pub fn create_attachment(&self, filename: &str) -> anyhow::Result<File> {
92 let path = self.attachment_path(filename);
93 let file = File::create(&path)?;
94 self.trace_attachment(&path);
95 Ok(file)
96 }
97
98 pub fn write_attachment(&self, filename: &str, mut data: impl Read) -> anyhow::Result<PathBuf> {
103 let path = self.attachment_path(filename);
104 let mut file = File::create(&path)?;
105 std::io::copy(&mut data, &mut file)?;
106 self.trace_attachment(&path);
107 Ok(path)
108 }
109
110 pub fn copy_attachment(
115 &self,
116 attachment_filename: &str,
117 source_path: &Path,
118 ) -> anyhow::Result<()> {
119 let dest_path = self.attachment_path(attachment_filename);
120 fs_err::copy(source_path, &dest_path)?;
121 self.trace_attachment(&dest_path);
122 Ok(())
123 }
124
125 fn trace_attachment(&self, path: &Path) {
126 self.0
128 .json_log
129 .write_attachment(path.file_name().unwrap().as_ref());
130 println!("[[ATTACHMENT|{}]]", path.display());
131 }
132
133 pub fn log_test_result(&self, name: &str, r: &anyhow::Result<()>) {
135 let result_path = match &r {
136 Ok(()) => {
137 tracing::info!("test passed");
138 "petri.passed"
139 }
140 Err(err) => {
141 tracing::error!(
142 error = err.as_ref() as &dyn std::error::Error,
143 "test failed"
144 );
145 "petri.failed"
146 }
147 };
148 fs_err::write(self.0.root_path.join(result_path), name).unwrap();
151 }
152
153 pub fn output_dir(&self) -> &Path {
155 &self.0.root_path
156 }
157}
158
/// Shared handle to the file that receives JSON log entries, one per line.
#[derive(Clone)]
struct JsonLog(Arc<File>);
161
162impl JsonLog {
163 fn write_json(&self, v: &impl serde::Serialize) {
164 let v = serde_json::to_vec(v);
165 if let Ok(mut v) = v {
166 v.push(b'\n');
167 let _ = self.0.as_ref().write_all(&v);
169 }
170 }
171
172 fn write_entry(&self, timestamp: Option<Timestamp>, level: Level, source: &str, buf: &[u8]) {
173 #[derive(serde::Serialize)]
174 struct JsonEntry<'a> {
175 timestamp: Timestamp,
176 source: &'a str,
177 severity: &'a str,
178 message: &'a str,
179 }
180 let message = String::from_utf8_lossy(buf);
181 self.write_json(&JsonEntry {
182 timestamp: timestamp.unwrap_or_else(Timestamp::now),
183 source,
184 severity: level.as_str(),
185 message: message.trim_ascii(),
186 });
187 }
188
189 fn write_attachment(&self, path: &Path) {
190 #[derive(serde::Serialize)]
191 struct JsonEntry<'a> {
192 timestamp: Timestamp,
193 attachment: &'a Path,
194 }
195 self.write_json(&JsonEntry {
196 timestamp: Timestamp::now(),
197 attachment: path,
198 });
199 }
200}
201
/// State behind a [`PetriLogFile`].
struct LogFileInner {
    /// The per-source log file that raw log bytes are appended to.
    file: File,
    /// JSON log that also receives every entry written to this file.
    json_log: JsonLog,
    /// Name this log file was created under; used as the stdout prefix and as
    /// the `source` field of JSON entries.
    source: String,
}
207
208impl LogFileInner {
209 fn write_stdout(&self, buf: &[u8]) {
210 let mut stdout = std::io::stdout().lock();
211 write!(stdout, "[{:>10}] ", self.source).unwrap();
212 stdout.write_all(buf).unwrap();
213 }
214}
215
/// A short-lived writer that sends each write to a log file's sinks, tagged
/// with a fixed level and optional timestamp.
struct LogWriter<'a> {
    /// The log file whose sinks receive the data.
    inner: &'a LogFileInner,
    /// Severity recorded with every JSON entry produced by this writer.
    level: Level,
    /// Timestamp recorded with every JSON entry; `None` is passed through to
    /// the JSON log as-is.
    timestamp: Option<Timestamp>,
}
221
222impl std::io::Write for LogWriter<'_> {
223 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
224 self.inner
226 .json_log
227 .write_entry(self.timestamp, self.level, &self.inner.source, buf);
228 let _ = (&self.inner.file).write_all(buf);
230 self.inner.write_stdout(buf);
232 Ok(buf.len())
233 }
234
235 fn flush(&mut self) -> std::io::Result<()> {
236 Ok(())
237 }
238}
239
/// A cloneable handle to a single named log file.
///
/// Entries written through it also go to the JSON log and to stdout. Clones
/// share the same underlying file through an [`Arc`].
#[derive(Clone)]
pub struct PetriLogFile(Arc<LogFileInner>);
247
248impl PetriLogFile {
249 pub fn write_entry_fmt(
251 &self,
252 timestamp: Option<Timestamp>,
253 level: Level,
254 args: std::fmt::Arguments<'_>,
255 ) {
256 let _ = LogWriter {
259 inner: &self.0,
260 level,
261 timestamp,
262 }
263 .write_all(format!("{}\n", args).as_bytes());
264 }
265
266 pub fn write_entry(&self, message: impl std::fmt::Display) {
268 self.write_entry_fmt(None, Level::INFO, format_args!("{}", message));
269 }
270}
271
/// Writes a formatted, INFO-level entry to a `PetriLogFile`.
///
/// The first argument is the log file; the rest are `format!`-style
/// arguments, e.g. `log!(file, "booted in {} ms", elapsed)`.
#[macro_export]
macro_rules! log {
    ($file:expr, $($arg:tt)*) => {
        // `write_entry` accepts any `Display` value, which `fmt::Arguments`
        // is, and fills in the default level and timestamp.
        <$crate::PetriLogFile>::write_entry(&$file, format_args!($($arg)*))
    };
}
279
280pub fn try_init_tracing(root_path: &Path) -> anyhow::Result<PetriLogSource> {
289 let targets =
290 if let Ok(var) = std::env::var("OPENVMM_LOG").or_else(|_| std::env::var("HVLITE_LOG")) {
291 var.parse().unwrap()
292 } else {
293 Targets::new().with_default(LevelFilter::DEBUG)
294 };
295
296 let root_path = root_path.fs_err_canonicalize()?;
298 let jsonl = File::create(root_path.join("petri.jsonl"))?;
299 let logger = PetriLogSource(Arc::new(LogSourceInner {
300 json_log: JsonLog(Arc::new(jsonl)),
301 root_path,
302 log_files: Default::default(),
303 attachments: Default::default(),
304 }));
305
306 let petri_log = logger.log_file("petri")?;
307
308 tracing_subscriber::fmt()
309 .compact()
310 .with_ansi(false) .log_internal_errors(true)
312 .with_writer(PetriWriter(petri_log))
313 .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
314 .with_max_level(LevelFilter::TRACE)
315 .finish()
316 .with(targets)
317 .try_init()?;
318
319 Ok(logger)
320}
321
/// Adapter that lets the tracing subscriber write its output through a
/// [`PetriLogFile`].
struct PetriWriter(PetriLogFile);
323
324impl<'a> MakeWriter<'a> for PetriWriter {
325 type Writer = LogWriter<'a>;
326
327 fn make_writer(&'a self) -> Self::Writer {
328 LogWriter {
329 inner: &self.0.0,
330 level: Level::INFO,
331 timestamp: None,
332 }
333 }
334
335 fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer {
336 LogWriter {
337 inner: &self.0.0,
338 level: *meta.level(),
339 timestamp: None,
340 }
341 }
342}
343
344pub async fn log_task(
349 log_file: PetriLogFile,
350 reader: impl AsyncRead + Unpin + Send + 'static,
351 name: &str,
352) -> anyhow::Result<()> {
353 tracing::info!("connected to {name}");
354 let mut buf = Vec::new();
355 let mut reader = BufReader::new(reader);
356 loop {
357 buf.clear();
358 match (&mut reader).take(256).read_until(b'\n', &mut buf).await {
359 Ok(0) => {
360 tracing::info!("disconnected from {name}: EOF");
361 return Ok(());
362 }
363 Err(e) => {
364 tracing::info!("disconnected from {name}: error: {e:#}");
365 return Err(e.into());
366 }
367 _ => {}
368 }
369
370 let string_buf = String::from_utf8_lossy(&buf);
371 let string_buf_trimmed = string_buf.trim_end();
372
373 if let Some(message) = kmsg::SyslogParsedEntry::new(string_buf_trimmed) {
374 let level = kernel_level_to_tracing_level(message.level);
375 log_file.write_entry_fmt(None, level, format_args!("{}", message.display(false)));
376 } else {
377 log_file.write_entry(string_buf_trimmed);
378 }
379 }
380}
381
382fn kernel_level_to_tracing_level(kernel_level: u8) -> Level {
384 match kernel_level {
385 0..=3 => Level::ERROR,
386 4 => Level::WARN,
387 5..=6 => Level::INFO,
388 7 => Level::DEBUG,
389 _ => Level::INFO,
390 }
391}
392
393pub async fn kmsg_log_task(
395 log_file: PetriLogFile,
396 diag_client: diag_client::DiagClient,
397) -> anyhow::Result<()> {
398 loop {
399 diag_client.wait_for_server().await?;
400 let mut kmsg = diag_client.kmsg(true).await?;
401 tracing::info!("kmsg connected");
402 while let Some(data) = kmsg.next().await {
403 match data {
404 Ok(data) => {
405 let message = KmsgParsedEntry::new(&data).unwrap();
406 let level = kernel_level_to_tracing_level(message.level);
407 log_file.write_entry_fmt(
408 None,
409 level,
410 format_args!("{}", message.display(false)),
411 );
412 }
413 Err(err) => {
414 tracing::info!("kmsg disconnected: {err:#}");
415 break;
416 }
417 }
418 }
419 }
420}
421
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the mapping for every kernel level from 0 to 7 and for two
    /// out-of-range values.
    #[test]
    fn test_kernel_level_to_tracing_level() {
        let cases = [
            (0, Level::ERROR),
            (1, Level::ERROR),
            (2, Level::ERROR),
            (3, Level::ERROR),
            (4, Level::WARN),
            (5, Level::INFO),
            (6, Level::INFO),
            (7, Level::DEBUG),
            // Values above 7 fall back to INFO.
            (8, Level::INFO),
            (255, Level::INFO),
        ];

        for (kernel_level, expected) in cases {
            assert_eq!(kernel_level_to_tracing_level(kernel_level), expected);
        }
    }
}