1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
//! Remote Procedure Call functionality.
use super::error::RemoteResult;
use crate::error::RemoteError;
use crate::error::RemoteResultExt;
use crate::error::RpcError;
use crate::oneshot;
use crate::MpscSender;
use crate::OneshotReceiver;
use crate::OneshotSender;
use crate::Sender;
use mesh_node::message::MeshField;
use mesh_protobuf::Protobuf;
use std::future::Future;
use std::pin::Pin;
use std::task::ready;
use std::task::Poll;
/// An RPC message for a request with input of type `I` and output of type `R`.
///
/// Field `0` carries the request input. Field `1` is the [`OneshotSender<R>`]
/// through which the receiver of the message delivers the result after
/// processing the request.
///
/// The `mesh` attribute below requires both `I` and `R` to implement
/// `MeshField` for the derived `Protobuf` implementation.
#[derive(Debug, Protobuf)]
#[mesh(
bound = "I: MeshField, R: MeshField",
resource = "mesh_node::resource::Resource"
)]
pub struct Rpc<I, R>(pub I, pub OneshotSender<R>);
/// An RPC message with a failable result.
///
/// This is an [`Rpc`] with input `I` whose response type is
/// [`RemoteResult<R>`] rather than a bare `R`.
pub type FailableRpc<I, R> = Rpc<I, RemoteResult<R>>;
impl<I, R: 'static + Send> Rpc<I, R> {
/// Handles an RPC request by calling `f` and sending the result to the
/// initiator.
pub fn handle_sync<F>(self, f: F)
where
F: FnOnce(I) -> R,
{
let r = f(self.0);
self.1.send(r);
}
/// Handles an RPC request by calling `f`, awaiting its result, and sending
/// the result to the initiator.
pub async fn handle<F, Fut>(self, f: F)
where
F: FnOnce(I) -> Fut,
Fut: Future<Output = R>,
{
let r = f(self.0).await;
self.1.send(r);
}
/// Handles an RPC request by calling `f`, awaiting its result, and sending
/// Ok results back to the initiator.
///
/// If `f` fails, the error is propagated back to the caller, and the RPC
/// channel is dropped (resulting in a `RecvError::Closed` on the
/// initiator).
pub async fn handle_must_succeed<F, Fut, E>(self, f: F) -> Result<(), E>
where
F: FnOnce(I) -> Fut,
Fut: Future<Output = Result<R, E>>,
{
let r = f(self.0).await?;
self.1.send(r);
Ok(())
}
/// Completes the RPC with the specified result value.
pub fn complete(self, result: R) {
self.1.send(result);
}
}
impl<I, R: 'static + Send> Rpc<I, Result<R, RemoteError>> {
/// Handles an RPC request by calling `f` and sending the result to the
/// initiator, after converting any error to a [`RemoteError`].
pub fn handle_failable_sync<F, E>(self, f: F)
where
F: FnOnce(I) -> Result<R, E>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let r = f(self.0);
self.1.send(r.map_err(RemoteError::new));
}
/// Handles an RPC request by calling `f`, awaiting its result, and sending
/// the result to the initiator, after converting any error to a
/// [`RemoteError`].
pub async fn handle_failable<F, Fut, E>(self, f: F)
where
F: FnOnce(I) -> Fut,
Fut: Future<Output = Result<R, E>>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let r = f(self.0).await;
self.1.send(r.map_err(RemoteError::new));
}
}
/// Implemented by types that are able to dispatch RPC requests.
pub trait RpcSend {
    /// The type of message this sender dispatches.
    type Message;

    /// Dispatches a single RPC request message.
    fn send_rpc(&self, message: Self::Message);

    /// Sends a request and hands back the channel on which the result will
    /// arrive.
    ///
    /// `f` wraps an [`Rpc`] object into the sender's message type; it is
    /// often just an enum variant name.
    ///
    /// `input` is the input to the call.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mesh_channel::rpc::{Rpc, RpcSend};
    /// # use mesh_channel::Sender;
    /// enum Request {
    ///     Add(Rpc<(u32, u32), u32>),
    /// }
    /// async fn add(send: &Sender<Request>) {
    ///     assert_eq!(send.call(Request::Add, (3, 4)).await.unwrap(), 7);
    /// }
    /// ```
    fn call<F, I, R>(&self, f: F, input: I) -> OneshotReceiver<R>
    where
        F: FnOnce(Rpc<I, R>) -> Self::Message,
        R: 'static + Send,
    {
        // Create the response channel, ship the sending half inside the
        // request, and give the receiving half to the caller.
        let (reply_tx, reply_rx) = oneshot();
        let message = f(Rpc(input, reply_tx));
        self.send_rpc(message);
        reply_rx
    }

    /// Sends a request whose response is a [`Result`] and hands back an
    /// object on which the result will arrive.
    ///
    /// This behaves like [`RpcSend::call`], except that the returned object
    /// merges the channel error and the call's own error into a single
    /// [`RpcError`] type, which makes it easier to handle errors.
    fn call_failable<F, I, T, E>(&self, f: F, input: I) -> RpcResultReceiver<Result<T, E>>
    where
        F: FnOnce(Rpc<I, Result<T, E>>) -> Self::Message,
        T: 'static + Send,
        E: 'static + Send,
    {
        let receiver = self.call(f, input);
        RpcResultReceiver(receiver)
    }
}
/// Future returned by [`RpcSend::call_failable`]; wraps the underlying
/// [`OneshotReceiver`] for the call's `Result` response.
#[must_use]
pub struct RpcResultReceiver<R>(OneshotReceiver<R>);

impl<T: 'static + Send, E: 'static + Send> Future for RpcResultReceiver<Result<T, E>> {
    type Output = Result<T, RpcError<E>>;

    /// Polls the wrapped receiver. Once it is ready, the nested result is
    /// collapsed with `flatten` into a single `Result<T, RpcError<E>>`.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let inner = Pin::new(&mut self.get_mut().0);
        // `ready!` returns `Poll::Pending` from this function when the
        // receiver has not resolved yet.
        let received = ready!(inner.poll(cx));
        Poll::Ready(received.flatten())
    }
}
/// Allows a [`Sender<T>`] to be used as an RPC sender whose message type is
/// `T`.
impl<T: 'static + Send> RpcSend for Sender<T> {
type Message = T;
/// Dispatches the request by forwarding `message` to `self.send`.
fn send_rpc(&self, message: T) {
self.send(message);
}
}
/// Allows an [`MpscSender<T>`] to be used as an RPC sender whose message type
/// is `T`.
impl<T: 'static + Send> RpcSend for MpscSender<T> {
type Message = T;
/// Dispatches the request by forwarding `message` to `self.send`.
fn send_rpc(&self, message: T) {
self.send(message);
}
}
/// Makes a shared reference to any RPC sender usable as an RPC sender itself,
/// with the same message type as the referenced sender.
impl<T: RpcSend> RpcSend for &T {
    type Message = T::Message;

    /// Dispatches the request through the referenced sender.
    fn send_rpc(&self, message: T::Message) {
        // Name `T`'s implementation explicitly so the call cannot resolve
        // back to this `&T` implementation.
        T::send_rpc(*self, message);
    }
}