1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Remote Procedure Call functionality.

use super::error::RemoteResult;
use crate::error::RemoteError;
use crate::error::RemoteResultExt;
use crate::error::RpcError;
use crate::oneshot;
use crate::MpscSender;
use crate::OneshotReceiver;
use crate::OneshotSender;
use crate::Sender;
use mesh_node::message::MeshField;
use mesh_protobuf::Protobuf;
use std::future::Future;
use std::pin::Pin;
use std::task::ready;
use std::task::Poll;

/// An RPC message for a request with input of type `I` and output of type `R`.
/// The receiver of the message should process the request and return results
/// via the `Sender<R>`.
#[derive(Debug, Protobuf)]
#[mesh(
    bound = "I: MeshField, R: MeshField",
    resource = "mesh_node::resource::Resource"
)]
pub struct Rpc<I, R>(pub I, pub OneshotSender<R>);

/// An RPC message with a failable result.
pub type FailableRpc<I, R> = Rpc<I, RemoteResult<R>>;

impl<I, R: 'static + Send> Rpc<I, R> {
    /// Handles an RPC request by calling `f` and sending the result to the
    /// initiator.
    pub fn handle_sync<F>(self, f: F)
    where
        F: FnOnce(I) -> R,
    {
        let r = f(self.0);
        self.1.send(r);
    }

    /// Handles an RPC request by calling `f`, awaiting its result, and sending
    /// the result to the initiator.
    pub async fn handle<F, Fut>(self, f: F)
    where
        F: FnOnce(I) -> Fut,
        Fut: Future<Output = R>,
    {
        let r = f(self.0).await;
        self.1.send(r);
    }

    /// Handles an RPC request by calling `f`, awaiting its result, and sending
    /// Ok results back to the initiator.
    ///
    /// If `f` fails, the error is propagated back to the caller, and the RPC
    /// channel is dropped (resulting in a `RecvError::Closed` on the
    /// initiator).
    pub async fn handle_must_succeed<F, Fut, E>(self, f: F) -> Result<(), E>
    where
        F: FnOnce(I) -> Fut,
        Fut: Future<Output = Result<R, E>>,
    {
        let r = f(self.0).await?;
        self.1.send(r);
        Ok(())
    }

    /// Completes the RPC with the specified result value.
    pub fn complete(self, result: R) {
        self.1.send(result);
    }
}

impl<I, R: 'static + Send> Rpc<I, Result<R, RemoteError>> {
    /// Handles an RPC request by calling `f` and sending the result to the
    /// initiator, after converting any error to a [`RemoteError`].
    pub fn handle_failable_sync<F, E>(self, f: F)
    where
        F: FnOnce(I) -> Result<R, E>,
        E: Into<Box<dyn std::error::Error + Send + Sync>>,
    {
        let r = f(self.0);
        self.1.send(r.map_err(RemoteError::new));
    }

    /// Handles an RPC request by calling `f`, awaiting its result, and sending
    /// the result to the initiator, after converting any error to a
    /// [`RemoteError`].
    pub async fn handle_failable<F, Fut, E>(self, f: F)
    where
        F: FnOnce(I) -> Fut,
        Fut: Future<Output = Result<R, E>>,
        E: Into<Box<dyn std::error::Error + Send + Sync>>,
    {
        let r = f(self.0).await;
        self.1.send(r.map_err(RemoteError::new));
    }
}

/// A trait implemented by objects that can send RPC requests.
pub trait RpcSend {
    /// The message type for this sender.
    type Message;

    /// Send an RPC request.
    fn send_rpc(&self, message: Self::Message);

    /// Issues a request and returns a channel to receive the result.
    ///
    /// `f` maps an [`Rpc`] object to the message type and is often an enum
    /// variant name.
    ///
    /// `input` is the input to the call.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use mesh_channel::rpc::{Rpc, RpcSend};
    /// # use mesh_channel::Sender;
    /// enum Request {
    ///     Add(Rpc<(u32, u32), u32>),
    /// }
    /// async fn add(send: &Sender<Request>) {
    ///     assert_eq!(send.call(Request::Add, (3, 4)).await.unwrap(), 7);
    /// }
    /// ```
    fn call<F, I, R>(&self, f: F, input: I) -> OneshotReceiver<R>
    where
        F: FnOnce(Rpc<I, R>) -> Self::Message,
        R: 'static + Send,
    {
        let (result_send, result_recv) = oneshot();
        self.send_rpc(f(Rpc(input, result_send)));
        result_recv
    }

    /// Issues a request and returns an object to receive the result.
    ///
    /// This is like [`RpcSend::call`], but for RPCs that return a [`Result`].
    /// The returned object combines the channel error and the call's error into
    /// a single [`RpcError`] type, which makes it easier to handle errors.
    fn call_failable<F, I, T, E>(&self, f: F, input: I) -> RpcResultReceiver<Result<T, E>>
    where
        F: FnOnce(Rpc<I, Result<T, E>>) -> Self::Message,
        T: 'static + Send,
        E: 'static + Send,
    {
        RpcResultReceiver(self.call(f, input))
    }
}

/// The result future of an [`RpcSend::call_failable`] call.
#[must_use]
pub struct RpcResultReceiver<R>(OneshotReceiver<R>);

impl<T: 'static + Send, E: 'static + Send> Future for RpcResultReceiver<Result<T, E>> {
    type Output = Result<T, RpcError<E>>;

    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        Poll::Ready(ready!(Pin::new(&mut self.get_mut().0).poll(cx)).flatten())
    }
}

impl<T: 'static + Send> RpcSend for Sender<T> {
    type Message = T;
    fn send_rpc(&self, message: T) {
        self.send(message);
    }
}

impl<T: 'static + Send> RpcSend for MpscSender<T> {
    type Message = T;
    fn send_rpc(&self, message: T) {
        self.send(message);
    }
}

impl<T: RpcSend> RpcSend for &T {
    type Message = T::Message;
    fn send_rpc(&self, message: T::Message) {
        (*self).send_rpc(message);
    }
}