use mesh::CancelContext;
use mesh::rpc::Rpc;
use mesh::rpc::RpcError;
use mesh::rpc::RpcSend;
use pipette_protocol::PipetteRequest;
use std::time::Duration;
pub(crate) struct PipetteSender(mesh::Sender<PipetteRequest>);
impl PipetteSender {
pub(crate) fn new(sender: mesh::Sender<PipetteRequest>) -> Self {
Self(sender)
}
pub(crate) async fn call<F, I, R>(&self, f: F, input: I) -> Result<R, RpcError>
where
F: FnOnce(Rpc<I, R>) -> PipetteRequest,
R: 'static + Send,
{
let result = self.0.call(f, input).await;
if result.is_err() {
tracing::warn!(
"Pipette request channel failed, sleeping for 5 seconds to let outstanding work finish"
);
let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
let _ = c.cancelled().await;
}
result
}
pub(crate) async fn call_failable<F, I, T, E>(&self, f: F, input: I) -> Result<T, RpcError<E>>
where
F: FnOnce(Rpc<I, Result<T, E>>) -> PipetteRequest,
T: 'static + Send,
E: 'static + Send,
{
let result = self.0.call_failable(f, input).await;
if result.is_err() {
tracing::warn!(
"Pipette request channel failed, sleeping for 5 seconds to let outstanding work finish"
);
let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
let _ = c.cancelled().await;
}
result
}
}