mesh_tracing/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Mesh tracing backend.
5
6#![expect(missing_docs)]
7#![forbid(unsafe_code)]
8
9mod bounded;
10
11use self::bounded::BoundedReceiver;
12use self::bounded::BoundedSender;
13use self::bounded::bounded;
14use anyhow::Context as _;
15use futures::FutureExt;
16use futures::Stream;
17use futures::future::join_all;
18use guid::Guid;
19use inspect::InspectMut;
20use mesh::MeshPayload;
21use mesh::rpc::Rpc;
22use mesh::rpc::RpcSend;
23use pal_async::task::Spawn;
24use pal_async::task::Task;
25use std::fs::File;
26use std::future::Future;
27use std::pin::Pin;
28use std::task::Context;
29use std::task::Poll;
30use tracing_subscriber::Layer;
31use tracing_subscriber::filter::Filtered;
32use tracing_subscriber::filter::Targets;
33use tracing_subscriber::registry::LookupSpan;
34use tracing_subscriber::reload;
35
/// The bundle of tracing handles produced by [`TracingBackend::tracer`].
///
/// NOTE(review): this derives `MeshPayload`, so it is presumably meant to be
/// sent over mesh to the process that emits traces -- confirm with callers.
#[derive(Debug, MeshPayload)]
pub struct RemoteTracer {
    /// Writer whose bounded channel receiver was handed to the backend's
    /// request stream.
    pub trace_writer: TraceWriter,
    /// Clone of the backend's trace filter.
    pub trace_filter: MeshFilter,
    /// Clone of the backend's perf trace filter.
    pub perf_trace_filter: MeshFilter,
    /// Duplicate (`try_clone`) of the backend's perf trace file handle.
    pub perf_trace_file: File,
    /// Receives the flush RPCs issued through the backend's `perf/flush`
    /// inspect node.
    pub perf_trace_flush: mesh::Receiver<Rpc<(), ()>>,
}
44
/// A trace filter string held in a `mesh::Cell`.
///
/// The string is parsed as a `tracing_subscriber` [`Targets`] filter when it
/// is applied to a layer; see [`MeshFilter::apply`].
#[derive(Debug, MeshPayload, Clone)]
pub struct MeshFilter {
    // Current filter text. `apply` re-parses it each time the cell reports a
    // new value.
    filter: mesh::Cell<String>,
}
49
50impl MeshFilter {
51    /// Wraps `layer` in a filter that will be dynamically updated by incoming
52    /// mesh messages.
53    pub fn apply<L, S>(
54        self,
55        spawn: impl Spawn,
56        layer: L,
57    ) -> anyhow::Result<reload::Layer<Filtered<L, Targets, S>, S>>
58    where
59        L: Layer<S> + Send + Sync,
60        S: tracing::Subscriber + for<'span> LookupSpan<'span>,
61    {
62        let targets: Targets = self
63            .filter
64            .with(|filter| filter.parse())
65            .context("failed to parse filter")?;
66
67        let (layer, reload_handle) = reload::Layer::new(layer.with_filter(targets));
68
69        let mut filter_cell = self.filter;
70        spawn
71            .spawn("tracing filter refresh", async move {
72                loop {
73                    filter_cell.wait_next().await;
74                    filter_cell.with(|filter| match filter.parse::<Targets>() {
75                        Ok(new_targets) => {
76                            let _ = reload_handle.modify(|layer| *layer.filter_mut() = new_targets);
77                            tracing::info!(filter = filter.as_str(), "updated trace filter");
78                        }
79                        Err(err) => {
80                            tracing::error!(
81                                error = &err as &dyn std::error::Error,
82                                "failed to update filter"
83                            );
84                        }
85                    })
86                }
87            })
88            .detach();
89
90        Ok(layer)
91    }
92}
93
/// Owning side of a filter cell: reads the current filter string and
/// publishes validated replacements.
#[derive(Debug)]
struct MeshFilterUpdater {
    // Updater half of the `mesh::cell` pair created in `filter`.
    updater: mesh::CellUpdater<String>,
}
98
99impl MeshFilterUpdater {
100    fn get(&self) -> &str {
101        self.updater.get()
102    }
103
104    fn update(&mut self, filter: &str) -> anyhow::Result<()> {
105        // Validate the filter.
106        let _: Targets = filter.parse().context("invalid filter")?;
107        self.updater.set(filter.into()).now_or_never();
108        Ok(())
109    }
110}
111
112impl InspectMut for MeshFilterUpdater {
113    fn inspect_mut(&mut self, req: inspect::Request<'_>) {
114        match req.update() {
115            Ok(req) => match self.update(req.new_value()) {
116                Ok(()) => req.succeed(self.get()),
117                Err(err) => req.fail(err),
118            },
119            Err(req) => req.value(self.get()),
120        }
121    }
122}
123
124fn filter(initial: String) -> anyhow::Result<(MeshFilterUpdater, MeshFilter)> {
125    // Validate the filter.
126    let _: Targets = initial.parse().context("invalid filter")?;
127    let (updater, cell) = mesh::cell(initial);
128    Ok((MeshFilterUpdater { updater }, MeshFilter { filter: cell }))
129}
130
/// Fans a flush request out to every registered remote flush channel.
struct MeshFlusher {
    // Spawner used to run the task that waits for the flush RPCs to finish.
    spawn: Box<dyn Spawn>,
    // One sender per receiver handed out by `add`; closed ones are pruned on
    // the next `add`.
    remotes: Vec<mesh::Sender<Rpc<(), ()>>>,
}
135
136impl MeshFlusher {
137    fn add(&mut self) -> mesh::Receiver<Rpc<(), ()>> {
138        let (send, recv) = mesh::channel();
139        self.remotes.retain(|s| !s.is_closed());
140        self.remotes.push(send);
141        recv
142    }
143}
144
145impl InspectMut for MeshFlusher {
146    fn inspect_mut(&mut self, req: inspect::Request<'_>) {
147        match req.update() {
148            Ok(req) => {
149                let join = join_all(self.remotes.iter().map(|r| r.call(|rpc| rpc, ())));
150                let req = req.defer();
151                self.spawn
152                    .spawn("trace-flush", async move {
153                        let _ = join.await;
154                        req.succeed(true);
155                    })
156                    .detach();
157            }
158            Err(req) => req.value(false),
159        }
160    }
161}
162
/// A single trace record, as sent by [`TraceWriter::send`] and yielded by
/// [`TracingRequestStream`].
// NOTE(review): the byte-vector fields are opaque here; their encoding and the
// units/epoch of `timestamp` are defined by the producer and consumer, not by
// this file -- confirm before relying on them.
#[derive(Debug, MeshPayload)]
pub struct TracingRequest {
    /// Kind of record (event, span enter, or span exit).
    pub log_type: Type,
    /// Timestamp supplied by the sender.
    pub timestamp: u64,
    /// Severity level of the record.
    pub level: Level,
    /// Optional record name, as raw bytes.
    pub name: Option<Vec<u8>>,
    /// Optional record target, as raw bytes.
    pub target: Option<Vec<u8>>,
    /// Optional record fields, as raw bytes.
    pub fields: Option<Vec<u8>>,
    /// Optional activity identifier.
    pub activity_id: Option<Guid>,
    /// Optional related activity identifier.
    pub related_activity_id: Option<Guid>,
    /// Optional correlation identifier.
    pub correlation_id: Option<Guid>,
    /// Record message, as raw bytes.
    pub message: Vec<u8>,
}
176
/// Kind of record carried by a [`TracingRequest`].
// The discriminants are spelled out explicitly.
// NOTE(review): presumably they are relied on elsewhere -- confirm before
// renumbering or reordering.
#[derive(Debug, MeshPayload)]
pub enum Type {
    /// A standalone event.
    Event = 0,
    /// Entry into a span.
    SpanEnter = 1,
    /// Exit from a span.
    SpanExit = 2,
}
183
/// Severity level carried by a [`TracingRequest`].
// NOTE(review): the variant names mirror the standard `tracing` levels;
// the mapping itself is done by code outside this file -- confirm there.
#[derive(Debug, MeshPayload)]
pub enum Level {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}
192
/// Sending half for trace records: a thin wrapper around a
/// `BoundedSender<TracingRequest>`.
#[derive(MeshPayload, Debug)]
pub struct TraceWriter(BoundedSender<TracingRequest>);
195
196impl From<BoundedSender<TracingRequest>> for TraceWriter {
197    fn from(sender: BoundedSender<TracingRequest>) -> Self {
198        Self(sender)
199    }
200}
201
202impl TraceWriter {
203    pub fn send(
204        &self,
205        log_type: Type,
206        timestamp: u64,
207        level: Level,
208        name: Option<Vec<u8>>,
209        target: Option<Vec<u8>>,
210        fields: Option<Vec<u8>>,
211        activity_id: Option<Guid>,
212        related_activity_id: Option<Guid>,
213        correlation_id: Option<Guid>,
214        message: Vec<u8>,
215    ) -> bool {
216        self.0
217            .try_send(TracingRequest {
218                log_type,
219                timestamp,
220                level,
221                name,
222                target,
223                fields,
224                activity_id,
225                related_activity_id,
226                correlation_id,
227                message,
228            })
229            .is_ok()
230    }
231}
232
/// Object to configure and reconfigure tracing for Underhill.
///
/// The filters and the perf flush trigger are exposed through `InspectMut`;
/// the rest of the state is skipped.
#[derive(InspectMut)]
pub struct TracingBackend {
    // Channels, shared filters, file handle and worker task; not inspectable.
    #[inspect(skip)]
    state: BackendState,

    // Updater for the trace filter.
    #[inspect(mut, safe)]
    filter: MeshFilterUpdater,
    // Updater for the perf trace filter, exposed as `perf/filter`.
    #[inspect(rename = "perf/filter", mut)]
    perf_filter: MeshFilterUpdater,
    // Flush trigger for all registered remotes, exposed as `perf/flush`.
    #[inspect(rename = "perf/flush", mut)]
    perf_flush: MeshFlusher,
}
246
/// Non-inspectable state owned by [`TracingBackend`].
struct BackendState {
    // Delivers the receiver of each newly created tracer channel to the
    // request stream.
    trace_writer: mesh::Sender<BoundedReceiver<TracingRequest>>,
    // Filter handle cloned into every `RemoteTracer`.
    trace_filter: MeshFilter,
    // Perf filter handle cloned into every `RemoteTracer`.
    perf_trace_filter: MeshFilter,
    // Perf trace file; duplicated with `try_clone` for every `RemoteTracer`.
    perf_trace_file: File,
    // Sends flush RPCs to the request-handling task; dropped on shutdown.
    flush_send: mesh::Sender<Rpc<(), ()>>,
    // The spawned request-handling task, awaited on shutdown.
    task: Task<()>,
}
255
/// Stream of [`TracingRequest`]s merged from every registered tracer channel.
pub struct TracingRequestStream {
    // Yields the receiving half of each channel created by
    // `TracingBackend::tracer`.
    new_receivers: mesh::Receiver<BoundedReceiver<TracingRequest>>,
    // Receivers currently being polled; finished ones are removed.
    receivers: Vec<BoundedReceiver<TracingRequest>>,
}
260
261impl Stream for TracingRequestStream {
262    type Item = TracingRequest;
263
264    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
265        let this = self.get_mut();
266        let mut i = 0;
267        while let Poll::Ready(Some(recv)) = Pin::new(&mut this.new_receivers).poll_next(cx) {
268            this.receivers.push(recv);
269        }
270        while i < this.receivers.len() {
271            match Pin::new(&mut this.receivers[i]).poll_next(cx) {
272                r @ Poll::Ready(Some(_)) => return r,
273                Poll::Ready(None) => {
274                    this.receivers.swap_remove(i);
275                }
276                Poll::Pending => {}
277            }
278            i += 1;
279        }
280        Poll::Pending
281    }
282}
283
284impl TracingBackend {
285    /// Spawns worker that sends traces to the host
286    pub fn new<Fut, F>(
287        driver: impl 'static + Spawn,
288        trace_filter: String,
289        perf_trace_filter: String,
290        handle_requests: F,
291    ) -> anyhow::Result<Self>
292    where
293        F: 'static + Send + FnOnce(TracingRequestStream, mesh::Receiver<Rpc<(), ()>>) -> Fut,
294        Fut: 'static + Send + Future<Output = ()>,
295    {
296        let (send, recv) = mesh::channel();
297
298        let (trace_filter_updater, trace_filter) = filter(trace_filter)?;
299        let (perf_trace_filter_updater, perf_trace_filter) = filter(perf_trace_filter)?;
300
301        // This perf trace file can be shared across all processes in the mesh,
302        // without extra synchronization. This works because the file extending
303        // writes are atomic.
304        let perf_trace_file = File::options()
305            .append(true)
306            .create(true)
307            .open("underhill.perfetto")
308            .context("failed to open underhill.perfetto")?;
309
310        let (flush_send, flush_recv) = mesh::channel();
311        let task = driver.spawn(
312            "log write",
313            handle_requests(
314                TracingRequestStream {
315                    new_receivers: recv,
316                    receivers: Vec::new(),
317                },
318                flush_recv,
319            ),
320        );
321        Ok(Self {
322            state: BackendState {
323                trace_writer: send,
324                trace_filter,
325                perf_trace_filter,
326                perf_trace_file,
327                flush_send,
328                task,
329            },
330            filter: trace_filter_updater,
331            perf_filter: perf_trace_filter_updater,
332            perf_flush: MeshFlusher {
333                spawn: Box::new(driver),
334                remotes: Vec::new(),
335            },
336        })
337    }
338
339    pub fn tracer(&mut self) -> RemoteTracer {
340        let (send, recv) = bounded(256);
341        self.state.trace_writer.send(recv);
342        RemoteTracer {
343            trace_writer: TraceWriter(send),
344            trace_filter: self.state.trace_filter.clone(),
345            perf_trace_filter: self.state.perf_trace_filter.clone(),
346            perf_trace_file: self.state.perf_trace_file.try_clone().unwrap(),
347            perf_trace_flush: self.perf_flush.add(),
348        }
349    }
350
351    /// Requests that all sent log messages have been flushed.
352    pub async fn flush(&mut self) {
353        self.state.flush_send.call(|x| x, ()).await.ok();
354    }
355
356    /// Shuts down the tracing backend.
357    ///
358    /// This implicitly flushes any sent log messages.
359    pub async fn shutdown(self) {
360        drop(self.state.flush_send);
361        self.state.task.await;
362    }
363}