1use crate::driver::Driver;
7use crate::driver::PollImpl;
8use crate::fd::PollFdReady;
9use crate::interest::InterestSlot;
10use crate::interest::PollEvents;
11use crate::socket::PollReady;
12use futures::AsyncRead;
13use futures::AsyncWrite;
14use pal::unix::pipe::set_nonblocking;
15use parking_lot::Mutex;
16use std::fs::File;
17use std::io;
18use std::io::Read;
19use std::io::Write;
20use std::os::unix::prelude::*;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Context;
24use std::task::Poll;
25
26pub struct PolledPipe {
28 fd_ready: PollImpl<dyn PollFdReady>,
29 file: File,
30}
31
32impl PolledPipe {
33 pub fn new(driver: &(impl ?Sized + Driver), file: File) -> io::Result<Self> {
35 let fd_ready = driver.new_dyn_fd_ready(file.as_raw_fd())?;
36 set_nonblocking(&file, true)?;
37 Ok(Self { fd_ready, file })
38 }
39
40 pub fn pair(driver: &(impl ?Sized + Driver)) -> io::Result<(Self, Self)> {
42 let (a, b) = Self::file_pair()?;
43 Ok((Self::new(driver, a)?, Self::new(driver, b)?))
44 }
45
46 pub fn file_pair() -> io::Result<(File, File)> {
49 pal::unix::pipe::pair()
50 }
51
52 pub fn into_inner(self) -> File {
54 set_nonblocking(&self.file, false).unwrap();
55 self.file
56 }
57
58 pub fn get(&self) -> &File {
60 &self.file
61 }
62
63 pub fn split(self) -> (ReadHalf, WriteHalf) {
71 let inner = Arc::new(SplitInner {
72 fd_ready: Mutex::new(self.fd_ready),
73 file: self.file,
74 });
75 (
76 ReadHalf {
77 inner: inner.clone(),
78 },
79 WriteHalf { inner },
80 )
81 }
82
83 fn poll_io<F, R>(
84 &mut self,
85 cx: &mut Context<'_>,
86 slot: InterestSlot,
87 events: PollEvents,
88 mut f: F,
89 ) -> Poll<io::Result<R>>
90 where
91 F: FnMut(&mut Self) -> io::Result<R>,
92 {
93 loop {
94 std::task::ready!(self.fd_ready.poll_fd_ready(cx, slot, events));
95 match f(self) {
96 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
97 self.fd_ready.clear_fd_ready(slot);
98 }
99 r => break Poll::Ready(r),
100 }
101 }
102 }
103}
104
105impl PollReady for PolledPipe {
106 fn poll_ready(&mut self, cx: &mut Context<'_>, events: PollEvents) -> Poll<PollEvents> {
107 self.fd_ready.poll_fd_ready(cx, InterestSlot::Read, events)
108 }
109}
110
111impl AsyncRead for PolledPipe {
112 fn poll_read(
113 mut self: Pin<&mut Self>,
114 cx: &mut Context<'_>,
115 buf: &mut [u8],
116 ) -> Poll<io::Result<usize>> {
117 self.poll_io(cx, InterestSlot::Read, PollEvents::IN, |this| {
118 this.file.read(buf)
119 })
120 }
121
122 fn poll_read_vectored(
123 mut self: Pin<&mut Self>,
124 cx: &mut Context<'_>,
125 bufs: &mut [io::IoSliceMut<'_>],
126 ) -> Poll<io::Result<usize>> {
127 self.poll_io(cx, InterestSlot::Read, PollEvents::IN, |this| {
128 this.file.read_vectored(bufs)
129 })
130 }
131}
132
133impl AsyncWrite for PolledPipe {
134 fn poll_write(
135 mut self: Pin<&mut Self>,
136 cx: &mut Context<'_>,
137 buf: &[u8],
138 ) -> Poll<io::Result<usize>> {
139 self.poll_io(cx, InterestSlot::Write, PollEvents::OUT, |this| {
140 this.file.write(buf)
141 })
142 }
143
144 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
145 self.poll_io(cx, InterestSlot::Write, PollEvents::OUT, |this| {
146 this.file.flush()
147 })
148 }
149
150 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
151 Poll::Ready(Err(io::ErrorKind::Unsupported.into()))
152 }
153
154 fn poll_write_vectored(
155 mut self: Pin<&mut Self>,
156 cx: &mut Context<'_>,
157 bufs: &[io::IoSlice<'_>],
158 ) -> Poll<io::Result<usize>> {
159 self.poll_io(cx, InterestSlot::Write, PollEvents::OUT, |this| {
160 this.file.write_vectored(bufs)
161 })
162 }
163}
164
165struct SplitInner {
166 fd_ready: Mutex<PollImpl<dyn PollFdReady>>, file: File,
168}
169
170pub struct ReadHalf {
172 inner: Arc<SplitInner>,
173}
174
175impl ReadHalf {
176 pub fn get(&self) -> &File {
178 &self.inner.file
179 }
180
181 pub fn poll_io<F, R>(&mut self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
187 where
188 F: FnMut(&mut Self) -> io::Result<R>,
189 {
190 loop {
191 std::task::ready!(self.inner.fd_ready.lock().poll_fd_ready(
192 cx,
193 InterestSlot::Read,
194 PollEvents::IN
195 ));
196 match f(self) {
197 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
198 self.inner
199 .fd_ready
200 .lock()
201 .clear_fd_ready(InterestSlot::Read);
202 }
203 r => break Poll::Ready(r),
204 }
205 }
206 }
207}
208
209pub struct WriteHalf {
211 inner: Arc<SplitInner>,
212}
213
214impl WriteHalf {
215 pub fn get(&self) -> &File {
217 &self.inner.file
218 }
219
220 pub fn poll_io<F, R>(&mut self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
226 where
227 F: FnMut(&mut Self) -> io::Result<R>,
228 {
229 loop {
230 std::task::ready!(self.inner.fd_ready.lock().poll_fd_ready(
231 cx,
232 InterestSlot::Write,
233 PollEvents::OUT
234 ));
235 match f(self) {
236 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
237 self.inner
238 .fd_ready
239 .lock()
240 .clear_fd_ready(InterestSlot::Write);
241 }
242 r => break Poll::Ready(r),
243 }
244 }
245 }
246}
247
248impl PollReady for ReadHalf {
249 fn poll_ready(&mut self, cx: &mut Context<'_>, events: PollEvents) -> Poll<PollEvents> {
250 self.inner
251 .fd_ready
252 .lock()
253 .poll_fd_ready(cx, InterestSlot::Read, events)
254 }
255}
256
257impl AsyncRead for ReadHalf {
258 fn poll_read(
259 mut self: Pin<&mut Self>,
260 cx: &mut Context<'_>,
261 buf: &mut [u8],
262 ) -> Poll<io::Result<usize>> {
263 self.poll_io(cx, |this| (&this.inner.file).read(buf))
264 }
265
266 fn poll_read_vectored(
267 mut self: Pin<&mut Self>,
268 cx: &mut Context<'_>,
269 bufs: &mut [io::IoSliceMut<'_>],
270 ) -> Poll<io::Result<usize>> {
271 self.poll_io(cx, |this| (&this.inner.file).read_vectored(bufs))
272 }
273}
274
275impl PollReady for WriteHalf {
276 fn poll_ready(&mut self, cx: &mut Context<'_>, events: PollEvents) -> Poll<PollEvents> {
277 self.inner
278 .fd_ready
279 .lock()
280 .poll_fd_ready(cx, InterestSlot::Write, events)
281 }
282}
283
284impl AsyncWrite for WriteHalf {
285 fn poll_write(
286 mut self: Pin<&mut Self>,
287 cx: &mut Context<'_>,
288 buf: &[u8],
289 ) -> Poll<io::Result<usize>> {
290 self.poll_io(cx, |this| (&this.inner.file).write(buf))
291 }
292
293 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
294 self.poll_io(cx, |this| (&this.inner.file).flush())
295 }
296
297 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
298 Poll::Ready(Err(io::ErrorKind::Unsupported.into()))
299 }
300
301 fn poll_write_vectored(
302 mut self: Pin<&mut Self>,
303 cx: &mut Context<'_>,
304 bufs: &[io::IoSlice<'_>],
305 ) -> Poll<io::Result<usize>> {
306 self.poll_io(cx, |this| (&this.inner.file).write_vectored(bufs))
307 }
308}