pipette_client/
send.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! A thin wrapper around a `mesh::Sender<PipetteRequest>` that provides
5//! useful error handling semantics.
6
7use mesh::CancelContext;
8use mesh::rpc::Rpc;
9use mesh::rpc::RpcError;
10use mesh::rpc::RpcSend;
11use pipette_protocol::PipetteRequest;
12use std::time::Duration;
13
14pub(crate) struct PipetteSender(mesh::Sender<PipetteRequest>);
15
16impl PipetteSender {
17    pub(crate) fn new(sender: mesh::Sender<PipetteRequest>) -> Self {
18        Self(sender)
19    }
20
21    /// A wrapper around [`mesh::Sender::call`] that will sleep for 5 seconds on failure,
22    /// allowing any additional work occurring on the system to hopefully complete.
23    /// See also [`petri::PetriVmOpenVmm::wait_for_halt_or`]
24    pub(crate) async fn call<F, I, R>(&self, f: F, input: I) -> Result<R, RpcError>
25    where
26        F: FnOnce(Rpc<I, R>) -> PipetteRequest,
27        R: 'static + Send,
28    {
29        let result = self.0.call(f, input).await;
30        if result.is_err() {
31            tracing::warn!(
32                "Pipette request channel failed, sleeping for 5 seconds to let outstanding work finish"
33            );
34            let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
35            let _ = c.cancelled().await;
36        }
37        result
38    }
39
40    /// A wrapper around [`mesh::Sender::call_failable`] that will sleep for 5 seconds on failure,
41    /// allowing any additional work occurring on the system to hopefully complete.
42    /// See also [`petri::PetriVmOpenVmm::wait_for_halt_or`]
43    pub(crate) async fn call_failable<F, I, T, E>(&self, f: F, input: I) -> Result<T, RpcError<E>>
44    where
45        F: FnOnce(Rpc<I, Result<T, E>>) -> PipetteRequest,
46        T: 'static + Send,
47        E: 'static + Send,
48    {
49        let result = self.0.call_failable(f, input).await;
50        if result.is_err() {
51            tracing::warn!(
52                "Pipette request channel failed, sleeping for 5 seconds to let outstanding work finish"
53            );
54            let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
55            let _ = c.cancelled().await;
56        }
57        result
58    }
59}