flowey_cli/cli/
exec_snippet.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::FlowBackendCli;
5use anyhow::Context;
6use flowey_core::node::FlowArch;
7use flowey_core::node::FlowBackend;
8use flowey_core::node::FlowPlatform;
9use flowey_core::node::GhOutput;
10use flowey_core::node::GhToRust;
11use flowey_core::node::NodeHandle;
12use flowey_core::node::RustToGh;
13use flowey_core::node::steps::rust::RustRuntimeServices;
14use flowey_core::node::user_facing::ClaimedGhParam;
15use flowey_core::node::user_facing::GhPermission;
16use flowey_core::node::user_facing::GhPermissionValue;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use serde::Deserialize;
20use serde::Serialize;
21use std::collections::BTreeMap;
22use std::path::PathBuf;
23
24pub fn construct_exec_snippet_cli(
25    flowey_bin: &str,
26    node_modpath: &str,
27    snippet_idx: usize,
28    job_idx: usize,
29) -> String {
30    format!(r#"{flowey_bin} e {job_idx} {node_modpath} {snippet_idx}"#)
31}
32
33/// (internal) execute an inline code snippet from the given node.
34#[derive(clap::Args)]
35pub struct ExecSnippet {
36    /// Job idx to query `pipeline_static_db` with
37    pub(crate) job_idx: usize,
38
39    node_modpath_and_snippet_idx: Vec<String>,
40
41    /// (debug) If true, the snippet will not actually be run
42    #[clap(long)]
43    dry_run: bool,
44}
45
46pub const VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR: &str = "_internal_WORKING_DIR";
47pub const VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR: &str = "_internal_PERSISTENT_STORAGE_DIR";
48
49impl ExecSnippet {
50    pub fn run(self) -> anyhow::Result<()> {
51        let Self {
52            node_modpath_and_snippet_idx,
53            job_idx,
54            dry_run,
55        } = self;
56
57        let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
58        let flow_arch = FlowArch::host(PipelineBackendHint::Local);
59
60        let mut runtime_var_db = super::var_db::open_var_db(job_idx)?;
61
62        let working_dir: PathBuf = {
63            let Some((working_dir, _)) =
64                runtime_var_db.try_get_var(VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR)
65            else {
66                anyhow::bail!("var db was not seeded with {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR}");
67            };
68            serde_json::from_slice::<String>(&working_dir)
69                .context(format!(
70                    "found {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR} in db, but it wasn't a json string!"
71                ))?
72                .into()
73        };
74
75        let FloweyPipelineStaticDb {
76            flow_backend,
77            var_db_backend_kind: _,
78            job_reqs,
79        } = {
80            let current_exe = std::env::current_exe()
81                .context("failed to get path to current flowey executable")?;
82            let pipeline_static_db =
83                fs_err::File::open(current_exe.with_file_name("pipeline.json"))?;
84            serde_json::from_reader(pipeline_static_db)?
85        };
86
87        for [node_modpath, snippet_idx] in node_modpath_and_snippet_idx
88            .chunks_exact(2)
89            .map(|x| -> [String; 2] { x.to_vec().try_into().unwrap() })
90        {
91            let snippet_idx = snippet_idx.parse::<usize>().unwrap();
92
93            let raw_json_reqs: Vec<Box<[u8]>> = job_reqs
94                .get(&job_idx)
95                .context("invalid job_idx")?
96                .get(&node_modpath)
97                .context("pipeline db did not include data for specified node")?
98                .iter()
99                .map(|v| v.0.clone())
100                .collect::<Vec<_>>();
101
102            let Some(node_handle) = NodeHandle::try_from_modpath(&node_modpath) else {
103                anyhow::bail!("could not find node with that name")
104            };
105
106            let mut node = node_handle.new_erased_node();
107
108            // each snippet gets its own isolated working dir
109            {
110                let snippet_working_dir = working_dir.join(format!(
111                    "{}_{}",
112                    node_handle.modpath().replace("::", "__"),
113                    snippet_idx
114                ));
115                if !snippet_working_dir.exists() {
116                    fs_err::create_dir_all(&snippet_working_dir)?;
117                }
118                log::trace!(
119                    "Setting current working directory from {:?} to {:?}",
120                    std::env::current_dir()?,
121                    snippet_working_dir
122                );
123                std::env::set_current_dir(snippet_working_dir)?;
124            }
125
126            // not all backends support a persistent storage dir, therefore it is optional
127            let persistent_storage_dir_var = runtime_var_db
128                .try_get_var(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR)
129                .is_some()
130                .then_some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.to_owned());
131
132            let mut rust_runtime_services =
133                flowey_core::node::steps::rust::new_rust_runtime_services(
134                    &mut runtime_var_db,
135                    flow_backend.into(),
136                    flow_platform,
137                    flow_arch,
138                );
139
140            let mut ctx_backend = ExecSnippetCtx::new(
141                flow_backend.into(),
142                flow_platform,
143                flow_arch,
144                node_handle,
145                snippet_idx,
146                dry_run,
147                persistent_storage_dir_var,
148                &mut rust_runtime_services,
149            );
150
151            let mut ctx = flowey_core::node::new_node_ctx(&mut ctx_backend);
152            node.emit(raw_json_reqs.clone(), &mut ctx)?;
153
154            match ctx_backend.into_result() {
155                Some(res) => res?,
156                None => {
157                    if dry_run {
158                        // all good, expected
159                    } else {
160                        anyhow::bail!("snippet wasn't run (invalid index)")
161                    }
162                }
163            }
164        }
165
166        Ok(())
167    }
168}
169
170pub struct ExecSnippetCtx<'a, 'b> {
171    flow_backend: FlowBackend,
172    flow_platform: FlowPlatform,
173    flow_arch: FlowArch,
174    node_handle: NodeHandle,
175    rust_runtime_services: &'a mut RustRuntimeServices<'b>,
176    idx_tracker: usize,
177    var_tracker: usize,
178    target_idx: usize,
179    dry_run: bool,
180    persistent_storage_dir_var: Option<String>,
181    result: Option<anyhow::Result<()>>,
182}
183
184impl<'a, 'b> ExecSnippetCtx<'a, 'b> {
185    pub fn new(
186        flow_backend: FlowBackend,
187        flow_platform: FlowPlatform,
188        flow_arch: FlowArch,
189        node_handle: NodeHandle,
190        target_idx: usize,
191        dry_run: bool,
192        persistent_storage_dir_var: Option<String>,
193        rust_runtime_services: &'a mut RustRuntimeServices<'b>,
194    ) -> Self {
195        Self {
196            flow_backend,
197            flow_platform,
198            flow_arch,
199            node_handle,
200            rust_runtime_services,
201            var_tracker: 0,
202            idx_tracker: 0,
203            target_idx,
204            dry_run,
205            persistent_storage_dir_var,
206            result: None,
207        }
208    }
209
210    pub fn into_result(self) -> Option<anyhow::Result<()>> {
211        self.result
212    }
213}
214
215impl flowey_core::node::NodeCtxBackend for ExecSnippetCtx<'_, '_> {
216    fn on_request(&mut self, _node_handle: NodeHandle, _req: anyhow::Result<Box<[u8]>>) {
217        // nothing to do - filing requests only matters pre-exec
218    }
219
220    fn on_new_var(&mut self) -> String {
221        let v = self.var_tracker;
222        self.var_tracker += 1;
223        format!("{}:{}", self.node_handle.modpath(), v)
224    }
225
226    fn on_claimed_runtime_var(&mut self, _var: &str, _is_read: bool) {
227        // nothing to do - variable claims only matter pre-exec
228    }
229
230    fn on_emit_rust_step(
231        &mut self,
232        label: &str,
233        _can_merge: bool,
234        code: Box<
235            dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
236        >,
237    ) {
238        if self.idx_tracker == self.target_idx {
239            let label = if !label.is_empty() {
240                label
241            } else {
242                "<unnamed snippet>"
243            };
244
245            self.result = Some(run_code(
246                self.flow_backend,
247                format!("{} ({})", label, self.node_handle.modpath()),
248                self.dry_run,
249                || code(self.rust_runtime_services),
250            ));
251        }
252        self.idx_tracker += 1;
253    }
254
255    fn on_emit_ado_step(
256        &mut self,
257        label: &str,
258        _yaml_snippet: Box<
259            dyn for<'a> FnOnce(
260                &'a mut flowey_core::node::user_facing::AdoStepServices<'_>,
261            ) -> String,
262        >,
263        code: Option<
264            Box<
265                dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
266            >,
267        >,
268        _condvar: Option<String>,
269    ) {
270        // don't need to care about condvar, since we wouldn't have been called
271        // if the YAML resolved the condvar to false.
272        if self.idx_tracker == self.target_idx {
273            if let Some(code) = code {
274                self.result = Some(run_code(
275                    self.flow_backend,
276                    format!(
277                        "(inline snippet) {} ({})",
278                        label,
279                        self.node_handle.modpath()
280                    ),
281                    self.dry_run,
282                    || code(self.rust_runtime_services),
283                ));
284            }
285        }
286
287        self.idx_tracker += 1;
288    }
289
290    fn on_emit_gh_step(
291        &mut self,
292        _label: &str,
293        _uses: &str,
294        _with: BTreeMap<String, ClaimedGhParam>,
295        _condvar: Option<String>,
296        _outputs: BTreeMap<String, Vec<GhOutput>>,
297        _permissions: BTreeMap<GhPermission, GhPermissionValue>,
298        _gh_to_rust: Vec<GhToRust>,
299        _rust_to_gh: Vec<RustToGh>,
300    ) {
301        self.idx_tracker += 1;
302    }
303
304    fn on_emit_side_effect_step(&mut self) {
305        // not executable, we simply skip
306    }
307
308    fn backend(&mut self) -> FlowBackend {
309        self.flow_backend
310    }
311
312    fn platform(&mut self) -> FlowPlatform {
313        self.flow_platform
314    }
315
316    fn arch(&mut self) -> FlowArch {
317        self.flow_arch
318    }
319
320    fn current_node(&self) -> NodeHandle {
321        self.node_handle
322    }
323
324    fn persistent_dir_path_var(&mut self) -> Option<String> {
325        self.persistent_storage_dir_var.clone()
326    }
327
328    fn on_unused_read_var(&mut self, _var: &str) {
329        // not relevant at runtime
330    }
331}
332
333#[derive(Serialize, Deserialize)]
334pub(crate) enum VarDbBackendKind {
335    Json,
336}
337
338#[derive(Serialize, Deserialize)]
339pub(crate) struct FloweyPipelineStaticDb {
340    pub flow_backend: FlowBackendCli,
341    pub var_db_backend_kind: VarDbBackendKind,
342    pub job_reqs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
343}
344
345// encode requests as JSON stored in a JSON string (to make human inspection
346// easier).
347#[derive(Serialize, Deserialize)]
348#[serde(transparent)]
349pub(crate) struct SerializedRequest(#[serde(with = "serialized_request")] pub Box<[u8]>);
350
351pub(crate) mod serialized_request {
352    use serde::Deserialize;
353    use serde::Deserializer;
354    use serde::Serializer;
355
356    #[expect(clippy::borrowed_box, reason = "required by serde")]
357    pub fn serialize<S: Serializer>(v: &Box<[u8]>, ser: S) -> Result<S::Ok, S::Error> {
358        ser.serialize_str(
359            &serde_json::to_string(&serde_json::from_slice::<serde_json::Value>(v).unwrap())
360                .unwrap(),
361        )
362    }
363
364    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Box<[u8]>, D::Error> {
365        let s: String = Deserialize::deserialize(d)?;
366        Ok(
367            serde_json::to_vec(&serde_json::from_str::<serde_json::Value>(&s).unwrap())
368                .unwrap()
369                .into(),
370        )
371    }
372}
373
374fn run_code(
375    flow_backend: FlowBackend,
376    label: impl std::fmt::Display,
377    dry_run: bool,
378    code: impl FnOnce() -> anyhow::Result<()>,
379) -> anyhow::Result<()> {
380    if matches!(flow_backend, FlowBackend::Ado) {
381        println!("##[group]=== {} ===", label)
382    } else {
383        // green color
384        log::info!("\x1B[0;32m=== {} ===\x1B[0m", label);
385    }
386
387    let result = if dry_run {
388        log::info!("...but not actually, because of --dry-run");
389        Ok(())
390    } else {
391        code()
392    };
393
394    // green color
395    log::info!("\x1B[0;32m=== done! ===\x1B[0m");
396
397    if matches!(flow_backend, FlowBackend::Ado) {
398        println!("##[endgroup]")
399    } else {
400        log::info!(""); // log a newline, for the pretty
401    }
402
403    result
404}