net_packet_capture/
lib.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! `pcapng` compatible packet capture endpoint implementation.
5
6#![expect(missing_docs)]
7#![forbid(unsafe_code)]
8
9use async_trait::async_trait;
10use futures::FutureExt;
11use futures::StreamExt;
12use futures::lock::Mutex;
13use futures_concurrency::future::Race;
14use guestmem::GuestMemory;
15use inspect::InspectMut;
16use mesh::error::RemoteError;
17use mesh::rpc::FailableRpc;
18use mesh::rpc::RpcSend;
19use net_backend::BufferAccess;
20use net_backend::Endpoint;
21use net_backend::EndpointAction;
22use net_backend::MultiQueueSupport;
23use net_backend::Queue;
24use net_backend::QueueConfig;
25use net_backend::RssConfig;
26use net_backend::RxId;
27use net_backend::TxError;
28use net_backend::TxId;
29use net_backend::TxOffloadSupport;
30use net_backend::TxSegment;
31use net_backend::next_packet;
32use pcap_file::DataLink;
33use pcap_file::PcapError;
34use pcap_file::PcapResult;
35use pcap_file::pcapng::PcapNgWriter;
36use pcap_file::pcapng::blocks::enhanced_packet::EnhancedPacketBlock;
37use pcap_file::pcapng::blocks::interface_description::InterfaceDescriptionBlock;
38use std::borrow::Cow;
39use std::io::Write;
40use std::sync::Arc;
41use std::sync::atomic::AtomicBool;
42use std::sync::atomic::AtomicUsize;
43use std::sync::atomic::Ordering;
44use std::task::Context;
45use std::task::Poll;
46use std::time::Duration;
47use std::time::SystemTime;
48use std::time::UNIX_EPOCH;
49
/// Defines packet capture operations.
///
/// Carried in [`PacketCaptureParams::operation`] to select what a packet
/// capture request asks for.
#[derive(Debug, PartialEq, mesh::MeshPayload)]
pub enum PacketCaptureOperation {
    /// Query details.
    Query,
    /// Start packet capture.
    Start,
    /// Stop packet capture.
    Stop,
}
60
/// Defines start operation data.
///
/// `snaplen` is the maximum number of bytes captured from each packet.
/// `writers` supplies the output streams for the capture;
/// `PacketCaptureEndpointControl::packet_capture` removes the first writer
/// from the list and uses it as the `pcapng` output.
#[derive(Debug, mesh::MeshPayload)]
pub struct StartData<W: Write> {
    pub snaplen: u32,
    pub writers: Vec<W>,
}
67
/// Defines operational data.
///
/// `OpQueryData` carries a stream count, which
/// `PacketCaptureEndpointControl::packet_capture` returns incremented by one.
/// `OpStartData` carries the parameters used to start a capture.
#[derive(Debug, mesh::MeshPayload)]
pub enum OperationData<W: Write> {
    OpQueryData(u32),
    OpStartData(StartData<W>),
}
74
/// Additional parameters provided as part of a network packet capture trace.
///
/// This is both the input to and the successful result of
/// `PacketCaptureEndpointControl::packet_capture`.
#[derive(Debug, mesh::MeshPayload)]
pub struct PacketCaptureParams<W: Write> {
    /// Indicates the network capture operation.
    pub operation: PacketCaptureOperation,
    /// Operational data that is specific to the given operation.
    pub op_data: Option<OperationData<W>>,
}
83
/// Sink for the two `pcapng` block types this crate emits.
///
/// Exists so a writer can be stored as `Box<dyn PcapWriter>` without the
/// holder being generic over the underlying `Write` implementation.
trait PcapWriter: Send + Sync {
    /// Writes an `EnhancedPacketBlock`.
    fn write_pcapng_block_eb(&mut self, block: EnhancedPacketBlock<'_>) -> PcapResult<usize>;

    /// Writes an `InterfaceDescriptionBlock`.
    fn write_pcapng_block_id(&mut self, block: InterfaceDescriptionBlock<'_>) -> PcapResult<usize>;
}
91
/// `PcapWriter` implementation backed by a `PcapNgWriter` over a `Write` sink.
struct LocalPcapWriter<W: Write> {
    /// The underlying `pcapng` writer that blocks are forwarded to.
    inner: PcapNgWriter<W>,
}
95
96impl<W: Write + Send + Sync> PcapWriter for LocalPcapWriter<W> {
97    fn write_pcapng_block_eb(&mut self, block: EnhancedPacketBlock<'_>) -> PcapResult<usize> {
98        self.inner.write_pcapng_block(block)
99    }
100
101    fn write_pcapng_block_id(&mut self, block: InterfaceDescriptionBlock<'_>) -> PcapResult<usize> {
102        self.inner.write_pcapng_block(block)
103    }
104}
105
/// Internal request sent to the endpoint to start or stop a capture.
struct PacketCaptureOptions {
    /// Requested operation; the endpoint only acts on `Start` and `Stop`.
    operation: PacketCaptureOperation,
    /// Maximum number of bytes to capture per packet.
    snaplen: usize,
    /// Destination for captured packets; `None` for a stop request.
    writer: Option<Box<dyn PcapWriter>>,
}
111
112impl PacketCaptureOptions {
113    fn new_with_start<W: Write + Send + Sync + 'static>(snaplen: u32, writer: W) -> Self {
114        //TODO: Native endianness?
115        let pcap_ng_writer =
116            PcapNgWriter::with_endianness(writer, pcap_file::Endianness::Big).unwrap();
117
118        let local_writer = LocalPcapWriter {
119            inner: pcap_ng_writer,
120        };
121
122        Self {
123            operation: PacketCaptureOperation::Start,
124            snaplen: snaplen as usize,
125            writer: Some(Box::new(local_writer)),
126        }
127    }
128
129    fn new_with_stop() -> Self {
130        Self {
131            operation: PacketCaptureOperation::Stop,
132            snaplen: 0,
133            writer: None,
134        }
135    }
136}
137
138enum PacketCaptureEndpointCommand {
139    PacketCapture(FailableRpc<PacketCaptureOptions, ()>),
140}
141
142pub struct PacketCaptureEndpointControl {
143    control_tx: mesh::Sender<PacketCaptureEndpointCommand>,
144}
145
146impl PacketCaptureEndpointControl {
147    pub async fn packet_capture<W: Write + Send + Sync + 'static>(
148        &self,
149        params: PacketCaptureParams<W>,
150    ) -> anyhow::Result<PacketCaptureParams<W>> {
151        let mut params = params;
152        let options = match params.operation {
153            PacketCaptureOperation::Query | PacketCaptureOperation::Start => {
154                let Some(op_data) = &mut params.op_data else {
155                    anyhow::bail!(
156                        "Invalid input parameter. Expecting operational data, but none provided"
157                    );
158                };
159
160                match op_data {
161                    OperationData::OpQueryData(num_streams) => {
162                        return Ok(PacketCaptureParams {
163                            operation: params.operation,
164                            op_data: Some(OperationData::OpQueryData(*num_streams + 1)),
165                        });
166                    }
167                    OperationData::OpStartData(data) => {
168                        if data.writers.is_empty() {
169                            anyhow::bail!("Insufficient streams");
170                        }
171                        let socket = data.writers.remove(0);
172                        PacketCaptureOptions::new_with_start(data.snaplen, socket)
173                    }
174                }
175            }
176            PacketCaptureOperation::Stop => PacketCaptureOptions::new_with_stop(),
177        };
178
179        self.control_tx
180            .call_failable(PacketCaptureEndpointCommand::PacketCapture, options)
181            .await?;
182
183        Ok(params)
184    }
185}
186
/// An `Endpoint` wrapper that forwards to an inner endpoint and can record
/// the traffic of its queues to a `pcapng` stream.
pub struct PacketCaptureEndpoint {
    /// Some identifier that this endpoint can identify itself using for things
    /// like tracing, filtering etc..
    id: String,
    /// The wrapped endpoint that `Endpoint` calls are forwarded to.
    endpoint: Box<dyn Endpoint>,
    /// Receiving half of the control channel carrying start/stop requests.
    control_rx: Arc<Mutex<mesh::Receiver<PacketCaptureEndpointCommand>>>,
    /// Capture state shared with the queues handed out by `get_queues`.
    pcap: Arc<Pcap>,
}
195
196impl InspectMut for PacketCaptureEndpoint {
197    fn inspect_mut(&mut self, req: inspect::Request<'_>) {
198        self.current_mut().inspect_mut(req)
199    }
200}
201
202impl PacketCaptureEndpoint {
203    pub fn new(endpoint: Box<dyn Endpoint>, id: String) -> (Self, PacketCaptureEndpointControl) {
204        let (control_tx, control_rx) = mesh::channel();
205        let control = PacketCaptureEndpointControl {
206            control_tx: control_tx.clone(),
207        };
208        let pcap = Arc::new(Pcap::new(control_tx.clone()));
209        (
210            Self {
211                id,
212                endpoint,
213                control_rx: Arc::new(Mutex::new(control_rx)),
214                pcap,
215            },
216            control,
217        )
218    }
219
220    fn current(&self) -> &dyn Endpoint {
221        self.endpoint.as_ref()
222    }
223
224    fn current_mut(&mut self) -> &mut dyn Endpoint {
225        self.endpoint.as_mut()
226    }
227}
228
229#[async_trait]
230impl Endpoint for PacketCaptureEndpoint {
231    fn endpoint_type(&self) -> &'static str {
232        self.current().endpoint_type()
233    }
234
235    async fn get_queues(
236        &mut self,
237        config: Vec<QueueConfig<'_>>,
238        rss: Option<&RssConfig<'_>>,
239        queues: &mut Vec<Box<dyn Queue>>,
240    ) -> anyhow::Result<()> {
241        if self.pcap.enabled.load(Ordering::Relaxed) {
242            tracing::trace!("using packet capture queues");
243            let mem = config[0].pool.guest_memory().clone();
244            let mut queues_inner: Vec<Box<dyn Queue>> = Vec::new();
245            self.current_mut()
246                .get_queues(config, rss, &mut queues_inner)
247                .await?;
248            while let Some(inner) = queues_inner.pop() {
249                queues.push(Box::new(PacketCaptureQueue {
250                    queue: inner,
251                    mem: mem.clone(),
252                    pcap: self.pcap.clone(),
253                }));
254            }
255        } else {
256            tracing::trace!("using inner queues");
257            self.current_mut().get_queues(config, rss, queues).await?;
258        }
259        Ok(())
260    }
261
262    async fn stop(&mut self) {
263        self.current_mut().stop().await
264    }
265
266    fn is_ordered(&self) -> bool {
267        self.current().is_ordered()
268    }
269
270    fn tx_offload_support(&self) -> TxOffloadSupport {
271        self.current().tx_offload_support()
272    }
273
274    fn multiqueue_support(&self) -> MultiQueueSupport {
275        self.current().multiqueue_support()
276    }
277
278    fn tx_fast_completions(&self) -> bool {
279        self.current().tx_fast_completions()
280    }
281
282    async fn set_data_path_to_guest_vf(&self, use_vf: bool) -> anyhow::Result<()> {
283        self.current().set_data_path_to_guest_vf(use_vf).await
284    }
285
286    async fn get_data_path_to_guest_vf(&self) -> anyhow::Result<bool> {
287        self.current().get_data_path_to_guest_vf().await
288    }
289
290    async fn wait_for_endpoint_action(&mut self) -> EndpointAction {
291        enum Message {
292            PacketCaptureEndpointCommand(PacketCaptureEndpointCommand),
293            UpdateFromEndpoint(EndpointAction),
294        }
295        loop {
296            let receiver = self.control_rx.clone();
297            let mut receive_update = receiver.lock().await;
298            let update = async {
299                match receive_update.next().await {
300                    Some(m) => Message::PacketCaptureEndpointCommand(m),
301                    None => {
302                        std::future::pending::<()>().await;
303                        unreachable!()
304                    }
305                }
306            };
307            let ep_update = self
308                .current_mut()
309                .wait_for_endpoint_action()
310                .map(Message::UpdateFromEndpoint);
311            let m = (update, ep_update).race().await;
312            match m {
313                Message::PacketCaptureEndpointCommand(
314                    PacketCaptureEndpointCommand::PacketCapture(rpc),
315                ) => {
316                    let (options, response) = rpc.split();
317                    let result = async {
318                        let id = &self.id;
319                        let start = match options.operation {
320                            PacketCaptureOperation::Start => {
321                                tracing::info!(id, "starting trace");
322                                true
323                            }
324                            PacketCaptureOperation::Stop => {
325                                tracing::info!(id, "stopping trace");
326                                false
327                            }
328                            _ => Err(anyhow::anyhow!("Unexpected packet capture option {id}"))?,
329                        };
330
331                        // Keep the lock until all values are being set to make the update atomic.
332                        let mut pcap_writer = self.pcap.pcap_writer.lock();
333                        let restart_required = start != self.pcap.enabled.load(Ordering::Relaxed);
334                        self.pcap.snaplen.store(options.snaplen, Ordering::Relaxed);
335                        self.pcap
336                            .interface_descriptor_written
337                            .store(false, Ordering::Relaxed);
338                        self.pcap.enabled.store(start, Ordering::Relaxed);
339                        *pcap_writer = options.writer;
340                        anyhow::Ok(restart_required)
341                    }
342                    .await;
343                    let (result, restart_required) = match result {
344                        Err(e) => (Err(e), false),
345                        Ok(value) => (Ok(()), value),
346                    };
347                    response.complete(result.map_err(RemoteError::new));
348                    if restart_required {
349                        break EndpointAction::RestartRequired;
350                    }
351                }
352                Message::UpdateFromEndpoint(update) => break update,
353            }
354        }
355    }
356
357    fn link_speed(&self) -> u64 {
358        self.current().link_speed()
359    }
360}
361
/// Capture state shared between the endpoint and its capture queues.
struct Pcap {
    // N.B Lock/update semantics: Keep the `pcap_writer` lock while updating
    //  the other fields.
    /// Active `pcapng` writer, or `None` when no capture output is attached.
    pcap_writer: parking_lot::Mutex<Option<Box<dyn PcapWriter>>>,
    /// Whether the interface description block has already been written to
    /// the current writer.
    interface_descriptor_written: AtomicBool,
    /// Whether packet capture is currently enabled.
    enabled: AtomicBool,
    /// Maximum number of bytes captured per packet.
    snaplen: AtomicUsize,
    /// Channel used to send a stop request to the endpoint when writing fails.
    endpoint_control: mesh::Sender<PacketCaptureEndpointCommand>,
}
371
372impl Pcap {
373    fn new(endpoint_control: mesh::Sender<PacketCaptureEndpointCommand>) -> Self {
374        Self {
375            enabled: AtomicBool::new(false),
376            snaplen: AtomicUsize::new(65535),
377            pcap_writer: parking_lot::Mutex::new(None),
378            interface_descriptor_written: AtomicBool::new(false),
379            endpoint_control,
380        }
381    }
382
383    fn write_packet(
384        &self,
385        buf: &[u8],
386        original_len: u32,
387        snaplen: u32,
388        timestamp: &Duration,
389    ) -> bool {
390        let mut locked_writer = self.pcap_writer.lock();
391        let Some(pcap_writer) = &mut *locked_writer else {
392            return false;
393        };
394
395        let handle_write_result = |r: PcapResult<usize>| match r {
396            // Writer gone unexpectedly; disable packet capture.
397            Err(PcapError::IoError(_)) => {
398                // No particular benefit of using compare_exchange atomic here
399                // as the pcap writer lock is held.
400                if self.enabled.load(Ordering::Relaxed) {
401                    self.enabled.store(false, Ordering::Relaxed);
402                    let stop = PacketCaptureOptions::new_with_stop();
403                    // Best effort.
404                    drop(
405                        self.endpoint_control
406                            .call(PacketCaptureEndpointCommand::PacketCapture, stop),
407                    );
408                }
409                Err(())
410            }
411            _ => Ok(()),
412        };
413
414        if !self.interface_descriptor_written.load(Ordering::Relaxed) {
415            let interface = InterfaceDescriptionBlock {
416                linktype: DataLink::ETHERNET,
417                snaplen,
418                options: vec![],
419            };
420            if handle_write_result(pcap_writer.write_pcapng_block_id(interface)).is_err() {
421                *locked_writer = None;
422                return false;
423            }
424            self.interface_descriptor_written
425                .store(true, Ordering::Relaxed);
426        }
427
428        let packet = EnhancedPacketBlock {
429            interface_id: 0,
430            timestamp: *timestamp,
431            original_len,
432            data: Cow::Borrowed(buf),
433            options: vec![],
434        };
435
436        if handle_write_result(pcap_writer.write_pcapng_block_eb(packet)).is_err() {
437            *locked_writer = None;
438            return false;
439        }
440
441        true
442    }
443}
444
/// A `Queue` wrapper that records the packets passing through an inner queue.
struct PacketCaptureQueue {
    /// The wrapped queue that all `Queue` calls are forwarded to.
    queue: Box<dyn Queue>,
    /// Guest memory that packet bytes are read from for capture.
    mem: GuestMemory,
    /// Capture state shared with the owning endpoint.
    pcap: Arc<Pcap>,
}
450
451impl PacketCaptureQueue {
452    fn current_mut(&mut self) -> &mut dyn Queue {
453        self.queue.as_mut()
454    }
455}
456
457#[async_trait]
458impl Queue for PacketCaptureQueue {
459    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
460        self.current_mut().poll_ready(cx)
461    }
462
463    fn rx_avail(&mut self, done: &[RxId]) {
464        self.current_mut().rx_avail(done)
465    }
466
467    fn rx_poll(&mut self, packets: &mut [RxId]) -> anyhow::Result<usize> {
468        let n = self.current_mut().rx_poll(packets)?;
469        if self.pcap.enabled.load(Ordering::Relaxed) {
470            if let Some(pool) = self.queue.buffer_access() {
471                let timestamp = SystemTime::now()
472                    .duration_since(UNIX_EPOCH)
473                    .unwrap_or(Duration::new(0, 0));
474                let snaplen = self.pcap.snaplen.load(Ordering::Relaxed);
475                for id in &packets[..n] {
476                    let mut buf = vec![0; snaplen];
477                    let mut len = 0;
478                    let mut pkt_len = 0;
479                    for segment in pool.guest_addresses(*id).iter() {
480                        pkt_len += segment.len;
481                        if len == buf.len() {
482                            continue;
483                        }
484
485                        let copy_length = std::cmp::min(buf.len() - len, segment.len as usize);
486                        let _ = self.mem.read_at(segment.gpa, &mut buf[len..]);
487                        len += copy_length;
488                    }
489
490                    if len == 0 {
491                        continue;
492                    }
493
494                    if !self
495                        .pcap
496                        .write_packet(&buf[..len], pkt_len, snaplen as u32, &timestamp)
497                    {
498                        break;
499                    }
500                }
501            }
502        }
503        Ok(n)
504    }
505
506    fn tx_avail(&mut self, segments: &[TxSegment]) -> anyhow::Result<(bool, usize)> {
507        if self.pcap.enabled.load(Ordering::Relaxed) {
508            let mut segments = segments;
509            let timestamp = SystemTime::now()
510                .duration_since(UNIX_EPOCH)
511                .unwrap_or(Duration::new(0, 0));
512            let snaplen = self.pcap.snaplen.load(Ordering::Relaxed);
513            while !segments.is_empty() {
514                let (metadata, this, rest) = next_packet(segments);
515                segments = rest;
516                if metadata.len == 0 {
517                    continue;
518                }
519                let mut buf = vec![0; snaplen];
520                let mut len = 0;
521                for segment in this {
522                    if len == buf.len() {
523                        break;
524                    }
525
526                    let copy_length = std::cmp::min(buf.len() - len, segment.len as usize);
527                    let _ = self.mem.read_at(segment.gpa, &mut buf[len..]);
528                    len += copy_length;
529                }
530
531                if len == 0 {
532                    continue;
533                }
534
535                if !self.pcap.write_packet(
536                    &buf[..len],
537                    metadata.len as u32,
538                    snaplen as u32,
539                    &timestamp,
540                ) {
541                    break;
542                }
543            }
544        }
545        self.current_mut().tx_avail(segments)
546    }
547
548    fn tx_poll(&mut self, done: &mut [TxId]) -> Result<usize, TxError> {
549        self.current_mut().tx_poll(done)
550    }
551
552    fn buffer_access(&mut self) -> Option<&mut dyn BufferAccess> {
553        self.queue.buffer_access()
554    }
555}
556
557impl InspectMut for PacketCaptureQueue {
558    fn inspect_mut(&mut self, req: inspect::Request<'_>) {
559        self.current_mut().inspect_mut(req)
560    }
561}