diag_client/
kmsg_stream.rs1use diag_proto::FILE_LINE_MAX;
8use futures::AsyncRead;
9use pal_async::socket::PolledSocket;
10use std::io;
11use std::pin::Pin;
12use std::task::Context;
13use std::task::Poll;
14
15pub struct KmsgStream {
18 socket: PolledSocket<socket2::Socket>,
19 buffer: Vec<u8>,
20 end: usize,
21}
22
23impl KmsgStream {
24 pub(crate) fn new(socket: PolledSocket<socket2::Socket>) -> Self {
25 Self {
26 socket,
27 buffer: vec![0; FILE_LINE_MAX],
28 end: 0,
29 }
30 }
31}
32
33impl futures::Stream for KmsgStream {
34 type Item = io::Result<Vec<u8>>;
35
36 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37 let this = self.get_mut();
38 loop {
41 if let Some(len) = this.buffer[..this.end].iter().position(|&x| x == 0) {
42 let line = this.buffer[..len].to_vec();
43 this.buffer.copy_within(len + 1..this.end, 0);
44 this.end -= len + 1;
45 break Poll::Ready(Some(Ok(line)));
46 } else if this.end == this.buffer.len() {
47 return Poll::Ready(Some(Err(io::Error::new(
48 io::ErrorKind::InvalidData,
49 "missing null terminator",
50 ))));
51 } else {
52 match std::task::ready!(
53 Pin::new(&mut this.socket).poll_read(cx, &mut this.buffer[this.end..])
54 ) {
55 Ok(n) => {
56 if n == 0 {
57 break Poll::Ready(None);
58 }
59 this.end += n
60 }
61 Err(err) => return Poll::Ready(Some(Err(err))),
62 }
63 }
64 }
65 }
66}