vmbus_async/
async_dgram.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Traits for async send and receive of datagrams.
5//!
6//! Datagrams are self-contained messages that are not split or combined when
7//! sent or received and instead always maintain their original message
8//! boundaries.
9//!
10//! This is different from bytes sent or received over a byte stream (as in
11//! [`futures::AsyncRead`]), where one send can be split into multiple receives,
12//! or multiple sends can be combined into one receive.
13
14use std::future::Future;
15use std::io;
16use std::io::IoSlice;
17use std::io::IoSliceMut;
18use std::pin::Pin;
19use std::task::Context;
20use std::task::Poll;
21use std::task::ready;
22use thiserror::Error;
23
/// A source of datagrams that can be polled asynchronously.
///
/// This is the message-oriented counterpart of [`futures::AsyncRead`]: that
/// trait models a byte stream, whereas this one deals in individual datagrams.
pub trait AsyncRecv {
    /// Attempts to receive a datagram, scattering its contents across `bufs`.
    ///
    /// A single call never consumes more than one datagram.
    ///
    /// On success the result carries a `usize`; [`AsyncRecvExt::recv_exact`]
    /// compares that value against the length of its buffer, i.e. it is
    /// treated as the size of the received datagram in bytes.
    fn poll_recv(
        &mut self,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<io::Result<usize>>;
}
38
39impl<T: AsyncRecv + ?Sized> AsyncRecv for &mut T {
40    fn poll_recv(
41        &mut self,
42        cx: &mut Context<'_>,
43        bufs: &mut [IoSliceMut<'_>],
44    ) -> Poll<io::Result<usize>> {
45        (*self).poll_recv(cx, bufs)
46    }
47}
48
49/// Extension trait for [`AsyncRecv`].
50pub trait AsyncRecvExt: AsyncRecv {
51    /// Receive a datagram into `buf`.
52    fn recv<'a>(&'a mut self, buf: &'a mut [u8]) -> Recv<'a, Self> {
53        Recv { recv: self, buf }
54    }
55
56    /// Receive a datagram into `buf`, failing if its size is not exactly the
57    /// size of `buf`.
58    fn recv_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> RecvExact<'a, Self> {
59        RecvExact { recv: self, buf }
60    }
61
62    /// Read a single datagram into `bufs`.
63    ///
64    /// Slice will be written in order, with the next one used only after the
65    /// previous one is completely filled.
66    fn recv_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> RecvVectored<'a, Self> {
67        RecvVectored { recv: self, bufs }
68    }
69}
70
71impl<T: AsyncRecv + ?Sized> AsyncRecvExt for T {}
72
/// A future for [`AsyncRecvExt::recv`].
///
/// Polling it forwards to the receiver's `poll_recv` with `buf` as the only
/// destination buffer and yields that call's `io::Result<usize>`.
///
/// Nothing is received until the future is polled, so dropping it unused is
/// almost certainly a mistake; `#[must_use]` makes the compiler say so.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Recv<'a, T: ?Sized> {
    /// The receiver that will be polled.
    recv: &'a mut T,
    /// Destination buffer for the datagram.
    buf: &'a mut [u8],
}
78
79impl<T: AsyncRecv + ?Sized> Future for Recv<'_, T> {
80    type Output = io::Result<usize>;
81
82    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83        let this = self.get_mut();
84        this.recv.poll_recv(cx, &mut [IoSliceMut::new(this.buf)])
85    }
86}
87
/// A future for [`AsyncRecvExt::recv_exact`].
///
/// Resolves to `Ok(())` when the received length equals `buf.len()`, and to an
/// [`std::io::ErrorKind::InvalidData`] error otherwise.
///
/// Nothing is received until the future is polled, so dropping it unused is
/// almost certainly a mistake; `#[must_use]` makes the compiler say so.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvExact<'a, T: ?Sized> {
    /// The receiver that will be polled.
    recv: &'a mut T,
    /// Destination buffer; its length is the required datagram size.
    buf: &'a mut [u8],
}
93
/// Error payload wrapped in an [`io::Error`] by the [`RecvExact`] future when
/// the received length does not match the length of the destination buffer.
#[derive(Debug, Error)]
#[error("message too small")]
struct MessageTooSmall;
97
98impl<T: AsyncRecv + ?Sized> Future for RecvExact<'_, T> {
99    type Output = io::Result<()>;
100
101    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
102        let this = self.get_mut();
103        let n = ready!(this.recv.poll_recv(cx, &mut [IoSliceMut::new(this.buf)]))?;
104        if n != this.buf.len() {
105            Err(io::Error::new(io::ErrorKind::InvalidData, MessageTooSmall))?;
106        }
107        Poll::Ready(Ok(()))
108    }
109}
110
/// A future for [`AsyncRecvExt::recv_vectored`].
///
/// Polling it forwards to the receiver's `poll_recv` with the caller's slices
/// and yields that call's `io::Result<usize>`.
///
/// Nothing is received until the future is polled, so dropping it unused is
/// almost certainly a mistake; `#[must_use]` makes the compiler say so.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvVectored<'a, T: ?Sized> {
    /// The receiver that will be polled.
    recv: &'a mut T,
    /// Destination slices for the datagram.
    bufs: &'a mut [IoSliceMut<'a>],
}
116
117impl<T: AsyncRecv + ?Sized> Future for RecvVectored<'_, T> {
118    type Output = io::Result<usize>;
119
120    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
121        let this = self.get_mut();
122        this.recv.poll_recv(cx, this.bufs)
123    }
124}
125
/// A sink for datagrams that can be polled asynchronously.
pub trait AsyncSend {
    /// Attempts to send one datagram whose contents are the slices in `bufs`.
    ///
    /// Sends are all-or-nothing: the datagram is either sent in full or not
    /// sent at all.
    fn poll_send(&mut self, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) -> Poll<io::Result<()>>;
}
133
134impl<T: AsyncSend + ?Sized> AsyncSend for &mut T {
135    fn poll_send(&mut self, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) -> Poll<io::Result<()>> {
136        (*self).poll_send(cx, bufs)
137    }
138}
139
140/// Extension trait for [`AsyncSend`].
141pub trait AsyncSendExt: AsyncSend {
142    /// Sends the datagram in `buf`.
143    fn send<'a>(&'a mut self, buf: &'a [u8]) -> Send<'a, Self> {
144        Send { send: self, buf }
145    }
146
147    /// Sends the datagram in `bufs`.
148    fn send_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> SendVectored<'a, Self> {
149        SendVectored { send: self, bufs }
150    }
151}
152
153impl<T: AsyncSend + ?Sized> AsyncSendExt for T {}
154
/// A future for [`AsyncSendExt::send`].
///
/// Polling it forwards to the sender's `poll_send` with `buf` as the only
/// source slice and yields that call's `io::Result<()>`.
///
/// Nothing is sent until the future is polled, so dropping it unused is
/// almost certainly a mistake; `#[must_use]` makes the compiler say so.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, T: ?Sized> {
    /// The sender that will be polled.
    send: &'a mut T,
    /// Contents of the datagram to send.
    buf: &'a [u8],
}
160
161impl<T: AsyncSend + ?Sized> Future for Send<'_, T> {
162    type Output = io::Result<()>;
163
164    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
165        let this = self.get_mut();
166        this.send.poll_send(cx, &[IoSlice::new(this.buf)])
167    }
168}
169
/// A future for [`AsyncSendExt::send_vectored`].
///
/// Polling it forwards to the sender's `poll_send` with the caller's slices
/// and yields that call's `io::Result<()>`.
///
/// Nothing is sent until the future is polled, so dropping it unused is
/// almost certainly a mistake; `#[must_use]` makes the compiler say so.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendVectored<'a, T: ?Sized> {
    /// The sender that will be polled.
    send: &'a mut T,
    /// Slices that together make up the datagram to send.
    bufs: &'a [IoSlice<'a>],
}
175
176impl<T: AsyncSend + ?Sized> Future for SendVectored<'_, T> {
177    type Output = io::Result<()>;
178
179    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
180        let this = self.get_mut();
181        this.send.poll_send(cx, this.bufs)
182    }
183}