flowey_cli/cli/
exec_snippet.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::FlowBackendCli;
5use anyhow::Context;
6use flowey_core::node::FlowArch;
7use flowey_core::node::FlowBackend;
8use flowey_core::node::FlowPlatform;
9use flowey_core::node::GhOutput;
10use flowey_core::node::GhToRust;
11use flowey_core::node::NodeHandle;
12use flowey_core::node::RustToGh;
13use flowey_core::node::steps::rust::RustRuntimeServices;
14use flowey_core::node::user_facing::ClaimedGhParam;
15use flowey_core::node::user_facing::GhPermission;
16use flowey_core::node::user_facing::GhPermissionValue;
17use serde::Deserialize;
18use serde::Serialize;
19use std::collections::BTreeMap;
20use std::path::PathBuf;
21
22pub fn construct_exec_snippet_cli(
23    flowey_bin: &str,
24    node_modpath: &str,
25    snippet_idx: usize,
26    job_idx: usize,
27) -> String {
28    format!(r#"{flowey_bin} e {job_idx} {node_modpath} {snippet_idx}"#)
29}
30
31/// (internal) execute an inline code snippet from the given node.
32#[derive(clap::Args)]
33pub struct ExecSnippet {
34    /// Job idx to query `pipeline_static_db` with
35    pub(crate) job_idx: usize,
36
37    node_modpath_and_snippet_idx: Vec<String>,
38
39    /// (debug) If true, the snippet will not actually be run
40    #[clap(long)]
41    dry_run: bool,
42}
43
44pub const VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR: &str = "_internal_WORKING_DIR";
45pub const VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR: &str = "_internal_PERSISTENT_STORAGE_DIR";
46
47impl ExecSnippet {
48    pub fn run(self) -> anyhow::Result<()> {
49        let Self {
50            node_modpath_and_snippet_idx,
51            job_idx,
52            dry_run,
53        } = self;
54
55        let mut runtime_var_db = super::var_db::open_var_db(job_idx)?;
56
57        let working_dir: PathBuf = {
58            let Some((working_dir, _)) =
59                runtime_var_db.try_get_var(VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR)
60            else {
61                anyhow::bail!("var db was not seeded with {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR}");
62            };
63            serde_json::from_slice::<String>(&working_dir)
64                .context(format!(
65                    "found {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR} in db, but it wasn't a json string!"
66                ))?
67                .into()
68        };
69
70        let FloweyPipelineStaticDb {
71            flow_backend,
72            var_db_backend_kind: _,
73            job_reqs,
74            job_command_wrappers,
75            job_platforms,
76            job_archs,
77        } = {
78            let current_exe = std::env::current_exe()
79                .context("failed to get path to current flowey executable")?;
80            let pipeline_static_db =
81                fs_err::File::open(current_exe.with_file_name("pipeline.json"))?;
82            serde_json::from_reader(pipeline_static_db)?
83        };
84
85        let flow_platform = *job_platforms
86            .get(&job_idx)
87            .context("invalid job_idx: missing platform")?;
88        let flow_arch = *job_archs
89            .get(&job_idx)
90            .context("invalid job_idx: missing arch")?;
91
92        let command_wrapper = job_command_wrappers.get(&job_idx).cloned();
93
94        for [node_modpath, snippet_idx] in node_modpath_and_snippet_idx
95            .chunks_exact(2)
96            .map(|x| -> [String; 2] { x.to_vec().try_into().unwrap() })
97        {
98            let snippet_idx = snippet_idx.parse::<usize>().unwrap();
99
100            let raw_json_reqs: Vec<Box<[u8]>> = job_reqs
101                .get(&job_idx)
102                .context("invalid job_idx")?
103                .get(&node_modpath)
104                .context("pipeline db did not include data for specified node")?
105                .iter()
106                .map(|v| v.0.clone())
107                .collect::<Vec<_>>();
108
109            let Some(node_handle) = NodeHandle::try_from_modpath(&node_modpath) else {
110                anyhow::bail!("could not find node with that name")
111            };
112
113            let mut node = node_handle.new_erased_node();
114
115            // each snippet gets its own isolated working dir
116            {
117                let snippet_working_dir = working_dir.join(format!(
118                    "{}_{}",
119                    node_handle.modpath().replace("::", "__"),
120                    snippet_idx
121                ));
122                if !snippet_working_dir.exists() {
123                    fs_err::create_dir_all(&snippet_working_dir)?;
124                }
125                log::trace!(
126                    "Setting current working directory from {:?} to {:?}",
127                    std::env::current_dir()?,
128                    snippet_working_dir
129                );
130                std::env::set_current_dir(snippet_working_dir)?;
131            }
132
133            // not all backends support a persistent storage dir, therefore it is optional
134            let persistent_storage_dir_var = runtime_var_db
135                .try_get_var(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR)
136                .is_some()
137                .then_some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.to_owned());
138
139            let mut rust_runtime_services =
140                flowey_core::node::steps::rust::new_rust_runtime_services(
141                    &mut runtime_var_db,
142                    flow_backend.into(),
143                    flow_platform,
144                    flow_arch,
145                )?;
146
147            if let Some(ref wrapper) = command_wrapper {
148                rust_runtime_services.sh.set_wrapper(Some(wrapper.clone()));
149            }
150
151            let mut ctx_backend = ExecSnippetCtx::new(
152                flow_backend.into(),
153                flow_platform,
154                flow_arch,
155                node_handle,
156                snippet_idx,
157                dry_run,
158                persistent_storage_dir_var,
159                &mut rust_runtime_services,
160            );
161
162            let mut ctx = flowey_core::node::new_node_ctx(&mut ctx_backend);
163            node.emit(raw_json_reqs.clone(), &mut ctx)?;
164
165            match ctx_backend.into_result() {
166                Some(res) => res?,
167                None => {
168                    if dry_run {
169                        // all good, expected
170                    } else {
171                        anyhow::bail!("snippet wasn't run (invalid index)")
172                    }
173                }
174            }
175        }
176
177        // Leave the last snippet's working dir so it can be deleted by later steps
178        std::env::set_current_dir(working_dir)?;
179
180        Ok(())
181    }
182}
183
184pub struct ExecSnippetCtx<'a, 'b> {
185    flow_backend: FlowBackend,
186    flow_platform: FlowPlatform,
187    flow_arch: FlowArch,
188    node_handle: NodeHandle,
189    rust_runtime_services: &'a mut RustRuntimeServices<'b>,
190    idx_tracker: usize,
191    var_tracker: usize,
192    target_idx: usize,
193    dry_run: bool,
194    persistent_storage_dir_var: Option<String>,
195    result: Option<anyhow::Result<()>>,
196}
197
198impl<'a, 'b> ExecSnippetCtx<'a, 'b> {
199    pub fn new(
200        flow_backend: FlowBackend,
201        flow_platform: FlowPlatform,
202        flow_arch: FlowArch,
203        node_handle: NodeHandle,
204        target_idx: usize,
205        dry_run: bool,
206        persistent_storage_dir_var: Option<String>,
207        rust_runtime_services: &'a mut RustRuntimeServices<'b>,
208    ) -> Self {
209        Self {
210            flow_backend,
211            flow_platform,
212            flow_arch,
213            node_handle,
214            rust_runtime_services,
215            var_tracker: 0,
216            idx_tracker: 0,
217            target_idx,
218            dry_run,
219            persistent_storage_dir_var,
220            result: None,
221        }
222    }
223
224    pub fn into_result(self) -> Option<anyhow::Result<()>> {
225        self.result
226    }
227}
228
229impl flowey_core::node::NodeCtxBackend for ExecSnippetCtx<'_, '_> {
230    fn on_request(&mut self, _node_handle: NodeHandle, _req: anyhow::Result<Box<[u8]>>) {
231        // nothing to do - filing requests only matters pre-exec
232    }
233
234    fn on_new_var(&mut self) -> String {
235        let v = self.var_tracker;
236        self.var_tracker += 1;
237        format!("{}:{}", self.node_handle.modpath(), v)
238    }
239
240    fn on_claimed_runtime_var(&mut self, _var: &str, _is_read: bool) {
241        // nothing to do - variable claims only matter pre-exec
242    }
243
244    fn on_emit_rust_step(
245        &mut self,
246        label: &str,
247        _can_merge: bool,
248        code: Box<
249            dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
250        >,
251    ) {
252        if self.idx_tracker == self.target_idx {
253            let label = if !label.is_empty() {
254                label
255            } else {
256                "<unnamed snippet>"
257            };
258
259            self.result = Some(run_code(
260                self.flow_backend,
261                format!("{} ({})", label, self.node_handle.modpath()),
262                self.dry_run,
263                || code(self.rust_runtime_services),
264            ));
265        }
266        self.idx_tracker += 1;
267    }
268
269    fn on_emit_ado_step(
270        &mut self,
271        label: &str,
272        _yaml_snippet: Box<
273            dyn for<'a> FnOnce(
274                &'a mut flowey_core::node::user_facing::AdoStepServices<'_>,
275            ) -> String,
276        >,
277        code: Option<
278            Box<
279                dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
280            >,
281        >,
282        _condvar: Option<String>,
283    ) {
284        // don't need to care about condvar, since we wouldn't have been called
285        // if the YAML resolved the condvar to false.
286        if self.idx_tracker == self.target_idx {
287            if let Some(code) = code {
288                self.result = Some(run_code(
289                    self.flow_backend,
290                    format!(
291                        "(inline snippet) {} ({})",
292                        label,
293                        self.node_handle.modpath()
294                    ),
295                    self.dry_run,
296                    || code(self.rust_runtime_services),
297                ));
298            }
299        }
300
301        self.idx_tracker += 1;
302    }
303
304    fn on_emit_gh_step(
305        &mut self,
306        _label: &str,
307        _uses: &str,
308        _with: BTreeMap<String, ClaimedGhParam>,
309        _condvar: Option<String>,
310        _outputs: BTreeMap<String, Vec<GhOutput>>,
311        _permissions: BTreeMap<GhPermission, GhPermissionValue>,
312        _gh_to_rust: Vec<GhToRust>,
313        _rust_to_gh: Vec<RustToGh>,
314    ) {
315        self.idx_tracker += 1;
316    }
317
318    fn on_emit_side_effect_step(&mut self) {
319        // not executable, we simply skip
320    }
321
322    fn backend(&mut self) -> FlowBackend {
323        self.flow_backend
324    }
325
326    fn platform(&mut self) -> FlowPlatform {
327        self.flow_platform
328    }
329
330    fn arch(&mut self) -> FlowArch {
331        self.flow_arch
332    }
333
334    fn current_node(&self) -> NodeHandle {
335        self.node_handle
336    }
337
338    fn persistent_dir_path_var(&mut self) -> Option<String> {
339        self.persistent_storage_dir_var.clone()
340    }
341
342    fn on_unused_read_var(&mut self, _var: &str) {
343        // not relevant at runtime
344    }
345}
346
347#[derive(Serialize, Deserialize)]
348pub(crate) enum VarDbBackendKind {
349    Json,
350}
351
352#[derive(Serialize, Deserialize)]
353pub(crate) struct FloweyPipelineStaticDb {
354    pub flow_backend: FlowBackendCli,
355    pub var_db_backend_kind: VarDbBackendKind,
356    pub job_reqs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
357    pub job_command_wrappers: BTreeMap<usize, flowey_core::shell::CommandWrapperKind>,
358    pub job_platforms: BTreeMap<usize, FlowPlatform>,
359    pub job_archs: BTreeMap<usize, FlowArch>,
360}
361
362// encode requests as JSON stored in a JSON string (to make human inspection
363// easier).
364#[derive(Serialize, Deserialize)]
365#[serde(transparent)]
366pub(crate) struct SerializedRequest(#[serde(with = "serialized_request")] pub Box<[u8]>);
367
368pub(crate) mod serialized_request {
369    use serde::Deserialize;
370    use serde::Deserializer;
371    use serde::Serializer;
372
373    #[expect(clippy::borrowed_box, reason = "required by serde")]
374    pub fn serialize<S: Serializer>(v: &Box<[u8]>, ser: S) -> Result<S::Ok, S::Error> {
375        ser.serialize_str(
376            &serde_json::to_string(&serde_json::from_slice::<serde_json::Value>(v).unwrap())
377                .unwrap(),
378        )
379    }
380
381    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Box<[u8]>, D::Error> {
382        let s: String = Deserialize::deserialize(d)?;
383        Ok(
384            serde_json::to_vec(&serde_json::from_str::<serde_json::Value>(&s).unwrap())
385                .unwrap()
386                .into(),
387        )
388    }
389}
390
391fn run_code(
392    flow_backend: FlowBackend,
393    label: impl std::fmt::Display,
394    dry_run: bool,
395    code: impl FnOnce() -> anyhow::Result<()>,
396) -> anyhow::Result<()> {
397    if matches!(flow_backend, FlowBackend::Ado) {
398        println!("##[group]=== {} ===", label)
399    } else {
400        // green color
401        log::info!("\x1B[0;32m=== {} ===\x1B[0m", label);
402    }
403
404    let result = if dry_run {
405        log::info!("...but not actually, because of --dry-run");
406        Ok(())
407    } else {
408        code()
409    };
410
411    // green color
412    log::info!("\x1B[0;32m=== done! ===\x1B[0m");
413
414    if matches!(flow_backend, FlowBackend::Ado) {
415        println!("##[endgroup]")
416    } else {
417        log::info!(""); // log a newline, for the pretty
418    }
419
420    result
421}