1#![expect(missing_docs)]
7#![forbid(unsafe_code)]
8
9use async_trait::async_trait;
10use futures::FutureExt;
11use futures::StreamExt;
12use futures::lock::Mutex;
13use futures_concurrency::future::Race;
14use guestmem::GuestMemory;
15use inspect::InspectMut;
16use mesh::error::RemoteError;
17use mesh::rpc::FailableRpc;
18use mesh::rpc::RpcSend;
19use net_backend::BufferAccess;
20use net_backend::Endpoint;
21use net_backend::EndpointAction;
22use net_backend::MultiQueueSupport;
23use net_backend::Queue;
24use net_backend::QueueConfig;
25use net_backend::RssConfig;
26use net_backend::RxId;
27use net_backend::TxError;
28use net_backend::TxId;
29use net_backend::TxOffloadSupport;
30use net_backend::TxSegment;
31use net_backend::next_packet;
32use pcap_file::DataLink;
33use pcap_file::PcapError;
34use pcap_file::PcapResult;
35use pcap_file::pcapng::PcapNgWriter;
36use pcap_file::pcapng::blocks::enhanced_packet::EnhancedPacketBlock;
37use pcap_file::pcapng::blocks::interface_description::InterfaceDescriptionBlock;
38use std::borrow::Cow;
39use std::io::Write;
40use std::sync::Arc;
41use std::sync::atomic::AtomicBool;
42use std::sync::atomic::AtomicUsize;
43use std::sync::atomic::Ordering;
44use std::task::Context;
45use std::task::Poll;
46use std::time::Duration;
47use std::time::SystemTime;
48use std::time::UNIX_EPOCH;
49
/// Operation requested through [`PacketCaptureEndpointControl::packet_capture`].
#[derive(Debug, PartialEq, mesh::MeshPayload)]
pub enum PacketCaptureOperation {
    /// Count capture streams. Requires `op_data`; an endpoint answers
    /// `OpQueryData(n)` with `OpQueryData(n + 1)`.
    Query,
    /// Start capturing. Requires `op_data`; the endpoint takes one writer
    /// from `OpStartData`.
    Start,
    /// Stop capturing. `op_data` is not used.
    Stop,
}
60
/// Payload for [`PacketCaptureOperation::Start`].
#[derive(Debug, mesh::MeshPayload)]
pub struct StartData<W: Write> {
    /// Maximum number of bytes of each packet that are captured.
    pub snaplen: u32,
    /// Output streams. An endpoint that starts capturing removes the first
    /// one; the remainder are handed back to the caller.
    pub writers: Vec<W>,
}
67
/// Operation-specific data carried by [`PacketCaptureParams`].
#[derive(Debug, mesh::MeshPayload)]
pub enum OperationData<W: Write> {
    /// Running count of capture streams, used with `Query`.
    OpQueryData(u32),
    /// Capture settings and output streams, used with `Start`.
    OpStartData(StartData<W>),
}
74
/// Request passed to (and returned from)
/// [`PacketCaptureEndpointControl::packet_capture`].
#[derive(Debug, mesh::MeshPayload)]
pub struct PacketCaptureParams<W: Write> {
    /// The operation to perform.
    pub operation: PacketCaptureOperation,
    /// Data for the operation; must be present for `Query` and `Start`.
    pub op_data: Option<OperationData<W>>,
}
83
/// Object-safe sink for pcapng blocks.
///
/// Lets the capture state hold a `Box<dyn PcapWriter>` instead of being
/// generic over the output stream type.
trait PcapWriter: Send + Sync {
    /// Writes an enhanced packet block (one captured packet).
    fn write_pcapng_block_eb(&mut self, block: EnhancedPacketBlock<'_>) -> PcapResult<usize>;

    /// Writes an interface description block.
    fn write_pcapng_block_id(&mut self, block: InterfaceDescriptionBlock<'_>) -> PcapResult<usize>;
}
91
/// [`PcapWriter`] implementation that forwards blocks to a [`PcapNgWriter`].
struct LocalPcapWriter<W: Write> {
    /// The pcapng encoder wrapping the output stream.
    inner: PcapNgWriter<W>,
}
95
96impl<W: Write + Send + Sync> PcapWriter for LocalPcapWriter<W> {
97 fn write_pcapng_block_eb(&mut self, block: EnhancedPacketBlock<'_>) -> PcapResult<usize> {
98 self.inner.write_pcapng_block(block)
99 }
100
101 fn write_pcapng_block_id(&mut self, block: InterfaceDescriptionBlock<'_>) -> PcapResult<usize> {
102 self.inner.write_pcapng_block(block)
103 }
104}
105
/// Options delivered to the endpoint's control loop to start or stop a capture.
struct PacketCaptureOptions {
    /// `Start` or `Stop`; the control loop rejects any other operation.
    operation: PacketCaptureOperation,
    /// Maximum bytes captured per packet (0 when stopping).
    snaplen: usize,
    /// Destination for captured packets; `None` when stopping.
    writer: Option<Box<dyn PcapWriter>>,
}
111
112impl PacketCaptureOptions {
113 fn new_with_start<W: Write + Send + Sync + 'static>(snaplen: u32, writer: W) -> Self {
114 let pcap_ng_writer =
116 PcapNgWriter::with_endianness(writer, pcap_file::Endianness::Big).unwrap();
117
118 let local_writer = LocalPcapWriter {
119 inner: pcap_ng_writer,
120 };
121
122 Self {
123 operation: PacketCaptureOperation::Start,
124 snaplen: snaplen as usize,
125 writer: Some(Box::new(local_writer)),
126 }
127 }
128
129 fn new_with_stop() -> Self {
130 Self {
131 operation: PacketCaptureOperation::Stop,
132 snaplen: 0,
133 writer: None,
134 }
135 }
136}
137
138enum PacketCaptureEndpointCommand {
139 PacketCapture(FailableRpc<PacketCaptureOptions, ()>),
140}
141
142pub struct PacketCaptureEndpointControl {
143 control_tx: mesh::Sender<PacketCaptureEndpointCommand>,
144}
145
146impl PacketCaptureEndpointControl {
147 pub async fn packet_capture<W: Write + Send + Sync + 'static>(
148 &self,
149 params: PacketCaptureParams<W>,
150 ) -> anyhow::Result<PacketCaptureParams<W>> {
151 let mut params = params;
152 let options = match params.operation {
153 PacketCaptureOperation::Query | PacketCaptureOperation::Start => {
154 let Some(op_data) = &mut params.op_data else {
155 anyhow::bail!(
156 "Invalid input parameter. Expecting operational data, but none provided"
157 );
158 };
159
160 match op_data {
161 OperationData::OpQueryData(num_streams) => {
162 return Ok(PacketCaptureParams {
163 operation: params.operation,
164 op_data: Some(OperationData::OpQueryData(*num_streams + 1)),
165 });
166 }
167 OperationData::OpStartData(data) => {
168 if data.writers.is_empty() {
169 anyhow::bail!("Insufficient streams");
170 }
171 let socket = data.writers.remove(0);
172 PacketCaptureOptions::new_with_start(data.snaplen, socket)
173 }
174 }
175 }
176 PacketCaptureOperation::Stop => PacketCaptureOptions::new_with_stop(),
177 };
178
179 self.control_tx
180 .call_failable(PacketCaptureEndpointCommand::PacketCapture, options)
181 .await?;
182
183 Ok(params)
184 }
185}
186
/// An [`Endpoint`] wrapper that can capture the packets flowing through the
/// wrapped endpoint's queues to pcapng streams.
pub struct PacketCaptureEndpoint {
    /// Identifier included in capture start/stop log messages.
    id: String,
    /// The wrapped endpoint; most `Endpoint` methods delegate to it.
    endpoint: Box<dyn Endpoint>,
    /// Receives capture commands. Held behind `Arc<Mutex<..>>` so the loop
    /// in `wait_for_endpoint_action` can lock a clone of it while also
    /// borrowing `self` mutably.
    control_rx: Arc<Mutex<mesh::Receiver<PacketCaptureEndpointCommand>>>,
    /// Capture state shared with every capture queue.
    pcap: Arc<Pcap>,
}
195
196impl InspectMut for PacketCaptureEndpoint {
197 fn inspect_mut(&mut self, req: inspect::Request<'_>) {
198 self.current_mut().inspect_mut(req)
199 }
200}
201
202impl PacketCaptureEndpoint {
203 pub fn new(endpoint: Box<dyn Endpoint>, id: String) -> (Self, PacketCaptureEndpointControl) {
204 let (control_tx, control_rx) = mesh::channel();
205 let control = PacketCaptureEndpointControl {
206 control_tx: control_tx.clone(),
207 };
208 let pcap = Arc::new(Pcap::new(control_tx.clone()));
209 (
210 Self {
211 id,
212 endpoint,
213 control_rx: Arc::new(Mutex::new(control_rx)),
214 pcap,
215 },
216 control,
217 )
218 }
219
220 fn current(&self) -> &dyn Endpoint {
221 self.endpoint.as_ref()
222 }
223
224 fn current_mut(&mut self) -> &mut dyn Endpoint {
225 self.endpoint.as_mut()
226 }
227}
228
229#[async_trait]
230impl Endpoint for PacketCaptureEndpoint {
231 fn endpoint_type(&self) -> &'static str {
232 self.current().endpoint_type()
233 }
234
235 async fn get_queues(
236 &mut self,
237 config: Vec<QueueConfig<'_>>,
238 rss: Option<&RssConfig<'_>>,
239 queues: &mut Vec<Box<dyn Queue>>,
240 ) -> anyhow::Result<()> {
241 if self.pcap.enabled.load(Ordering::Relaxed) {
242 tracing::trace!("using packet capture queues");
243 let mem = config[0].pool.guest_memory().clone();
244 let mut queues_inner: Vec<Box<dyn Queue>> = Vec::new();
245 self.current_mut()
246 .get_queues(config, rss, &mut queues_inner)
247 .await?;
248 while let Some(inner) = queues_inner.pop() {
249 queues.push(Box::new(PacketCaptureQueue {
250 queue: inner,
251 mem: mem.clone(),
252 pcap: self.pcap.clone(),
253 }));
254 }
255 } else {
256 tracing::trace!("using inner queues");
257 self.current_mut().get_queues(config, rss, queues).await?;
258 }
259 Ok(())
260 }
261
262 async fn stop(&mut self) {
263 self.current_mut().stop().await
264 }
265
266 fn is_ordered(&self) -> bool {
267 self.current().is_ordered()
268 }
269
270 fn tx_offload_support(&self) -> TxOffloadSupport {
271 self.current().tx_offload_support()
272 }
273
274 fn multiqueue_support(&self) -> MultiQueueSupport {
275 self.current().multiqueue_support()
276 }
277
278 fn tx_fast_completions(&self) -> bool {
279 self.current().tx_fast_completions()
280 }
281
282 async fn set_data_path_to_guest_vf(&self, use_vf: bool) -> anyhow::Result<()> {
283 self.current().set_data_path_to_guest_vf(use_vf).await
284 }
285
286 async fn get_data_path_to_guest_vf(&self) -> anyhow::Result<bool> {
287 self.current().get_data_path_to_guest_vf().await
288 }
289
290 async fn wait_for_endpoint_action(&mut self) -> EndpointAction {
291 enum Message {
292 PacketCaptureEndpointCommand(PacketCaptureEndpointCommand),
293 UpdateFromEndpoint(EndpointAction),
294 }
295 loop {
296 let receiver = self.control_rx.clone();
297 let mut receive_update = receiver.lock().await;
298 let update = async {
299 match receive_update.next().await {
300 Some(m) => Message::PacketCaptureEndpointCommand(m),
301 None => {
302 std::future::pending::<()>().await;
303 unreachable!()
304 }
305 }
306 };
307 let ep_update = self
308 .current_mut()
309 .wait_for_endpoint_action()
310 .map(Message::UpdateFromEndpoint);
311 let m = (update, ep_update).race().await;
312 match m {
313 Message::PacketCaptureEndpointCommand(
314 PacketCaptureEndpointCommand::PacketCapture(rpc),
315 ) => {
316 let (options, response) = rpc.split();
317 let result = async {
318 let id = &self.id;
319 let start = match options.operation {
320 PacketCaptureOperation::Start => {
321 tracing::info!(id, "starting trace");
322 true
323 }
324 PacketCaptureOperation::Stop => {
325 tracing::info!(id, "stopping trace");
326 false
327 }
328 _ => Err(anyhow::anyhow!("Unexpected packet capture option {id}"))?,
329 };
330
331 let mut pcap_writer = self.pcap.pcap_writer.lock();
333 let restart_required = start != self.pcap.enabled.load(Ordering::Relaxed);
334 self.pcap.snaplen.store(options.snaplen, Ordering::Relaxed);
335 self.pcap
336 .interface_descriptor_written
337 .store(false, Ordering::Relaxed);
338 self.pcap.enabled.store(start, Ordering::Relaxed);
339 *pcap_writer = options.writer;
340 anyhow::Ok(restart_required)
341 }
342 .await;
343 let (result, restart_required) = match result {
344 Err(e) => (Err(e), false),
345 Ok(value) => (Ok(()), value),
346 };
347 response.complete(result.map_err(RemoteError::new));
348 if restart_required {
349 break EndpointAction::RestartRequired;
350 }
351 }
352 Message::UpdateFromEndpoint(update) => break update,
353 }
354 }
355 }
356
357 fn link_speed(&self) -> u64 {
358 self.current().link_speed()
359 }
360}
361
/// Capture state shared between a [`PacketCaptureEndpoint`] and its capture
/// queues.
struct Pcap {
    /// Destination for captured packets. `None` when no capture is configured
    /// or after a write to the stream has failed.
    pcap_writer: parking_lot::Mutex<Option<Box<dyn PcapWriter>>>,
    /// Whether the interface description block has been written to the
    /// current writer; reset whenever a new writer is installed.
    interface_descriptor_written: AtomicBool,
    /// Whether capture queues should record packets.
    enabled: AtomicBool,
    /// Maximum bytes captured per packet.
    snaplen: AtomicUsize,
    /// Used to post a stop command to the endpoint after a write error.
    endpoint_control: mesh::Sender<PacketCaptureEndpointCommand>,
}
371
372impl Pcap {
373 fn new(endpoint_control: mesh::Sender<PacketCaptureEndpointCommand>) -> Self {
374 Self {
375 enabled: AtomicBool::new(false),
376 snaplen: AtomicUsize::new(65535),
377 pcap_writer: parking_lot::Mutex::new(None),
378 interface_descriptor_written: AtomicBool::new(false),
379 endpoint_control,
380 }
381 }
382
383 fn write_packet(
384 &self,
385 buf: &[u8],
386 original_len: u32,
387 snaplen: u32,
388 timestamp: &Duration,
389 ) -> bool {
390 let mut locked_writer = self.pcap_writer.lock();
391 let Some(pcap_writer) = &mut *locked_writer else {
392 return false;
393 };
394
395 let handle_write_result = |r: PcapResult<usize>| match r {
396 Err(PcapError::IoError(_)) => {
398 if self.enabled.load(Ordering::Relaxed) {
401 self.enabled.store(false, Ordering::Relaxed);
402 let stop = PacketCaptureOptions::new_with_stop();
403 drop(
405 self.endpoint_control
406 .call(PacketCaptureEndpointCommand::PacketCapture, stop),
407 );
408 }
409 Err(())
410 }
411 _ => Ok(()),
412 };
413
414 if !self.interface_descriptor_written.load(Ordering::Relaxed) {
415 let interface = InterfaceDescriptionBlock {
416 linktype: DataLink::ETHERNET,
417 snaplen,
418 options: vec![],
419 };
420 if handle_write_result(pcap_writer.write_pcapng_block_id(interface)).is_err() {
421 *locked_writer = None;
422 return false;
423 }
424 self.interface_descriptor_written
425 .store(true, Ordering::Relaxed);
426 }
427
428 let packet = EnhancedPacketBlock {
429 interface_id: 0,
430 timestamp: *timestamp,
431 original_len,
432 data: Cow::Borrowed(buf),
433 options: vec![],
434 };
435
436 if handle_write_result(pcap_writer.write_pcapng_block_eb(packet)).is_err() {
437 *locked_writer = None;
438 return false;
439 }
440
441 true
442 }
443}
444
/// A [`Queue`] wrapper that records packets passing through the wrapped
/// queue while capture is enabled.
struct PacketCaptureQueue {
    /// The wrapped queue; all queue operations are delegated to it.
    queue: Box<dyn Queue>,
    /// Guest memory used to read packet contents.
    mem: GuestMemory,
    /// Capture state shared with the endpoint.
    pcap: Arc<Pcap>,
}
450
451impl PacketCaptureQueue {
452 fn current_mut(&mut self) -> &mut dyn Queue {
453 self.queue.as_mut()
454 }
455}
456
457#[async_trait]
458impl Queue for PacketCaptureQueue {
459 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
460 self.current_mut().poll_ready(cx)
461 }
462
463 fn rx_avail(&mut self, done: &[RxId]) {
464 self.current_mut().rx_avail(done)
465 }
466
467 fn rx_poll(&mut self, packets: &mut [RxId]) -> anyhow::Result<usize> {
468 let n = self.current_mut().rx_poll(packets)?;
469 if self.pcap.enabled.load(Ordering::Relaxed) {
470 if let Some(pool) = self.queue.buffer_access() {
471 let timestamp = SystemTime::now()
472 .duration_since(UNIX_EPOCH)
473 .unwrap_or(Duration::new(0, 0));
474 let snaplen = self.pcap.snaplen.load(Ordering::Relaxed);
475 for id in &packets[..n] {
476 let mut buf = vec![0; snaplen];
477 let mut len = 0;
478 let mut pkt_len = 0;
479 for segment in pool.guest_addresses(*id).iter() {
480 pkt_len += segment.len;
481 if len == buf.len() {
482 continue;
483 }
484
485 let copy_length = std::cmp::min(buf.len() - len, segment.len as usize);
486 let _ = self.mem.read_at(segment.gpa, &mut buf[len..]);
487 len += copy_length;
488 }
489
490 if len == 0 {
491 continue;
492 }
493
494 if !self
495 .pcap
496 .write_packet(&buf[..len], pkt_len, snaplen as u32, ×tamp)
497 {
498 break;
499 }
500 }
501 }
502 }
503 Ok(n)
504 }
505
506 fn tx_avail(&mut self, segments: &[TxSegment]) -> anyhow::Result<(bool, usize)> {
507 if self.pcap.enabled.load(Ordering::Relaxed) {
508 let mut segments = segments;
509 let timestamp = SystemTime::now()
510 .duration_since(UNIX_EPOCH)
511 .unwrap_or(Duration::new(0, 0));
512 let snaplen = self.pcap.snaplen.load(Ordering::Relaxed);
513 while !segments.is_empty() {
514 let (metadata, this, rest) = next_packet(segments);
515 segments = rest;
516 if metadata.len == 0 {
517 continue;
518 }
519 let mut buf = vec![0; snaplen];
520 let mut len = 0;
521 for segment in this {
522 if len == buf.len() {
523 break;
524 }
525
526 let copy_length = std::cmp::min(buf.len() - len, segment.len as usize);
527 let _ = self.mem.read_at(segment.gpa, &mut buf[len..]);
528 len += copy_length;
529 }
530
531 if len == 0 {
532 continue;
533 }
534
535 if !self.pcap.write_packet(
536 &buf[..len],
537 metadata.len as u32,
538 snaplen as u32,
539 ×tamp,
540 ) {
541 break;
542 }
543 }
544 }
545 self.current_mut().tx_avail(segments)
546 }
547
548 fn tx_poll(&mut self, done: &mut [TxId]) -> Result<usize, TxError> {
549 self.current_mut().tx_poll(done)
550 }
551
552 fn buffer_access(&mut self) -> Option<&mut dyn BufferAccess> {
553 self.queue.buffer_access()
554 }
555}
556
557impl InspectMut for PacketCaptureQueue {
558 fn inspect_mut(&mut self, req: inspect::Request<'_>) {
559 self.current_mut().inspect_mut(req)
560 }
561}