1use diag_client::kmsg_stream::KmsgStream;
5use fs_err::File;
6use fs_err::PathExt;
7use futures::AsyncBufReadExt;
8use futures::AsyncRead;
9use futures::AsyncReadExt;
10use futures::StreamExt;
11use futures::io::BufReader;
12use jiff::Timestamp;
13use parking_lot::Mutex;
14use std::collections::HashMap;
15use std::io::Write as _;
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19use tracing::Level;
20use tracing::level_filters::LevelFilter;
21use tracing_subscriber::filter::Targets;
22use tracing_subscriber::fmt::MakeWriter;
23use tracing_subscriber::fmt::format::FmtSpan;
24use tracing_subscriber::layer::SubscriberExt;
25use tracing_subscriber::util::SubscriberInitExt;
26
27#[derive(Clone)]
29pub struct PetriLogSource(Arc<LogSourceInner>);
30
31struct LogSourceInner {
32 root_path: PathBuf,
33 json_log: JsonLog,
34 log_files: Mutex<HashMap<String, PetriLogFile>>,
35 attachments: Mutex<HashMap<String, u64>>,
36}
37
38impl PetriLogSource {
39 pub fn log_file(&self, name: &str) -> anyhow::Result<PetriLogFile> {
44 use std::collections::hash_map::Entry;
45
46 let mut log_files = self.0.log_files.lock();
47 let log_file = match log_files.entry(name.to_owned()) {
48 Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
49 Entry::Vacant(vacant_entry) => {
50 let mut path = self.0.root_path.join(name);
51 path.set_extension("log");
55 let file = File::create(&path)?;
56 println!("[[ATTACHMENT|{}]]", path.display());
59 vacant_entry
60 .insert(PetriLogFile(Arc::new(LogFileInner {
61 file,
62 json_log: self.0.json_log.clone(),
63 source: name.to_owned(),
64 })))
65 .clone()
66 }
67 };
68 Ok(log_file)
69 }
70
71 fn attachment_path(&self, name: &str) -> PathBuf {
72 let mut attachments = self.0.attachments.lock();
73 let next = attachments.entry(name.to_owned()).or_default();
74 let name = Path::new(name);
75 let name = if *next == 0 {
76 name
77 } else {
78 let base = name.file_stem().unwrap().to_str().unwrap();
79 let extension = name.extension().unwrap_or_default();
80 &Path::new(&format!("{}_{}", base, *next)).with_extension(extension)
81 };
82 *next += 1;
83 self.0.root_path.join(name)
84 }
85
86 pub fn create_attachment(&self, filename: &str) -> anyhow::Result<File> {
91 let path = self.attachment_path(filename);
92 let file = File::create(&path)?;
93 self.trace_attachment(&path);
94 Ok(file)
95 }
96
97 pub fn write_attachment(
102 &self,
103 filename: &str,
104 data: impl AsRef<[u8]>,
105 ) -> anyhow::Result<PathBuf> {
106 let path = self.attachment_path(filename);
107 fs_err::write(&path, data)?;
108 self.trace_attachment(&path);
109 Ok(path)
110 }
111
112 fn trace_attachment(&self, path: &Path) {
113 self.0
115 .json_log
116 .write_attachment(path.file_name().unwrap().as_ref());
117 println!("[[ATTACHMENT|{}]]", path.display());
118 }
119}
120
121#[derive(Clone)]
122struct JsonLog(Arc<File>);
123
124impl JsonLog {
125 fn write_json(&self, v: &impl serde::Serialize) {
126 let v = serde_json::to_vec(v);
127 if let Ok(mut v) = v {
128 v.push(b'\n');
129 let _ = self.0.as_ref().write_all(&v);
131 }
132 }
133
134 fn write_entry(&self, timestamp: Option<Timestamp>, level: Level, source: &str, buf: &[u8]) {
135 #[derive(serde::Serialize)]
136 struct JsonEntry<'a> {
137 timestamp: Timestamp,
138 source: &'a str,
139 severity: &'a str,
140 message: &'a str,
141 }
142 let message = String::from_utf8_lossy(buf);
143 self.write_json(&JsonEntry {
144 timestamp: timestamp.unwrap_or_else(Timestamp::now),
145 source,
146 severity: level.as_str(),
147 message: message.trim_ascii(),
148 });
149 }
150
151 fn write_attachment(&self, path: &Path) {
152 #[derive(serde::Serialize)]
153 struct JsonEntry<'a> {
154 timestamp: Timestamp,
155 attachment: &'a Path,
156 }
157 self.write_json(&JsonEntry {
158 timestamp: Timestamp::now(),
159 attachment: path,
160 });
161 }
162}
163
164struct LogFileInner {
165 file: File,
166 json_log: JsonLog,
167 source: String,
168}
169
170impl LogFileInner {
171 fn write_stdout(&self, buf: &[u8]) {
172 let mut stdout = std::io::stdout().lock();
173 write!(stdout, "[{:>10}] ", self.source).unwrap();
174 stdout.write_all(buf).unwrap();
175 }
176}
177
178struct LogWriter<'a> {
179 inner: &'a LogFileInner,
180 level: Level,
181 timestamp: Option<Timestamp>,
182}
183
184impl std::io::Write for LogWriter<'_> {
185 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
186 self.inner
188 .json_log
189 .write_entry(self.timestamp, self.level, &self.inner.source, buf);
190 let _ = (&self.inner.file).write_all(buf);
192 self.inner.write_stdout(buf);
194 Ok(buf.len())
195 }
196
197 fn flush(&mut self) -> std::io::Result<()> {
198 Ok(())
199 }
200}
201
202#[derive(Clone)]
208pub struct PetriLogFile(Arc<LogFileInner>);
209
210impl PetriLogFile {
211 pub fn write_entry_fmt(
213 &self,
214 timestamp: Option<Timestamp>,
215 level: Level,
216 args: std::fmt::Arguments<'_>,
217 ) {
218 let _ = LogWriter {
221 inner: &self.0,
222 level,
223 timestamp,
224 }
225 .write_all(format!("{}\n", args).as_bytes());
226 }
227
228 pub fn write_entry(&self, message: impl std::fmt::Display) {
230 self.write_entry_fmt(None, Level::INFO, format_args!("{}", message));
231 }
232}
233
/// Writes a formatted log entry to the given `PetriLogFile`.
///
/// The first argument is the log file; the remaining arguments are passed to
/// `format_args!`. The entry goes through `PetriLogFile::write_entry`, which
/// supplies the default severity and timestamp. (`write_entry_fmt` also needs
/// an explicit timestamp and level, so it cannot be called with format
/// arguments alone.)
#[macro_export]
macro_rules! log {
    ($file:expr, $($arg:tt)*) => {
        <$crate::PetriLogFile>::write_entry(&$file, ::std::format_args!($($arg)*))
    };
}
241
242pub fn try_init_tracing(root_path: &Path) -> anyhow::Result<PetriLogSource> {
251 let targets =
252 if let Ok(var) = std::env::var("OPENVMM_LOG").or_else(|_| std::env::var("HVLITE_LOG")) {
253 var.parse().unwrap()
254 } else {
255 Targets::new().with_default(LevelFilter::DEBUG)
256 };
257
258 let root_path = root_path.fs_err_canonicalize()?;
260 let jsonl = File::create(root_path.join("petri.jsonl"))?;
261 let logger = PetriLogSource(Arc::new(LogSourceInner {
262 json_log: JsonLog(Arc::new(jsonl)),
263 root_path,
264 log_files: Default::default(),
265 attachments: Default::default(),
266 }));
267
268 let petri_log = logger.log_file("petri")?;
269
270 tracing_subscriber::fmt()
271 .compact()
272 .with_ansi(false) .log_internal_errors(true)
274 .with_writer(PetriWriter(petri_log))
275 .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
276 .with_max_level(LevelFilter::TRACE)
277 .finish()
278 .with(targets)
279 .try_init()?;
280
281 Ok(logger)
282}
283
284struct PetriWriter(PetriLogFile);
285
286impl<'a> MakeWriter<'a> for PetriWriter {
287 type Writer = LogWriter<'a>;
288
289 fn make_writer(&'a self) -> Self::Writer {
290 LogWriter {
291 inner: &self.0.0,
292 level: Level::INFO,
293 timestamp: None,
294 }
295 }
296
297 fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer {
298 LogWriter {
299 inner: &self.0.0,
300 level: *meta.level(),
301 timestamp: None,
302 }
303 }
304}
305
306pub async fn log_stream(
308 log_file: PetriLogFile,
309 reader: impl AsyncRead + Unpin + Send + 'static,
310) -> anyhow::Result<()> {
311 let mut buf = Vec::new();
312 let mut reader = BufReader::new(reader);
313 loop {
314 buf.clear();
315 let n = (&mut reader).take(256).read_until(b'\n', &mut buf).await?;
316 if n == 0 {
317 break;
318 }
319
320 let string_buf = String::from_utf8_lossy(&buf);
321 let string_buf_trimmed = string_buf.trim_end();
322 log_file.write_entry(string_buf_trimmed);
323 }
324 Ok(())
325}
326
327pub async fn kmsg_log_task(
329 log_file: PetriLogFile,
330 mut file_stream: KmsgStream,
331) -> anyhow::Result<()> {
332 while let Some(data) = file_stream.next().await {
333 match data {
334 Ok(data) => {
335 let message = kmsg::KmsgParsedEntry::new(&data)?;
336 log_file.write_entry(message.display(false));
337 }
338 Err(err) => {
339 tracing::info!("kmsg disconnected: {err:?}");
340 break;
341 }
342 }
343 }
344
345 Ok(())
346}