pipette_client/
send.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! A thin wrapper around a `mesh::Sender<PipetteRequest>` that provides
//! useful error handling semantics.

use mesh::CancelContext;
use mesh::rpc::Rpc;
use mesh::rpc::RpcError;
use mesh::rpc::RpcSend;
use pipette_protocol::PipetteRequest;
use std::time::Duration;

pub(crate) struct PipetteSender(mesh::Sender<PipetteRequest>);

impl PipetteSender {
    pub(crate) fn new(sender: mesh::Sender<PipetteRequest>) -> Self {
        Self(sender)
    }

    /// A wrapper around [`mesh::Sender::call`] that will sleep for 5 seconds on failure,
    /// allowing any additional work occurring on the system to hopefully complete.
    /// See also [`petri::PetriVmOpenVmm::wait_for_halt_or`]
    pub(crate) async fn call<F, I, R>(&self, f: F, input: I) -> Result<R, RpcError>
    where
        F: FnOnce(Rpc<I, R>) -> PipetteRequest,
        R: 'static + Send,
    {
        let result = self.0.call(f, input).await;
        if result.is_err() {
            tracing::warn!(
                "Pipette request channel failed, sleeping for 5 seconds to let outstanding work finish"
            );
            let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
            let _ = c.cancelled().await;
        }
        result
    }

    /// A wrapper around [`mesh::Sender::call_failable`] that will sleep for 5 seconds on failure,
    /// allowing any additional work occurring on the system to hopefully complete.
    /// See also [`petri::PetriVmOpenVmm::wait_for_halt_or`]
    pub(crate) async fn call_failable<F, I, T, E>(&self, f: F, input: I) -> Result<T, RpcError<E>>
    where
        F: FnOnce(Rpc<I, Result<T, E>>) -> PipetteRequest,
        T: 'static + Send,
        E: 'static + Send,
    {
        let result = self.0.call_failable(f, input).await;
        if result.is_err() {
            tracing::warn!(
                "Pipette request channel failed, sleeping for 5 seconds to let outstanding work finish"
            );
            let mut c = CancelContext::new().with_timeout(Duration::from_secs(5));
            let _ = c.cancelled().await;
        }
        result
    }
}