1#![expect(missing_docs)]
7#![forbid(unsafe_code)]
8
9mod bounded;
10
11use self::bounded::BoundedReceiver;
12use self::bounded::BoundedSender;
13use self::bounded::bounded;
14use anyhow::Context as _;
15use futures::FutureExt;
16use futures::Stream;
17use futures::future::join_all;
18use guid::Guid;
19use inspect::InspectMut;
20use mesh::MeshPayload;
21use mesh::rpc::Rpc;
22use mesh::rpc::RpcSend;
23use pal_async::task::Spawn;
24use pal_async::task::Task;
25use std::fs::File;
26use std::future::Future;
27use std::pin::Pin;
28use std::task::Context;
29use std::task::Poll;
30use tracing_subscriber::Layer;
31use tracing_subscriber::filter::Filtered;
32use tracing_subscriber::filter::Targets;
33use tracing_subscriber::registry::LookupSpan;
34use tracing_subscriber::reload;
35
/// Bundle of handles produced by [`TracingBackend::tracer`] for one tracing
/// client.
#[derive(Debug, MeshPayload)]
pub struct RemoteTracer {
    /// Writer whose requests are delivered to the backend's
    /// [`TracingRequestStream`].
    pub trace_writer: TraceWriter,
    /// Clone of the backend's trace filter cell.
    pub trace_filter: MeshFilter,
    /// Clone of the backend's perf trace filter cell.
    pub perf_trace_filter: MeshFilter,
    /// Duplicate (via `File::try_clone`) of the backend's handle to the perf
    /// trace file.
    pub perf_trace_file: File,
    /// Receives an RPC each time the backend's `perf/flush` inspect node is
    /// updated.
    pub perf_trace_flush: mesh::Receiver<Rpc<(), ()>>,
}
44
/// A trace filter whose textual form lives in a [`mesh::Cell`].
///
/// See [`MeshFilter::apply`] for how the string is turned into a
/// [`Targets`] filter on a tracing layer.
#[derive(Debug, MeshPayload, Clone)]
pub struct MeshFilter {
    /// Current filter string; parsed as a [`Targets`] directive by `apply`.
    filter: mesh::Cell<String>,
}
49
50impl MeshFilter {
51 pub fn apply<L, S>(
54 self,
55 spawn: impl Spawn,
56 layer: L,
57 ) -> anyhow::Result<reload::Layer<Filtered<L, Targets, S>, S>>
58 where
59 L: Layer<S> + Send + Sync,
60 S: tracing::Subscriber + for<'span> LookupSpan<'span>,
61 {
62 let targets: Targets = self
63 .filter
64 .with(|filter| filter.parse())
65 .context("failed to parse filter")?;
66
67 let (layer, reload_handle) = reload::Layer::new(layer.with_filter(targets));
68
69 let mut filter_cell = self.filter;
70 spawn
71 .spawn("tracing filter refresh", async move {
72 loop {
73 filter_cell.wait_next().await;
74 filter_cell.with(|filter| match filter.parse::<Targets>() {
75 Ok(new_targets) => {
76 let _ = reload_handle.modify(|layer| *layer.filter_mut() = new_targets);
77 tracing::info!(filter = filter.as_str(), "updated trace filter");
78 }
79 Err(err) => {
80 tracing::error!(
81 error = &err as &dyn std::error::Error,
82 "failed to update filter"
83 );
84 }
85 })
86 }
87 })
88 .detach();
89
90 Ok(layer)
91 }
92}
93
/// Write side of a [`MeshFilter`]: owns the [`mesh::CellUpdater`] paired with
/// the filter's cell (both are created together by `filter`).
#[derive(Debug)]
struct MeshFilterUpdater {
    /// Updater holding the current filter string.
    updater: mesh::CellUpdater<String>,
}
98
99impl MeshFilterUpdater {
100 fn get(&self) -> &str {
101 self.updater.get()
102 }
103
104 fn update(&mut self, filter: &str) -> anyhow::Result<()> {
105 let _: Targets = filter.parse().context("invalid filter")?;
107 self.updater.set(filter.into()).now_or_never();
108 Ok(())
109 }
110}
111
112impl InspectMut for MeshFilterUpdater {
113 fn inspect_mut(&mut self, req: inspect::Request<'_>) {
114 match req.update() {
115 Ok(req) => match self.update(req.new_value()) {
116 Ok(()) => req.succeed(self.get()),
117 Err(err) => req.fail(err),
118 },
119 Err(req) => req.value(self.get()),
120 }
121 }
122}
123
124fn filter(initial: String) -> anyhow::Result<(MeshFilterUpdater, MeshFilter)> {
125 let _: Targets = initial.parse().context("invalid filter")?;
127 let (updater, cell) = mesh::cell(initial);
128 Ok((MeshFilterUpdater { updater }, MeshFilter { filter: cell }))
129}
130
/// Fans a flush request out to every registered flush channel.
struct MeshFlusher {
    /// Spawner used to run the task that waits for the flush RPCs.
    spawn: Box<dyn Spawn>,
    /// One sender per receiver handed out by `add`; closed senders are pruned
    /// on the next `add`.
    remotes: Vec<mesh::Sender<Rpc<(), ()>>>,
}
135
136impl MeshFlusher {
137 fn add(&mut self) -> mesh::Receiver<Rpc<(), ()>> {
138 let (send, recv) = mesh::channel();
139 self.remotes.retain(|s| !s.is_closed());
140 self.remotes.push(send);
141 recv
142 }
143}
144
145impl InspectMut for MeshFlusher {
146 fn inspect_mut(&mut self, req: inspect::Request<'_>) {
147 match req.update() {
148 Ok(req) => {
149 let join = join_all(self.remotes.iter().map(|r| r.call(|rpc| rpc, ())));
150 let req = req.defer();
151 self.spawn
152 .spawn("trace-flush", async move {
153 let _ = join.await;
154 req.succeed(true);
155 })
156 .detach();
157 }
158 Err(req) => req.value(false),
159 }
160 }
161}
162
/// A single trace record sent from a [`TraceWriter`] to the backend.
///
/// The fields mirror the parameters of [`TraceWriter::send`] one for one.
#[derive(Debug, MeshPayload)]
pub struct TracingRequest {
    /// Kind of record (event, span enter, or span exit).
    pub log_type: Type,
    // NOTE(review): the unit and epoch of `timestamp` are not established in
    // this file — confirm against the producer.
    pub timestamp: u64,
    /// Severity of the record.
    pub level: Level,
    // NOTE(review): `name`, `target`, `fields` and `message` are raw byte
    // buffers; their encoding is not established in this file.
    pub name: Option<Vec<u8>>,
    pub target: Option<Vec<u8>>,
    pub fields: Option<Vec<u8>>,
    pub activity_id: Option<Guid>,
    pub related_activity_id: Option<Guid>,
    pub correlation_id: Option<Guid>,
    pub message: Vec<u8>,
}
176
/// Kind of record carried by a [`TracingRequest`].
///
/// The discriminants are pinned explicitly.
#[derive(Debug, MeshPayload)]
pub enum Type {
    /// An event record.
    Event = 0,
    /// A span-enter record.
    SpanEnter = 1,
    /// A span-exit record.
    SpanExit = 2,
}
183
/// Severity attached to a [`TracingRequest`].
// NOTE(review): variant names match the `tracing` crate's levels; this file
// does not show the conversion, so confirm at the call sites.
#[derive(Debug, MeshPayload)]
pub enum Level {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}
192
/// Sending half used to submit [`TracingRequest`]s; a thin wrapper around a
/// [`BoundedSender`].
#[derive(MeshPayload, Debug)]
pub struct TraceWriter(BoundedSender<TracingRequest>);
195
196impl From<BoundedSender<TracingRequest>> for TraceWriter {
197 fn from(sender: BoundedSender<TracingRequest>) -> Self {
198 Self(sender)
199 }
200}
201
202impl TraceWriter {
203 pub fn send(
204 &self,
205 log_type: Type,
206 timestamp: u64,
207 level: Level,
208 name: Option<Vec<u8>>,
209 target: Option<Vec<u8>>,
210 fields: Option<Vec<u8>>,
211 activity_id: Option<Guid>,
212 related_activity_id: Option<Guid>,
213 correlation_id: Option<Guid>,
214 message: Vec<u8>,
215 ) -> bool {
216 self.0
217 .try_send(TracingRequest {
218 log_type,
219 timestamp,
220 level,
221 name,
222 target,
223 fields,
224 activity_id,
225 related_activity_id,
226 correlation_id,
227 message,
228 })
229 .is_ok()
230 }
231}
232
/// Central side of the tracing pipeline.
///
/// Owns the task that consumes trace requests, hands out [`RemoteTracer`]s,
/// and exposes the trace filters and the perf flush trigger through inspect.
#[derive(InspectMut)]
pub struct TracingBackend {
    /// Channels, filter cells, perf trace file, and the handler task; not
    /// exposed through inspect.
    #[inspect(skip)]
    state: BackendState,

    /// Updater for the trace filter string.
    #[inspect(mut, safe)]
    filter: MeshFilterUpdater,
    /// Updater for the perf trace filter string, exposed as `perf/filter`.
    #[inspect(rename = "perf/filter", mut)]
    perf_filter: MeshFilterUpdater,
    /// Flush trigger exposed as `perf/flush`; any update fans a flush RPC out
    /// to the receivers handed out via [`TracingBackend::tracer`].
    #[inspect(rename = "perf/flush", mut)]
    perf_flush: MeshFlusher,
}
246
/// Non-inspectable state owned by [`TracingBackend`].
struct BackendState {
    /// Delivers the receiving end of each new tracer's channel to the
    /// [`TracingRequestStream`].
    trace_writer: mesh::Sender<BoundedReceiver<TracingRequest>>,
    /// Trace filter cell, cloned into every [`RemoteTracer`].
    trace_filter: MeshFilter,
    /// Perf trace filter cell, cloned into every [`RemoteTracer`].
    perf_trace_filter: MeshFilter,
    /// Handle to the perf trace file, duplicated for every [`RemoteTracer`].
    perf_trace_file: File,
    /// Sends flush RPCs to the request-handling task.
    flush_send: mesh::Sender<Rpc<(), ()>>,
    /// The request-handling task spawned by [`TracingBackend::new`].
    task: Task<()>,
}
255
/// Stream that merges the [`TracingRequest`]s of every tracer created by a
/// [`TracingBackend`].
pub struct TracingRequestStream {
    /// Source of receivers for newly created tracers.
    new_receivers: mesh::Receiver<BoundedReceiver<TracingRequest>>,
    /// Receivers currently being polled; a receiver is removed once it reports
    /// end of stream.
    receivers: Vec<BoundedReceiver<TracingRequest>>,
}
260
261impl Stream for TracingRequestStream {
262 type Item = TracingRequest;
263
264 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
265 let this = self.get_mut();
266 let mut i = 0;
267 while let Poll::Ready(Some(recv)) = Pin::new(&mut this.new_receivers).poll_next(cx) {
268 this.receivers.push(recv);
269 }
270 while i < this.receivers.len() {
271 match Pin::new(&mut this.receivers[i]).poll_next(cx) {
272 r @ Poll::Ready(Some(_)) => return r,
273 Poll::Ready(None) => {
274 this.receivers.swap_remove(i);
275 }
276 Poll::Pending => {}
277 }
278 i += 1;
279 }
280 Poll::Pending
281 }
282}
283
284impl TracingBackend {
285 pub fn new<Fut, F>(
287 driver: impl 'static + Spawn,
288 trace_filter: String,
289 perf_trace_filter: String,
290 handle_requests: F,
291 ) -> anyhow::Result<Self>
292 where
293 F: 'static + Send + FnOnce(TracingRequestStream, mesh::Receiver<Rpc<(), ()>>) -> Fut,
294 Fut: 'static + Send + Future<Output = ()>,
295 {
296 let (send, recv) = mesh::channel();
297
298 let (trace_filter_updater, trace_filter) = filter(trace_filter)?;
299 let (perf_trace_filter_updater, perf_trace_filter) = filter(perf_trace_filter)?;
300
301 let perf_trace_file = File::options()
305 .append(true)
306 .create(true)
307 .open("underhill.perfetto")
308 .context("failed to open underhill.perfetto")?;
309
310 let (flush_send, flush_recv) = mesh::channel();
311 let task = driver.spawn(
312 "log write",
313 handle_requests(
314 TracingRequestStream {
315 new_receivers: recv,
316 receivers: Vec::new(),
317 },
318 flush_recv,
319 ),
320 );
321 Ok(Self {
322 state: BackendState {
323 trace_writer: send,
324 trace_filter,
325 perf_trace_filter,
326 perf_trace_file,
327 flush_send,
328 task,
329 },
330 filter: trace_filter_updater,
331 perf_filter: perf_trace_filter_updater,
332 perf_flush: MeshFlusher {
333 spawn: Box::new(driver),
334 remotes: Vec::new(),
335 },
336 })
337 }
338
339 pub fn tracer(&mut self) -> RemoteTracer {
340 let (send, recv) = bounded(256);
341 self.state.trace_writer.send(recv);
342 RemoteTracer {
343 trace_writer: TraceWriter(send),
344 trace_filter: self.state.trace_filter.clone(),
345 perf_trace_filter: self.state.perf_trace_filter.clone(),
346 perf_trace_file: self.state.perf_trace_file.try_clone().unwrap(),
347 perf_trace_flush: self.perf_flush.add(),
348 }
349 }
350
351 pub async fn flush(&mut self) {
353 self.state.flush_send.call(|x| x, ()).await.ok();
354 }
355
356 pub async fn shutdown(self) {
360 drop(self.state.flush_send);
361 self.state.task.await;
362 }
363}