pal_async/
pipe.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Asynchronous OS pipes.
5
6pub use crate::sys::pipe::PolledPipe;
7
#[cfg(test)]
mod tests {
    use super::PolledPipe;
    use crate::DefaultDriver;
    use futures::AsyncReadExt;
    use futures::AsyncWriteExt;
    use pal_async_test::async_test;

    /// Round-trips data through a pipe pair and checks that `read_to_end`
    /// stays pending while the write end is still open.
    #[async_test]
    async fn pipe(driver: DefaultDriver) {
        let (mut reader, mut writer) = PolledPipe::pair(&driver).unwrap();

        // First chunk: written in full, then read back exactly.
        writer.write_all(b"abc").await.unwrap();
        let mut buf = vec![0u8; 3];
        reader.read_exact(&mut buf).await.unwrap();
        assert_eq!(&buf[..], b"abc");

        // With nothing left to read and the writer still alive, draining the
        // pipe must not complete yet.
        let mut drain_rest = reader.read_to_end(&mut buf);
        assert!(futures::poll!(&mut drain_rest).is_pending());

        // Second chunk is appended to `buf`; the drain is expected to finish
        // once the write end has been dropped.
        writer.write_all(b"def").await.unwrap();
        drop(writer);
        drain_rest.await.unwrap();
        assert_eq!(&buf[..], b"abcdef");
    }

    /// Pushes a fixed number of bytes into one pipe end, echoes them back
    /// from the other end, and counts what arrives at the first end again.
    #[cfg(windows)] // relies on pipes being bidirectional
    #[async_test]
    async fn pipe_relay(driver: DefaultDriver) {
        /// Number of bytes sent through the relay.
        const TRANSFER_LEN: u64 = 0x100000;

        let (near, far) = PolledPipe::pair(&driver).unwrap();
        let (near_rx, mut near_tx) = near.split();
        let (far_rx, mut far_tx) = far.split();

        // Echo exactly TRANSFER_LEN bytes arriving at the far end back into it,
        // then release the far write half.
        let relay = async move {
            let echoed = futures::io::copy(far_rx.take(TRANSFER_LEN), &mut far_tx)
                .await
                .unwrap();
            assert_eq!(echoed, TRANSFER_LEN);
            drop(far_tx);
        };

        // Produce TRANSFER_LEN bytes of 0xcc into the near end, then release
        // the near write half.
        let feed = async move {
            let pattern = futures::io::repeat(0xcc).take(TRANSFER_LEN);
            let sent = futures::io::copy(pattern, &mut near_tx).await.unwrap();
            assert_eq!(sent, TRANSFER_LEN);
            drop(near_tx);
        };

        // Discard everything read from the near end and verify the total.
        let drain = async {
            let mut discard = futures::io::sink();
            let received = futures::io::copy(near_rx, &mut discard).await.unwrap();
            assert_eq!(received, TRANSFER_LEN);
        };

        futures::future::join3(relay, feed, drain).await;
    }
}