diag_client/
kmsg_stream.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Types for handling a kmsg byte stream, which is a series of kmsg entries
5//! separated by null terminators.
6
7use diag_proto::FILE_LINE_MAX;
8use futures::AsyncRead;
9use pal_async::socket::PolledSocket;
10use std::io;
11use std::pin::Pin;
12use std::task::Context;
13use std::task::Poll;
14
15/// A stream of data from a /dev/kmsg device, whose contents are defined to have
16/// distinct entries separated by null bytes.
17pub struct KmsgStream {
18    socket: PolledSocket<socket2::Socket>,
19    buffer: Vec<u8>,
20    end: usize,
21}
22
23impl KmsgStream {
24    pub(crate) fn new(socket: PolledSocket<socket2::Socket>) -> Self {
25        Self {
26            socket,
27            buffer: vec![0; FILE_LINE_MAX],
28            end: 0,
29        }
30    }
31}
32
33impl futures::Stream for KmsgStream {
34    type Item = io::Result<Vec<u8>>;
35
36    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37        let this = self.get_mut();
38        // The entries are separated by null terminators. Read until we
39        // find a null terminator.
40        loop {
41            if let Some(len) = this.buffer[..this.end].iter().position(|&x| x == 0) {
42                let line = this.buffer[..len].to_vec();
43                this.buffer.copy_within(len + 1..this.end, 0);
44                this.end -= len + 1;
45                break Poll::Ready(Some(Ok(line)));
46            } else if this.end == this.buffer.len() {
47                return Poll::Ready(Some(Err(io::Error::new(
48                    io::ErrorKind::InvalidData,
49                    "missing null terminator",
50                ))));
51            } else {
52                match std::task::ready!(
53                    Pin::new(&mut this.socket).poll_read(cx, &mut this.buffer[this.end..])
54                ) {
55                    Ok(n) => {
56                        if n == 0 {
57                            break Poll::Ready(None);
58                        }
59                        this.end += n
60                    }
61                    Err(err) => return Poll::Ready(Some(Err(err))),
62                }
63            }
64        }
65    }
66}