vmgs_broker/
broker.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use mesh_channel::Receiver;
use mesh_channel::rpc::Rpc;
use vmgs::Vmgs;
use vmgs::VmgsFileInfo;
use vmgs_format::FileId;

/// Requests that can be sent to a [`VmgsBrokerTask`].
///
/// Except for `Inspect`, each variant wraps an [`Rpc`] whose first type
/// parameter is the request input and whose second is the response.
pub enum VmgsBrokerRpc {
    /// Deferred inspection request, serviced against the broker's `Vmgs`.
    Inspect(inspect::Deferred),
    /// Look up the [`VmgsFileInfo`] for the given [`FileId`].
    GetFileInfo(Rpc<FileId, Result<VmgsFileInfo, vmgs::Error>>),
    /// Read the contents of the file identified by the given [`FileId`].
    ReadFile(Rpc<FileId, Result<Vec<u8>, vmgs::Error>>),
    /// Write the supplied bytes to the file identified by the given [`FileId`].
    WriteFile(Rpc<(FileId, Vec<u8>), Result<(), vmgs::Error>>),
    /// Write the supplied bytes to the given [`FileId`] through the encrypted
    /// write path. Only present when built with the `with_encryption` cfg.
    #[cfg(with_encryption)]
    WriteFileEncrypted(Rpc<(FileId, Vec<u8>), Result<(), vmgs::Error>>),
    /// Request the saved state of the `Vmgs` instance.
    Save(Rpc<(), vmgs::save_restore::state::SavedVmgsState>),
}

/// Task state that owns a [`Vmgs`] instance and services [`VmgsBrokerRpc`]
/// requests against it.
pub struct VmgsBrokerTask {
    // The VMGS instance every incoming request is applied to.
    vmgs: Vmgs,
}

impl VmgsBrokerTask {
    /// Creates a broker task that takes ownership of the given [`Vmgs`] instance.
    pub fn new(vmgs: Vmgs) -> VmgsBrokerTask {
        Self { vmgs }
    }

    /// Services requests arriving on `recv`, one at a time and in the order
    /// they are received.
    ///
    /// Returns as soon as `recv.recv()` yields an error.
    pub async fn run(&mut self, mut recv: Receiver<VmgsBrokerRpc>) {
        // A receive error means all mpsc senders went away, so there is
        // nothing left to serve and the loop simply ends.
        while let Ok(request) = recv.recv().await {
            self.process_message(request).await;
        }
    }

    /// Dispatches a single request to the matching operation on the owned
    /// `Vmgs`, handing the request input to that operation through the RPC's
    /// `handle` / `handle_sync` helper.
    async fn process_message(&mut self, message: VmgsBrokerRpc) {
        match message {
            VmgsBrokerRpc::Inspect(deferred) => {
                deferred.inspect(&self.vmgs);
            }
            // Synchronous operations go through `handle_sync`.
            VmgsBrokerRpc::GetFileInfo(rpc) => rpc.handle_sync(|id| self.vmgs.get_file_info(id)),
            VmgsBrokerRpc::Save(rpc) => {
                rpc.handle_sync(|()| self.vmgs.save())
            }
            // Asynchronous operations are awaited inside an async closure.
            VmgsBrokerRpc::ReadFile(rpc) => {
                rpc.handle(async |id| self.vmgs.read_file(id).await).await
            }
            VmgsBrokerRpc::WriteFile(rpc) => {
                rpc.handle(async |(id, data)| self.vmgs.write_file(id, &data).await)
                    .await
            }
            #[cfg(with_encryption)]
            VmgsBrokerRpc::WriteFileEncrypted(rpc) => {
                rpc.handle(async |(id, data)| self.vmgs.write_file_encrypted(id, &data).await)
                    .await
            }
        }
    }
}