Skip to main content

flowey_cli/cli/
exec_snippet.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::FlowBackendCli;
5use anyhow::Context;
6use flowey_core::node::FlowArch;
7use flowey_core::node::FlowBackend;
8use flowey_core::node::FlowPlatform;
9use flowey_core::node::GhOutput;
10use flowey_core::node::GhToRust;
11use flowey_core::node::NodeHandle;
12use flowey_core::node::RustToGh;
13use flowey_core::node::steps::rust::RustRuntimeServices;
14use flowey_core::node::user_facing::ClaimedGhParam;
15use flowey_core::node::user_facing::GhPermission;
16use flowey_core::node::user_facing::GhPermissionValue;
17use serde::Deserialize;
18use serde::Serialize;
19use std::collections::BTreeMap;
20use std::path::PathBuf;
21
/// Builds the command line that re-invokes flowey to execute one snippet.
///
/// The result has the shape `<flowey_bin> e <job_idx> <node_modpath> <snippet_idx>`,
/// with the pieces separated by single spaces and no quoting applied.
pub fn construct_exec_snippet_cli(
    flowey_bin: &str,
    node_modpath: &str,
    snippet_idx: usize,
    job_idx: usize,
) -> String {
    let job = job_idx.to_string();
    let snippet = snippet_idx.to_string();
    [flowey_bin, "e", job.as_str(), node_modpath, snippet.as_str()].join(" ")
}
30
/// (internal) execute an inline code snippet from the given node.
#[derive(clap::Args)]
pub struct ExecSnippet {
    /// Job idx to query `pipeline_static_db` with
    pub(crate) job_idx: usize,

    // Flat list of alternating `<node_modpath> <snippet_idx>` values; `run`
    // walks it two entries at a time and executes one snippet per pair.
    // (Kept as a plain comment: clap's derive uses `///` comments as help
    // text, so a doc comment here would alter the CLI output.)
    node_modpath_and_snippet_idx: Vec<String>,

    /// (debug) If true, the snippet will not actually be run
    #[clap(long)]
    dry_run: bool,
}
43
/// Var db key holding flowey's working directory.
///
/// `ExecSnippet::run` requires this entry to be present and to contain a JSON
/// string; each snippet's own working directory is created beneath it.
pub const VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR: &str = "_internal_WORKING_DIR";
/// Var db key for the persistent storage directory.
///
/// Optional: `ExecSnippet::run` only checks whether the entry exists, since
/// not all backends support a persistent storage dir.
pub const VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR: &str = "_internal_PERSISTENT_STORAGE_DIR";
46
47impl ExecSnippet {
48    pub fn run(self) -> anyhow::Result<()> {
49        let Self {
50            node_modpath_and_snippet_idx,
51            job_idx,
52            dry_run,
53        } = self;
54
55        let mut runtime_var_db = super::var_db::open_var_db(job_idx)?;
56
57        let working_dir: PathBuf = {
58            let Some((working_dir, _)) =
59                runtime_var_db.try_get_var(VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR)
60            else {
61                anyhow::bail!("var db was not seeded with {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR}");
62            };
63            serde_json::from_slice::<String>(&working_dir)
64                .context(format!(
65                    "found {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR} in db, but it wasn't a json string!"
66                ))?
67                .into()
68        };
69
70        let FloweyPipelineStaticDb {
71            flow_backend,
72            var_db_backend_kind: _,
73            job_reqs,
74            job_configs,
75            job_command_wrappers,
76            job_platforms,
77            job_archs,
78        } = {
79            let current_exe = std::env::current_exe()
80                .context("failed to get path to current flowey executable")?;
81            let pipeline_static_db =
82                fs_err::File::open(current_exe.with_file_name("pipeline.json"))?;
83            serde_json::from_reader(pipeline_static_db)?
84        };
85
86        let flow_platform = *job_platforms
87            .get(&job_idx)
88            .context("invalid job_idx: missing platform")?;
89        let flow_arch = *job_archs
90            .get(&job_idx)
91            .context("invalid job_idx: missing arch")?;
92
93        let command_wrapper = job_command_wrappers.get(&job_idx).cloned();
94
95        for [node_modpath, snippet_idx] in node_modpath_and_snippet_idx
96            .chunks_exact(2)
97            .map(|x| -> [String; 2] { x.to_vec().try_into().unwrap() })
98        {
99            let snippet_idx = snippet_idx.parse::<usize>().unwrap();
100
101            let raw_json_reqs: Vec<Box<[u8]>> = job_reqs
102                .get(&job_idx)
103                .context("invalid job_idx")?
104                .get(&node_modpath)
105                .context("pipeline db did not include data for specified node")?
106                .iter()
107                .map(|v| v.0.clone())
108                .collect::<Vec<_>>();
109
110            let raw_config_bytes: Vec<Box<[u8]>> = job_configs
111                .get(&job_idx)
112                .and_then(|m| m.get(&node_modpath))
113                .map(|v| v.iter().map(|v| v.0.clone()).collect())
114                .unwrap_or_default();
115
116            let Some(node_handle) = NodeHandle::try_from_modpath(&node_modpath) else {
117                anyhow::bail!("could not find node with that name")
118            };
119
120            let mut node = node_handle.new_erased_node();
121
122            // each snippet gets its own isolated working dir
123            {
124                let snippet_working_dir = working_dir.join(format!(
125                    "{}_{}",
126                    node_handle.modpath().replace("::", "__"),
127                    snippet_idx
128                ));
129                if !snippet_working_dir.exists() {
130                    fs_err::create_dir_all(&snippet_working_dir)?;
131                }
132                log::trace!(
133                    "Setting current working directory from {:?} to {:?}",
134                    std::env::current_dir()?,
135                    snippet_working_dir
136                );
137                std::env::set_current_dir(snippet_working_dir)?;
138            }
139
140            // not all backends support a persistent storage dir, therefore it is optional
141            let persistent_storage_dir_var = runtime_var_db
142                .try_get_var(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR)
143                .is_some()
144                .then_some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.to_owned());
145
146            let mut rust_runtime_services =
147                flowey_core::node::steps::rust::new_rust_runtime_services(
148                    &mut runtime_var_db,
149                    flow_backend.into(),
150                    flow_platform,
151                    flow_arch,
152                )?;
153
154            if let Some(ref wrapper) = command_wrapper {
155                rust_runtime_services.sh.set_wrapper(Some(wrapper.clone()));
156            }
157
158            let mut ctx_backend = ExecSnippetCtx::new(
159                flow_backend.into(),
160                flow_platform,
161                flow_arch,
162                node_handle,
163                snippet_idx,
164                dry_run,
165                persistent_storage_dir_var,
166                &mut rust_runtime_services,
167            );
168
169            let mut ctx = flowey_core::node::new_node_ctx(&mut ctx_backend);
170            node.emit(raw_config_bytes, raw_json_reqs.clone(), &mut ctx)?;
171
172            match ctx_backend.into_result() {
173                Some(res) => res?,
174                None => {
175                    if dry_run {
176                        // all good, expected
177                    } else {
178                        anyhow::bail!("snippet wasn't run (invalid index)")
179                    }
180                }
181            }
182        }
183
184        // Leave the last snippet's working dir so it can be deleted by later steps
185        std::env::set_current_dir(working_dir)?;
186
187        Ok(())
188    }
189}
190
/// `NodeCtxBackend` implementation used to execute a single snippet.
///
/// While the node's `emit` is replayed, every emitted rust / ADO / GitHub step
/// advances `idx_tracker`; only the step whose index equals `target_idx` has
/// its code run, and the outcome is stored in `result`.
pub struct ExecSnippetCtx<'a, 'b> {
    // backend / platform / arch reported back to the node via the trait getters
    flow_backend: FlowBackend,
    flow_platform: FlowPlatform,
    flow_arch: FlowArch,
    // node whose snippet is being executed; its modpath is used for var names and labels
    node_handle: NodeHandle,
    // handed to the snippet's code when it is run
    rust_runtime_services: &'a mut RustRuntimeServices<'b>,
    // index of the next emitted step (incremented by rust, ADO and GitHub steps)
    idx_tracker: usize,
    // counter used to generate unique `<modpath>:<n>` variable names
    var_tracker: usize,
    // index of the step that should actually be executed
    target_idx: usize,
    // when set, the targeted step is announced but its code is not run
    dry_run: bool,
    // name of the persistent storage dir var, if one is available
    persistent_storage_dir_var: Option<String>,
    // outcome of the targeted step; stays `None` if no step with code matched `target_idx`
    result: Option<anyhow::Result<()>>,
}
204
205impl<'a, 'b> ExecSnippetCtx<'a, 'b> {
206    pub fn new(
207        flow_backend: FlowBackend,
208        flow_platform: FlowPlatform,
209        flow_arch: FlowArch,
210        node_handle: NodeHandle,
211        target_idx: usize,
212        dry_run: bool,
213        persistent_storage_dir_var: Option<String>,
214        rust_runtime_services: &'a mut RustRuntimeServices<'b>,
215    ) -> Self {
216        Self {
217            flow_backend,
218            flow_platform,
219            flow_arch,
220            node_handle,
221            rust_runtime_services,
222            var_tracker: 0,
223            idx_tracker: 0,
224            target_idx,
225            dry_run,
226            persistent_storage_dir_var,
227            result: None,
228        }
229    }
230
231    pub fn into_result(self) -> Option<anyhow::Result<()>> {
232        self.result
233    }
234}
235
236impl flowey_core::node::NodeCtxBackend for ExecSnippetCtx<'_, '_> {
237    fn on_request(&mut self, _node_handle: NodeHandle, _req: anyhow::Result<Box<[u8]>>) {
238        // nothing to do - filing requests only matters pre-exec
239    }
240
241    fn on_config(&mut self, _node_handle: NodeHandle, _config: anyhow::Result<Box<[u8]>>) {
242        // nothing to do - config is already merged pre-exec
243    }
244
245    fn on_new_var(&mut self) -> String {
246        let v = self.var_tracker;
247        self.var_tracker += 1;
248        format!("{}:{}", self.node_handle.modpath(), v)
249    }
250
251    fn on_claimed_runtime_var(&mut self, _var: &str, _is_read: bool) {
252        // nothing to do - variable claims only matter pre-exec
253    }
254
255    fn on_emit_rust_step(
256        &mut self,
257        label: &str,
258        _can_merge: bool,
259        code: Box<
260            dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
261        >,
262    ) {
263        if self.idx_tracker == self.target_idx {
264            let label = if !label.is_empty() {
265                label
266            } else {
267                "<unnamed snippet>"
268            };
269
270            self.result = Some(run_code(
271                self.flow_backend,
272                format!("{} ({})", label, self.node_handle.modpath()),
273                self.dry_run,
274                || code(self.rust_runtime_services),
275            ));
276        }
277        self.idx_tracker += 1;
278    }
279
280    fn on_emit_ado_step(
281        &mut self,
282        label: &str,
283        _yaml_snippet: Box<
284            dyn for<'a> FnOnce(
285                &'a mut flowey_core::node::user_facing::AdoStepServices<'_>,
286            ) -> String,
287        >,
288        code: Option<
289            Box<
290                dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
291            >,
292        >,
293        _condvar: Option<String>,
294    ) {
295        // don't need to care about condvar, since we wouldn't have been called
296        // if the YAML resolved the condvar to false.
297        if self.idx_tracker == self.target_idx {
298            if let Some(code) = code {
299                self.result = Some(run_code(
300                    self.flow_backend,
301                    format!(
302                        "(inline snippet) {} ({})",
303                        label,
304                        self.node_handle.modpath()
305                    ),
306                    self.dry_run,
307                    || code(self.rust_runtime_services),
308                ));
309            }
310        }
311
312        self.idx_tracker += 1;
313    }
314
315    fn on_emit_gh_step(
316        &mut self,
317        _label: &str,
318        _uses: &str,
319        _with: BTreeMap<String, ClaimedGhParam>,
320        _condvar: Option<String>,
321        _outputs: BTreeMap<String, Vec<GhOutput>>,
322        _permissions: BTreeMap<GhPermission, GhPermissionValue>,
323        _gh_to_rust: Vec<GhToRust>,
324        _rust_to_gh: Vec<RustToGh>,
325    ) {
326        self.idx_tracker += 1;
327    }
328
329    fn on_emit_side_effect_step(&mut self) {
330        // not executable, we simply skip
331    }
332
333    fn backend(&mut self) -> FlowBackend {
334        self.flow_backend
335    }
336
337    fn platform(&mut self) -> FlowPlatform {
338        self.flow_platform
339    }
340
341    fn arch(&mut self) -> FlowArch {
342        self.flow_arch
343    }
344
345    fn current_node(&self) -> NodeHandle {
346        self.node_handle
347    }
348
349    fn persistent_dir_path_var(&mut self) -> Option<String> {
350        self.persistent_storage_dir_var.clone()
351    }
352
353    fn on_unused_read_var(&mut self, _var: &str) {
354        // not relevant at runtime
355    }
356}
357
/// Kind of var db backend recorded in the pipeline static db.
#[derive(Serialize, Deserialize)]
pub(crate) enum VarDbBackendKind {
    /// The only kind currently defined.
    Json,
}
362
/// Static per-pipeline data, (de)serialized as JSON.
///
/// `ExecSnippet::run` reads it from `pipeline.json`, located next to the
/// current flowey executable. All maps are keyed by job idx.
#[derive(Serialize, Deserialize)]
pub(crate) struct FloweyPipelineStaticDb {
    /// Backend the pipeline runs on.
    pub flow_backend: FlowBackendCli,
    /// Kind of var db backend in use.
    pub var_db_backend_kind: VarDbBackendKind,
    /// job idx -> node modpath -> serialized requests for that node.
    pub job_reqs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
    /// job idx -> node modpath -> serialized config for that node.
    /// Defaults to an empty map when absent from the serialized data.
    #[serde(default)]
    pub job_configs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
    /// job idx -> command wrapper; jobs without an entry get no wrapper.
    pub job_command_wrappers: BTreeMap<usize, flowey_core::shell::CommandWrapperKind>,
    /// job idx -> platform the job runs on.
    pub job_platforms: BTreeMap<usize, FlowPlatform>,
    /// job idx -> architecture the job runs on.
    pub job_archs: BTreeMap<usize, FlowArch>,
}
374
/// Raw request bytes (JSON), serialized as a JSON string that itself contains
/// the request's JSON text (to make human inspection easier).
///
/// The conversion in both directions is delegated to the `serialized_request`
/// module via `#[serde(with = ...)]`.
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub(crate) struct SerializedRequest(#[serde(with = "serialized_request")] pub Box<[u8]>);
380
381pub(crate) mod serialized_request {
382    use serde::Deserialize;
383    use serde::Deserializer;
384    use serde::Serializer;
385
386    #[expect(clippy::borrowed_box, reason = "required by serde")]
387    pub fn serialize<S: Serializer>(v: &Box<[u8]>, ser: S) -> Result<S::Ok, S::Error> {
388        ser.serialize_str(
389            &serde_json::to_string(&serde_json::from_slice::<serde_json::Value>(v).unwrap())
390                .unwrap(),
391        )
392    }
393
394    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Box<[u8]>, D::Error> {
395        let s: String = Deserialize::deserialize(d)?;
396        Ok(
397            serde_json::to_vec(&serde_json::from_str::<serde_json::Value>(&s).unwrap())
398                .unwrap()
399                .into(),
400        )
401    }
402}
403
404fn run_code(
405    flow_backend: FlowBackend,
406    label: impl std::fmt::Display,
407    dry_run: bool,
408    code: impl FnOnce() -> anyhow::Result<()>,
409) -> anyhow::Result<()> {
410    if matches!(flow_backend, FlowBackend::Ado) {
411        println!("##[group]=== {} ===", label)
412    } else {
413        // green color
414        log::info!("\x1B[0;32m=== {} ===\x1B[0m", label);
415    }
416
417    let result = if dry_run {
418        log::info!("...but not actually, because of --dry-run");
419        Ok(())
420    } else {
421        code()
422    };
423
424    // green color
425    log::info!("\x1B[0;32m=== done! ===\x1B[0m");
426
427    if matches!(flow_backend, FlowBackend::Ado) {
428        println!("##[endgroup]")
429    } else {
430        log::info!(""); // log a newline, for the pretty
431    }
432
433    result
434}