1use mesh::CancelContext;
8use mesh::rpc::Rpc;
9use mesh::rpc::RpcError;
10use mesh::rpc::RpcSend;
11use pipette_protocol::PipetteRequest;
12use std::time::Duration;
13
14pub(crate) struct PipetteSender(mesh::Sender<PipetteRequest>);
15
16impl PipetteSender {
17 pub(crate) fn new(sender: mesh::Sender<PipetteRequest>) -> Self {
18 Self(sender)
19 }
20
21 pub(crate) async fn call<F, I, R>(&self, f: F, input: I) -> Result<R, RpcError>
25 where
26 F: FnOnce(Rpc<I, R>) -> PipetteRequest,
27 R: 'static + Send,
28 {
29 let result = self.0.call(f, input).await;
30 if result.is_err() {
31 tracing::warn!(
32 "Pipette request channel failed, sleeping for 5 seconds to let outstanding work finish"
33 );
34 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
35 let _ = c.cancelled().await;
36 }
37 result
38 }
39
40 pub(crate) async fn call_failable<F, I, T, E>(&self, f: F, input: I) -> Result<T, RpcError<E>>
44 where
45 F: FnOnce(Rpc<I, Result<T, E>>) -> PipetteRequest,
46 T: 'static + Send,
47 E: 'static + Send,
48 {
49 let result = self.0.call_failable(f, input).await;
50 if result.is_err() {
51 tracing::warn!(
52 "Pipette request channel failed, sleeping for 5 seconds to let outstanding work finish"
53 );
54 let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
55 let _ = c.cancelled().await;
56 }
57 result
58 }
59}