flowey_cli/pipeline_resolver/
direct_run.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use flowey_core::pipeline::internal::Parameter;
20use petgraph::prelude::NodeIndex;
21use petgraph::visit::EdgeRef;
22use std::collections::BTreeSet;
23use std::path::Path;
24use std::path::PathBuf;
25
26struct ResolvedRunnableStep {
27    node_handle: NodeHandle,
28    label: String,
29    code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
30    idx: usize,
31    can_merge: bool,
32}
33
34/// Directly run the pipeline using flowey
35pub fn direct_run(
36    pipeline: ResolvedPipeline,
37    windows_as_wsl: bool,
38    out_dir: PathBuf,
39    persist_dir: PathBuf,
40) -> anyhow::Result<()> {
41    direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
42
43    // cleanup
44    if out_dir.join(".job_artifacts").exists() {
45        fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
46    }
47    if out_dir.join(".work").exists() {
48        fs_err::remove_dir_all(out_dir.join(".work"))?;
49    }
50
51    Ok(())
52}
53
54fn direct_run_do_work(
55    pipeline: ResolvedPipeline,
56    windows_as_wsl: bool,
57    out_dir: PathBuf,
58    persist_dir: PathBuf,
59) -> anyhow::Result<()> {
60    fs_err::create_dir_all(&out_dir)?;
61    let out_dir = std::path::absolute(out_dir)?;
62
63    fs_err::create_dir_all(&persist_dir)?;
64    let persist_dir = std::path::absolute(persist_dir)?;
65
66    let ResolvedPipeline {
67        graph,
68        order,
69        parameters,
70        ado_name: _,
71        ado_schedule_triggers: _,
72        ado_ci_triggers: _,
73        ado_pr_triggers: _,
74        ado_bootstrap_template: _,
75        ado_resources_repository: _,
76        ado_post_process_yaml_cb: _,
77        ado_variables: _,
78        ado_job_id_overrides: _,
79        gh_name: _,
80        gh_schedule_triggers: _,
81        gh_ci_triggers: _,
82        gh_pr_triggers: _,
83        gh_bootstrap_template: _,
84    } = pipeline;
85
86    let mut skipped_jobs = BTreeSet::new();
87
88    for idx in order {
89        let ResolvedPipelineJob {
90            ref root_nodes,
91            ref patches,
92            ref label,
93            platform,
94            arch,
95            cond_param_idx,
96            timeout_minutes: _,
97            ado_pool: _,
98            ado_variables: _,
99            gh_override_if: _,
100            gh_global_env: _,
101            gh_pool: _,
102            gh_permissions: _,
103            ref external_read_vars,
104            ref parameters_used,
105            ref artifacts_used,
106            ref artifacts_published,
107        } = graph[idx];
108
109        // orange color
110        log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
111        log::info!("");
112
113        if graph
114            .edges_directed(idx, petgraph::Direction::Incoming)
115            .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
116        {
117            log::error!("job depends on job that was skipped. skipping job...");
118            log::info!("");
119            skipped_jobs.insert(idx);
120            continue;
121        }
122
123        let flow_arch = FlowArch::host(PipelineBackendHint::Local);
124        match (arch, flow_arch) {
125            (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
126            _ => {
127                log::error!("mismatch between job arch and local arch. skipping job...");
128                skipped_jobs.insert(idx);
129                continue;
130            }
131        }
132
133        let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
134        let platform_ok = match (platform, flow_platform) {
135            (FlowPlatform::Windows, FlowPlatform::Windows) => true,
136            (FlowPlatform::Windows, FlowPlatform::Linux(_)) if windows_as_wsl => true,
137            (FlowPlatform::Linux(_), FlowPlatform::Linux(_)) => true,
138            (FlowPlatform::MacOs, FlowPlatform::MacOs) => true,
139            _ => false,
140        };
141
142        if !platform_ok {
143            log::error!("mismatch between job platform and local platform. skipping job...");
144            log::info!("");
145            if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
146                log::warn!("###");
147                log::warn!("### NOTE: detected that you're running in WSL2");
148                log::warn!(
149                    "###       if the the pipeline supports it, you can try passing --windows-as-wsl"
150                );
151                log::warn!("###");
152                log::info!("");
153            }
154            skipped_jobs.insert(idx);
155            continue;
156        }
157
158        let nodes = {
159            let mut resolved_local_steps = Vec::new();
160
161            let (mut output_graph, _, err_unreachable_nodes) =
162                crate::flow_resolver::stage1_dag::stage1_dag(
163                    FlowBackend::Local,
164                    platform,
165                    flow_arch,
166                    patches.clone(),
167                    root_nodes
168                        .clone()
169                        .into_iter()
170                        .map(|(node, requests)| (node, (true, requests)))
171                        .collect(),
172                    external_read_vars.clone(),
173                    Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
174                )?;
175
176            if err_unreachable_nodes.is_some() {
177                anyhow::bail!("detected unreachable nodes")
178            }
179
180            let output_order = petgraph::algo::toposort(&output_graph, None)
181                .map_err(|e| {
182                    format!(
183                        "includes node {}",
184                        output_graph[e.node_id()].0.node.modpath()
185                    )
186                })
187                .expect("runtime variables cannot introduce a DAG cycle");
188
189            for idx in output_order.into_iter().rev() {
190                let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
191
192                let (label, code, idx, can_merge) = match step {
193                    Step::Anchor { .. } => continue,
194                    Step::Rust {
195                        label,
196                        code,
197                        idx,
198                        can_merge,
199                    } => (label, code, idx, can_merge),
200                    Step::AdoYaml { .. } => {
201                        anyhow::bail!(
202                            "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
203                            node_handle.modpath()
204                        )
205                    }
206                    Step::GitHubYaml { .. } => {
207                        anyhow::bail!(
208                            "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
209                            node_handle.modpath()
210                        )
211                    }
212                };
213
214                resolved_local_steps.push(ResolvedRunnableStep {
215                    node_handle,
216                    label,
217                    code: code.lock().take().unwrap(),
218                    idx,
219                    can_merge,
220                });
221            }
222
223            resolved_local_steps
224        };
225
226        let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
227
228        for ResolvedJobUseParameter {
229            flowey_var,
230            pipeline_param_idx,
231        } in parameters_used
232        {
233            log::trace!(
234                "resolving parameter idx {}, flowey_var {:?}",
235                pipeline_param_idx,
236                flowey_var
237            );
238            let (desc, value) = match &parameters[*pipeline_param_idx] {
239                Parameter::Bool {
240                    name: _,
241                    description,
242                    kind: _,
243                    default,
244                } => (
245                    description,
246                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
247                ),
248                Parameter::String {
249                    name: _,
250                    description,
251                    kind: _,
252                    default,
253                    possible_values: _,
254                } => (
255                    description,
256                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
257                ),
258                Parameter::Num {
259                    name: _,
260                    description,
261                    kind: _,
262                    default,
263                    possible_values: _,
264                } => (
265                    description,
266                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
267                ),
268            };
269
270            let Some(value) = value else {
271                anyhow::bail!(
272                    "pipeline must specify default value for params when running locally. missing default for '{desc}'"
273                )
274            };
275
276            in_mem_var_db.set_var(flowey_var, false, value);
277        }
278
279        in_mem_var_db.set_var(
280            VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
281            false,
282            serde_json::to_string(&persist_dir).unwrap().into(),
283        );
284
285        for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
286            let path = out_dir.join("artifacts").join(name);
287            fs_err::create_dir_all(&path)?;
288
289            in_mem_var_db.set_var(
290                flowey_var,
291                false,
292                serde_json::to_string(&path).unwrap().into(),
293            );
294        }
295
296        if out_dir.join(".job_artifacts").exists() {
297            fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
298        }
299        fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
300
301        for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
302            let path = out_dir.join(".job_artifacts").join(name);
303            fs_err::create_dir_all(&path)?;
304            copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
305
306            in_mem_var_db.set_var(
307                flowey_var,
308                false,
309                serde_json::to_string(&path).unwrap().into(),
310            );
311        }
312
313        if out_dir.join(".work").exists() {
314            fs_err::remove_dir_all(out_dir.join(".work"))?;
315        }
316        fs_err::create_dir_all(out_dir.join(".work"))?;
317
318        if let Some(cond_param_idx) = cond_param_idx {
319            let Parameter::Bool {
320                name,
321                description: _,
322                kind: _,
323                default: _,
324            } = &parameters[cond_param_idx]
325            else {
326                panic!("cond param is guaranteed to be bool by type system")
327            };
328
329            // Vars should have had their default already applied, so this should never fail.
330            let (data, _secret) = in_mem_var_db.get_var(name);
331            let should_run: bool = serde_json::from_slice(&data).unwrap();
332
333            if !should_run {
334                log::warn!("job condition was false - skipping job...");
335                skipped_jobs.insert(idx);
336                continue;
337            }
338        }
339
340        let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
341            &mut in_mem_var_db,
342            FlowBackend::Local,
343            platform,
344            flow_arch,
345        );
346
347        for ResolvedRunnableStep {
348            node_handle,
349            label,
350            code,
351            idx,
352            can_merge,
353        } in nodes
354        {
355            let node_working_dir = out_dir.join(".work").join(format!(
356                "{}_{}",
357                node_handle.modpath().replace("::", "__"),
358                idx
359            ));
360            if !node_working_dir.exists() {
361                fs_err::create_dir(&node_working_dir)?;
362            }
363            std::env::set_current_dir(node_working_dir)?;
364
365            if can_merge {
366                log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
367            } else {
368                log::info!(
369                    // green color
370                    "\x1B[0;32m=== {} ({}) ===\x1B[0m",
371                    label,
372                    node_handle.modpath(),
373                );
374            }
375            code(&mut runtime_services)?;
376            if can_merge {
377                log::debug!("done!");
378                log::debug!(""); // log a newline, for the pretty
379            } else {
380                log::info!("\x1B[0;32m=== done! ===\x1B[0m");
381                log::info!(""); // log a newline, for the pretty
382            }
383        }
384
385        // Leave the last node's working dir so it can be deleted by later steps
386        std::env::set_current_dir(&out_dir)?;
387    }
388
389    Ok(())
390}
391
392fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
393    fs_err::create_dir_all(&dst)?;
394    for entry in fs_err::read_dir(src.as_ref())? {
395        let entry = entry?;
396        let ty = entry.file_type()?;
397        if ty.is_dir() {
398            copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
399        } else {
400            fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
401        }
402    }
403    Ok(())
404}