flowey_cli/cli/
exec_snippet.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::FlowBackendCli;
5use anyhow::Context;
6use flowey_core::node::FlowArch;
7use flowey_core::node::FlowBackend;
8use flowey_core::node::FlowPlatform;
9use flowey_core::node::GhOutput;
10use flowey_core::node::GhToRust;
11use flowey_core::node::NodeHandle;
12use flowey_core::node::RustToGh;
13use flowey_core::node::steps::rust::RustRuntimeServices;
14use flowey_core::node::user_facing::ClaimedGhParam;
15use flowey_core::node::user_facing::GhPermission;
16use flowey_core::node::user_facing::GhPermissionValue;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use serde::Deserialize;
20use serde::Serialize;
21use std::collections::BTreeMap;
22use std::path::PathBuf;
23
/// Builds the command line that re-invokes flowey to execute one snippet.
///
/// The result has the shape `<flowey_bin> e <job_idx> <node_modpath> <snippet_idx>`,
/// with the pieces separated by single spaces. No quoting or escaping is
/// applied to any of the arguments.
pub fn construct_exec_snippet_cli(
    flowey_bin: &str,
    node_modpath: &str,
    snippet_idx: usize,
    job_idx: usize,
) -> String {
    let job = job_idx.to_string();
    let snippet = snippet_idx.to_string();

    // note the argument order: the job index comes *before* the node modpath
    [flowey_bin, "e", job.as_str(), node_modpath, snippet.as_str()].join(" ")
}
32
/// (internal) execute an inline code snippet from the given node.
#[derive(clap::Args)]
pub struct ExecSnippet {
    /// Job idx to query `pipeline_static_db` with
    pub(crate) job_idx: usize,

    // Flat list of positional `<node_modpath> <snippet_idx>` pairs. `run`
    // walks this list two entries at a time, parsing the second entry of each
    // pair as a `usize`.
    node_modpath_and_snippet_idx: Vec<String>,

    /// (debug) If true, the snippet will not actually be run
    #[clap(long)]
    dry_run: bool,
}
45
/// Name of the var db entry that holds the base working directory.
///
/// `ExecSnippet::run` requires this entry to be present, and expects its
/// value to be a JSON string (which it then uses as a path).
pub const VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR: &str = "_internal_WORKING_DIR";
/// Name of the (optional) var db entry for the persistent storage directory.
///
/// `ExecSnippet::run` only checks whether the entry exists: if it does, this
/// name is handed to `ExecSnippetCtx` as its persistent storage dir var.
pub const VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR: &str = "_internal_PERSISTENT_STORAGE_DIR";
48
49impl ExecSnippet {
50    pub fn run(self) -> anyhow::Result<()> {
51        let Self {
52            node_modpath_and_snippet_idx,
53            job_idx,
54            dry_run,
55        } = self;
56
57        let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
58        let flow_arch = FlowArch::host(PipelineBackendHint::Local);
59
60        let mut runtime_var_db = super::var_db::open_var_db(job_idx)?;
61
62        let working_dir: PathBuf = {
63            let Some((working_dir, _)) =
64                runtime_var_db.try_get_var(VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR)
65            else {
66                anyhow::bail!("var db was not seeded with {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR}");
67            };
68            serde_json::from_slice::<String>(&working_dir)
69                .context(format!(
70                    "found {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR} in db, but it wasn't a json string!"
71                ))?
72                .into()
73        };
74
75        let FloweyPipelineStaticDb {
76            flow_backend,
77            var_db_backend_kind: _,
78            job_reqs,
79        } = {
80            let current_exe = std::env::current_exe()
81                .context("failed to get path to current flowey executable")?;
82            let pipeline_static_db =
83                fs_err::File::open(current_exe.with_file_name("pipeline.json"))?;
84            serde_json::from_reader(pipeline_static_db)?
85        };
86
87        for [node_modpath, snippet_idx] in node_modpath_and_snippet_idx
88            .chunks_exact(2)
89            .map(|x| -> [String; 2] { x.to_vec().try_into().unwrap() })
90        {
91            let snippet_idx = snippet_idx.parse::<usize>().unwrap();
92
93            let raw_json_reqs: Vec<Box<[u8]>> = job_reqs
94                .get(&job_idx)
95                .context("invalid job_idx")?
96                .get(&node_modpath)
97                .context("pipeline db did not include data for specified node")?
98                .iter()
99                .map(|v| v.0.clone())
100                .collect::<Vec<_>>();
101
102            let Some(node_handle) = NodeHandle::try_from_modpath(&node_modpath) else {
103                anyhow::bail!("could not find node with that name")
104            };
105
106            let mut node = node_handle.new_erased_node();
107
108            // each snippet gets its own isolated working dir
109            {
110                let snippet_working_dir = working_dir.join(format!(
111                    "{}_{}",
112                    node_handle.modpath().replace("::", "__"),
113                    snippet_idx
114                ));
115                if !snippet_working_dir.exists() {
116                    fs_err::create_dir_all(&snippet_working_dir)?;
117                }
118                log::trace!(
119                    "Setting current working directory from {:?} to {:?}",
120                    std::env::current_dir()?,
121                    snippet_working_dir
122                );
123                std::env::set_current_dir(snippet_working_dir)?;
124            }
125
126            // not all backends support a persistent storage dir, therefore it is optional
127            let persistent_storage_dir_var = runtime_var_db
128                .try_get_var(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR)
129                .is_some()
130                .then_some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.to_owned());
131
132            let mut rust_runtime_services =
133                flowey_core::node::steps::rust::new_rust_runtime_services(
134                    &mut runtime_var_db,
135                    flow_backend.into(),
136                    flow_platform,
137                    flow_arch,
138                );
139
140            let mut ctx_backend = ExecSnippetCtx::new(
141                flow_backend.into(),
142                flow_platform,
143                flow_arch,
144                node_handle,
145                snippet_idx,
146                dry_run,
147                persistent_storage_dir_var,
148                &mut rust_runtime_services,
149            );
150
151            let mut ctx = flowey_core::node::new_node_ctx(&mut ctx_backend);
152            node.emit(raw_json_reqs.clone(), &mut ctx)?;
153
154            match ctx_backend.into_result() {
155                Some(res) => res?,
156                None => {
157                    if dry_run {
158                        // all good, expected
159                    } else {
160                        anyhow::bail!("snippet wasn't run (invalid index)")
161                    }
162                }
163            }
164        }
165
166        // Leave the last snippet's working dir so it can be deleted by later steps
167        std::env::set_current_dir(working_dir)?;
168
169        Ok(())
170    }
171}
172
/// `NodeCtxBackend` implementation used to execute exactly one step of a node.
///
/// As the node emits its steps, `idx_tracker` counts the rust / ADO / GitHub
/// steps seen so far. The rust code attached to the step whose index equals
/// `target_idx` is run, and its outcome is stored in `result`.
pub struct ExecSnippetCtx<'a, 'b> {
    /// Returned as-is from `backend()`, and used to pick the log format.
    flow_backend: FlowBackend,
    /// Returned as-is from `platform()`.
    flow_platform: FlowPlatform,
    /// Returned as-is from `arch()`.
    flow_arch: FlowArch,
    /// The node being emitted (used for labels and for naming new vars).
    node_handle: NodeHandle,
    /// Runtime services passed to the snippet's code when it is run.
    rust_runtime_services: &'a mut RustRuntimeServices<'b>,
    /// Index that the next emitted rust / ADO / GitHub step will have.
    idx_tracker: usize,
    /// Counter used to give each newly requested var a unique name.
    var_tracker: usize,
    /// Index of the step that should actually be executed.
    target_idx: usize,
    /// If set, the targeted step is announced in the log, but its code is not run.
    dry_run: bool,
    /// Returned (cloned) from `persistent_dir_path_var()`.
    persistent_storage_dir_var: Option<String>,
    /// `None` until the targeted step has been reached; afterwards, its outcome.
    result: Option<anyhow::Result<()>>,
}
186
187impl<'a, 'b> ExecSnippetCtx<'a, 'b> {
188    pub fn new(
189        flow_backend: FlowBackend,
190        flow_platform: FlowPlatform,
191        flow_arch: FlowArch,
192        node_handle: NodeHandle,
193        target_idx: usize,
194        dry_run: bool,
195        persistent_storage_dir_var: Option<String>,
196        rust_runtime_services: &'a mut RustRuntimeServices<'b>,
197    ) -> Self {
198        Self {
199            flow_backend,
200            flow_platform,
201            flow_arch,
202            node_handle,
203            rust_runtime_services,
204            var_tracker: 0,
205            idx_tracker: 0,
206            target_idx,
207            dry_run,
208            persistent_storage_dir_var,
209            result: None,
210        }
211    }
212
213    pub fn into_result(self) -> Option<anyhow::Result<()>> {
214        self.result
215    }
216}
217
218impl flowey_core::node::NodeCtxBackend for ExecSnippetCtx<'_, '_> {
219    fn on_request(&mut self, _node_handle: NodeHandle, _req: anyhow::Result<Box<[u8]>>) {
220        // nothing to do - filing requests only matters pre-exec
221    }
222
223    fn on_new_var(&mut self) -> String {
224        let v = self.var_tracker;
225        self.var_tracker += 1;
226        format!("{}:{}", self.node_handle.modpath(), v)
227    }
228
229    fn on_claimed_runtime_var(&mut self, _var: &str, _is_read: bool) {
230        // nothing to do - variable claims only matter pre-exec
231    }
232
233    fn on_emit_rust_step(
234        &mut self,
235        label: &str,
236        _can_merge: bool,
237        code: Box<
238            dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
239        >,
240    ) {
241        if self.idx_tracker == self.target_idx {
242            let label = if !label.is_empty() {
243                label
244            } else {
245                "<unnamed snippet>"
246            };
247
248            self.result = Some(run_code(
249                self.flow_backend,
250                format!("{} ({})", label, self.node_handle.modpath()),
251                self.dry_run,
252                || code(self.rust_runtime_services),
253            ));
254        }
255        self.idx_tracker += 1;
256    }
257
258    fn on_emit_ado_step(
259        &mut self,
260        label: &str,
261        _yaml_snippet: Box<
262            dyn for<'a> FnOnce(
263                &'a mut flowey_core::node::user_facing::AdoStepServices<'_>,
264            ) -> String,
265        >,
266        code: Option<
267            Box<
268                dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
269            >,
270        >,
271        _condvar: Option<String>,
272    ) {
273        // don't need to care about condvar, since we wouldn't have been called
274        // if the YAML resolved the condvar to false.
275        if self.idx_tracker == self.target_idx {
276            if let Some(code) = code {
277                self.result = Some(run_code(
278                    self.flow_backend,
279                    format!(
280                        "(inline snippet) {} ({})",
281                        label,
282                        self.node_handle.modpath()
283                    ),
284                    self.dry_run,
285                    || code(self.rust_runtime_services),
286                ));
287            }
288        }
289
290        self.idx_tracker += 1;
291    }
292
293    fn on_emit_gh_step(
294        &mut self,
295        _label: &str,
296        _uses: &str,
297        _with: BTreeMap<String, ClaimedGhParam>,
298        _condvar: Option<String>,
299        _outputs: BTreeMap<String, Vec<GhOutput>>,
300        _permissions: BTreeMap<GhPermission, GhPermissionValue>,
301        _gh_to_rust: Vec<GhToRust>,
302        _rust_to_gh: Vec<RustToGh>,
303    ) {
304        self.idx_tracker += 1;
305    }
306
307    fn on_emit_side_effect_step(&mut self) {
308        // not executable, we simply skip
309    }
310
311    fn backend(&mut self) -> FlowBackend {
312        self.flow_backend
313    }
314
315    fn platform(&mut self) -> FlowPlatform {
316        self.flow_platform
317    }
318
319    fn arch(&mut self) -> FlowArch {
320        self.flow_arch
321    }
322
323    fn current_node(&self) -> NodeHandle {
324        self.node_handle
325    }
326
327    fn persistent_dir_path_var(&mut self) -> Option<String> {
328        self.persistent_storage_dir_var.clone()
329    }
330
331    fn on_unused_read_var(&mut self, _var: &str) {
332        // not relevant at runtime
333    }
334}
335
/// Value stored in [`FloweyPipelineStaticDb::var_db_backend_kind`].
///
/// `Json` is currently the only variant.
// NOTE(review): presumably this selects the storage format used by the var
// db - confirm against the var db implementation.
#[derive(Serialize, Deserialize)]
pub(crate) enum VarDbBackendKind {
    Json,
}
340
/// Static, per-pipeline data that `ExecSnippet::run` deserializes from the
/// `pipeline.json` file sitting next to the current executable.
#[derive(Serialize, Deserialize)]
pub(crate) struct FloweyPipelineStaticDb {
    /// Backend the pipeline runs on (converted into a `FlowBackend` before use).
    pub flow_backend: FlowBackendCli,
    /// Not consulted by `ExecSnippet::run`.
    pub var_db_backend_kind: VarDbBackendKind,
    /// Requests to replay when re-emitting a node: job idx -> node modpath ->
    /// serialized requests.
    pub job_reqs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
}
347
// Requests are held in memory as raw JSON bytes, but are (de)serialized as a
// JSON string which itself contains JSON (to make human inspection easier).
// The conversion is done by the functions in the `serialized_request` module.
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub(crate) struct SerializedRequest(#[serde(with = "serialized_request")] pub Box<[u8]>);
353
354pub(crate) mod serialized_request {
355    use serde::Deserialize;
356    use serde::Deserializer;
357    use serde::Serializer;
358
359    #[expect(clippy::borrowed_box, reason = "required by serde")]
360    pub fn serialize<S: Serializer>(v: &Box<[u8]>, ser: S) -> Result<S::Ok, S::Error> {
361        ser.serialize_str(
362            &serde_json::to_string(&serde_json::from_slice::<serde_json::Value>(v).unwrap())
363                .unwrap(),
364        )
365    }
366
367    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Box<[u8]>, D::Error> {
368        let s: String = Deserialize::deserialize(d)?;
369        Ok(
370            serde_json::to_vec(&serde_json::from_str::<serde_json::Value>(&s).unwrap())
371                .unwrap()
372                .into(),
373        )
374    }
375}
376
377fn run_code(
378    flow_backend: FlowBackend,
379    label: impl std::fmt::Display,
380    dry_run: bool,
381    code: impl FnOnce() -> anyhow::Result<()>,
382) -> anyhow::Result<()> {
383    if matches!(flow_backend, FlowBackend::Ado) {
384        println!("##[group]=== {} ===", label)
385    } else {
386        // green color
387        log::info!("\x1B[0;32m=== {} ===\x1B[0m", label);
388    }
389
390    let result = if dry_run {
391        log::info!("...but not actually, because of --dry-run");
392        Ok(())
393    } else {
394        code()
395    };
396
397    // green color
398    log::info!("\x1B[0;32m=== done! ===\x1B[0m");
399
400    if matches!(flow_backend, FlowBackend::Ado) {
401        println!("##[endgroup]")
402    } else {
403        log::info!(""); // log a newline, for the pretty
404    }
405
406    result
407}