pal_async/unix/
pipe.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Unix polled pipe wrapper.
5
6use crate::driver::Driver;
7use crate::driver::PollImpl;
8use crate::fd::PollFdReady;
9use crate::interest::InterestSlot;
10use crate::interest::PollEvents;
11use crate::socket::PollReady;
12use futures::AsyncRead;
13use futures::AsyncWrite;
14use pal::unix::pipe::set_nonblocking;
15use parking_lot::Mutex;
16use std::fs::File;
17use std::io;
18use std::io::Read;
19use std::io::Write;
20use std::os::unix::prelude::*;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Context;
24use std::task::Poll;
25
/// A polled Unix pipe (or other pipe-like file).
///
/// Pairs a [`File`] with the readiness object the driver created for its
/// file descriptor. [`PolledPipe::new`] puts the file into nonblocking mode,
/// and [`PolledPipe::into_inner`] restores blocking mode before handing the
/// file back.
pub struct PolledPipe {
    // Declared before `file`, so it is dropped first (fields are dropped in
    // declaration order). NOTE(review): `SplitInner` documents that some
    // executors need this ordering; presumably the same holds here.
    fd_ready: PollImpl<dyn PollFdReady>,
    // The wrapped file.
    file: File,
}
31
32impl PolledPipe {
33    /// Creates a polled pipe from a file.
34    pub fn new(driver: &(impl ?Sized + Driver), file: File) -> io::Result<Self> {
35        let fd_ready = driver.new_dyn_fd_ready(file.as_raw_fd())?;
36        set_nonblocking(&file, true)?;
37        Ok(Self { fd_ready, file })
38    }
39
40    /// Creates a connected pair of polled pipes, returning (read pipe, write pipe).
41    pub fn pair(driver: &(impl ?Sized + Driver)) -> io::Result<(Self, Self)> {
42        let (a, b) = Self::file_pair()?;
43        Ok((Self::new(driver, a)?, Self::new(driver, b)?))
44    }
45
46    /// Creates a connected pair of pipes (read pipe, write pipe) suitable for
47    /// passing to [`Self::new`].
48    pub fn file_pair() -> io::Result<(File, File)> {
49        pal::unix::pipe::pair()
50    }
51
52    /// Returns the inner pipe file.
53    pub fn into_inner(self) -> File {
54        set_nonblocking(&self.file, false).unwrap();
55        self.file
56    }
57
58    /// Returns the inner file.
59    pub fn get(&self) -> &File {
60        &self.file
61    }
62
63    /// Splits the file into a read and write half that can be used
64    /// concurrently.
65    ///
66    /// This is more flexible and efficient than
67    /// [`futures::io::AsyncReadExt::split`], since it avoids holding a lock
68    /// while calling into the kernel, and it provides access to the underlying
69    /// file for more advanced operations.
70    pub fn split(self) -> (ReadHalf, WriteHalf) {
71        let inner = Arc::new(SplitInner {
72            fd_ready: Mutex::new(self.fd_ready),
73            file: self.file,
74        });
75        (
76            ReadHalf {
77                inner: inner.clone(),
78            },
79            WriteHalf { inner },
80        )
81    }
82
83    fn poll_io<F, R>(
84        &mut self,
85        cx: &mut Context<'_>,
86        slot: InterestSlot,
87        events: PollEvents,
88        mut f: F,
89    ) -> Poll<io::Result<R>>
90    where
91        F: FnMut(&mut Self) -> io::Result<R>,
92    {
93        loop {
94            std::task::ready!(self.fd_ready.poll_fd_ready(cx, slot, events));
95            match f(self) {
96                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
97                    self.fd_ready.clear_fd_ready(slot);
98                }
99                r => break Poll::Ready(r),
100            }
101        }
102    }
103}
104
105impl PollReady for PolledPipe {
106    fn poll_ready(&mut self, cx: &mut Context<'_>, events: PollEvents) -> Poll<PollEvents> {
107        self.fd_ready.poll_fd_ready(cx, InterestSlot::Read, events)
108    }
109}
110
111impl AsyncRead for PolledPipe {
112    fn poll_read(
113        mut self: Pin<&mut Self>,
114        cx: &mut Context<'_>,
115        buf: &mut [u8],
116    ) -> Poll<io::Result<usize>> {
117        self.poll_io(cx, InterestSlot::Read, PollEvents::IN, |this| {
118            this.file.read(buf)
119        })
120    }
121
122    fn poll_read_vectored(
123        mut self: Pin<&mut Self>,
124        cx: &mut Context<'_>,
125        bufs: &mut [io::IoSliceMut<'_>],
126    ) -> Poll<io::Result<usize>> {
127        self.poll_io(cx, InterestSlot::Read, PollEvents::IN, |this| {
128            this.file.read_vectored(bufs)
129        })
130    }
131}
132
133impl AsyncWrite for PolledPipe {
134    fn poll_write(
135        mut self: Pin<&mut Self>,
136        cx: &mut Context<'_>,
137        buf: &[u8],
138    ) -> Poll<io::Result<usize>> {
139        self.poll_io(cx, InterestSlot::Write, PollEvents::OUT, |this| {
140            this.file.write(buf)
141        })
142    }
143
144    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
145        self.poll_io(cx, InterestSlot::Write, PollEvents::OUT, |this| {
146            this.file.flush()
147        })
148    }
149
150    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
151        Poll::Ready(Err(io::ErrorKind::Unsupported.into()))
152    }
153
154    fn poll_write_vectored(
155        mut self: Pin<&mut Self>,
156        cx: &mut Context<'_>,
157        bufs: &[io::IoSlice<'_>],
158    ) -> Poll<io::Result<usize>> {
159        self.poll_io(cx, InterestSlot::Write, PollEvents::OUT, |this| {
160            this.file.write_vectored(bufs)
161        })
162    }
163}
164
/// State shared by the [`ReadHalf`] and [`WriteHalf`] created by
/// [`PolledPipe::split`].
struct SplitInner {
    // Must be the first field: some executors require that it is dropped
    // before `file`, and fields are dropped in declaration order. The mutex
    // lets both halves use the single readiness object.
    fd_ready: Mutex<PollImpl<dyn PollFdReady>>,
    // The underlying file, accessed by both halves through shared references.
    file: File,
}
169
/// The read half of a file, via [`PolledPipe::split`].
///
/// Implements `AsyncRead` and shares the underlying file and readiness
/// object with the matching [`WriteHalf`].
pub struct ReadHalf {
    // State shared with the write half.
    inner: Arc<SplitInner>,
}
174
175impl ReadHalf {
176    /// Gets a reference to the inner file.
177    pub fn get(&self) -> &File {
178        &self.inner.file
179    }
180
181    /// Calls nonblocking operation `f` when the file is ready for read.
182    ///
183    /// If `f` returns `Err(err)` with `err.kind() ==
184    /// io::ErrorKind::WouldBlock`, then this re-polls the file for readiness
185    /// and returns `Poll::Pending`.
186    pub fn poll_io<F, R>(&mut self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
187    where
188        F: FnMut(&mut Self) -> io::Result<R>,
189    {
190        loop {
191            std::task::ready!(self.inner.fd_ready.lock().poll_fd_ready(
192                cx,
193                InterestSlot::Read,
194                PollEvents::IN
195            ));
196            match f(self) {
197                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
198                    self.inner
199                        .fd_ready
200                        .lock()
201                        .clear_fd_ready(InterestSlot::Read);
202                }
203                r => break Poll::Ready(r),
204            }
205        }
206    }
207}
208
/// The write half of a file, via [`PolledPipe::split`].
///
/// Implements `AsyncWrite` and shares the underlying file and readiness
/// object with the matching [`ReadHalf`].
pub struct WriteHalf {
    // State shared with the read half.
    inner: Arc<SplitInner>,
}
213
214impl WriteHalf {
215    /// Gets a reference to the inner file.
216    pub fn get(&self) -> &File {
217        &self.inner.file
218    }
219
220    /// Calls nonblocking operation `f` when the file is ready for write.
221    ///
222    /// If `f` returns `Err(err)` with `err.kind() ==
223    /// io::ErrorKind::WouldBlock`, then this re-polls the file for readiness
224    /// and returns `Poll::Pending`.
225    pub fn poll_io<F, R>(&mut self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
226    where
227        F: FnMut(&mut Self) -> io::Result<R>,
228    {
229        loop {
230            std::task::ready!(self.inner.fd_ready.lock().poll_fd_ready(
231                cx,
232                InterestSlot::Write,
233                PollEvents::OUT
234            ));
235            match f(self) {
236                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
237                    self.inner
238                        .fd_ready
239                        .lock()
240                        .clear_fd_ready(InterestSlot::Write);
241                }
242                r => break Poll::Ready(r),
243            }
244        }
245    }
246}
247
248impl PollReady for ReadHalf {
249    fn poll_ready(&mut self, cx: &mut Context<'_>, events: PollEvents) -> Poll<PollEvents> {
250        self.inner
251            .fd_ready
252            .lock()
253            .poll_fd_ready(cx, InterestSlot::Read, events)
254    }
255}
256
257impl AsyncRead for ReadHalf {
258    fn poll_read(
259        mut self: Pin<&mut Self>,
260        cx: &mut Context<'_>,
261        buf: &mut [u8],
262    ) -> Poll<io::Result<usize>> {
263        self.poll_io(cx, |this| (&this.inner.file).read(buf))
264    }
265
266    fn poll_read_vectored(
267        mut self: Pin<&mut Self>,
268        cx: &mut Context<'_>,
269        bufs: &mut [io::IoSliceMut<'_>],
270    ) -> Poll<io::Result<usize>> {
271        self.poll_io(cx, |this| (&this.inner.file).read_vectored(bufs))
272    }
273}
274
275impl PollReady for WriteHalf {
276    fn poll_ready(&mut self, cx: &mut Context<'_>, events: PollEvents) -> Poll<PollEvents> {
277        self.inner
278            .fd_ready
279            .lock()
280            .poll_fd_ready(cx, InterestSlot::Write, events)
281    }
282}
283
284impl AsyncWrite for WriteHalf {
285    fn poll_write(
286        mut self: Pin<&mut Self>,
287        cx: &mut Context<'_>,
288        buf: &[u8],
289    ) -> Poll<io::Result<usize>> {
290        self.poll_io(cx, |this| (&this.inner.file).write(buf))
291    }
292
293    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
294        self.poll_io(cx, |this| (&this.inner.file).flush())
295    }
296
297    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
298        Poll::Ready(Err(io::ErrorKind::Unsupported.into()))
299    }
300
301    fn poll_write_vectored(
302        mut self: Pin<&mut Self>,
303        cx: &mut Context<'_>,
304        bufs: &[io::IoSlice<'_>],
305    ) -> Poll<io::Result<usize>> {
306        self.poll_io(cx, |this| (&this.inner.file).write_vectored(bufs))
307    }
308}