flowey_cli/pipeline_resolver/
direct_run.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use flowey_core::pipeline::internal::Parameter;
20use petgraph::prelude::NodeIndex;
21use petgraph::visit::EdgeRef;
22use std::collections::BTreeSet;
23use std::path::Path;
24use std::path::PathBuf;
25
26struct ResolvedRunnableStep {
27    node_handle: NodeHandle,
28    label: String,
29    code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
30    idx: usize,
31    can_merge: bool,
32}
33
34/// Directly run the pipeline using flowey
35pub fn direct_run(
36    pipeline: ResolvedPipeline,
37    windows_as_wsl: bool,
38    out_dir: PathBuf,
39    persist_dir: PathBuf,
40) -> anyhow::Result<()> {
41    direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
42
43    // cleanup
44    if out_dir.join(".job_artifacts").exists() {
45        fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
46    }
47    if out_dir.join(".work").exists() {
48        fs_err::remove_dir_all(out_dir.join(".work"))?;
49    }
50
51    Ok(())
52}
53
54fn direct_run_do_work(
55    pipeline: ResolvedPipeline,
56    windows_as_wsl: bool,
57    out_dir: PathBuf,
58    persist_dir: PathBuf,
59) -> anyhow::Result<()> {
60    fs_err::create_dir_all(&out_dir)?;
61    let out_dir = std::path::absolute(out_dir)?;
62
63    fs_err::create_dir_all(&persist_dir)?;
64    let persist_dir = std::path::absolute(persist_dir)?;
65
66    let ResolvedPipeline {
67        graph,
68        order,
69        parameters,
70        ado_name: _,
71        ado_schedule_triggers: _,
72        ado_ci_triggers: _,
73        ado_pr_triggers: _,
74        ado_bootstrap_template: _,
75        ado_resources_repository: _,
76        ado_post_process_yaml_cb: _,
77        ado_variables: _,
78        ado_job_id_overrides: _,
79        gh_name: _,
80        gh_schedule_triggers: _,
81        gh_ci_triggers: _,
82        gh_pr_triggers: _,
83        gh_bootstrap_template: _,
84    } = pipeline;
85
86    let mut skipped_jobs = BTreeSet::new();
87
88    for idx in order {
89        let ResolvedPipelineJob {
90            ref root_nodes,
91            ref patches,
92            ref label,
93            platform,
94            arch,
95            cond_param_idx,
96            timeout_minutes: _,
97            ado_pool: _,
98            ado_variables: _,
99            gh_override_if: _,
100            gh_global_env: _,
101            gh_pool: _,
102            gh_permissions: _,
103            ref external_read_vars,
104            ref parameters_used,
105            ref artifacts_used,
106            ref artifacts_published,
107        } = graph[idx];
108
109        // orange color
110        log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
111        log::info!("");
112
113        if graph
114            .edges_directed(idx, petgraph::Direction::Incoming)
115            .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
116        {
117            log::error!("job depends on job that was skipped. skipping job...");
118            log::info!("");
119            skipped_jobs.insert(idx);
120            continue;
121        }
122
123        let flow_arch = FlowArch::host(PipelineBackendHint::Local);
124        match (arch, flow_arch) {
125            (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
126            _ => {
127                log::error!("mismatch between job arch and local arch. skipping job...");
128                continue;
129            }
130        }
131
132        let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
133        let platform_ok = match (platform, flow_platform) {
134            (FlowPlatform::Windows, FlowPlatform::Windows) => true,
135            (FlowPlatform::Windows, FlowPlatform::Linux(_)) if windows_as_wsl => true,
136            (FlowPlatform::Linux(_), FlowPlatform::Linux(_)) => true,
137            (FlowPlatform::MacOs, FlowPlatform::MacOs) => true,
138            _ => false,
139        };
140
141        if !platform_ok {
142            log::error!("mismatch between job platform and local platform. skipping job...");
143            log::info!("");
144            if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
145                log::warn!("###");
146                log::warn!("### NOTE: detected that you're running in WSL2");
147                log::warn!(
148                    "###       if the the pipeline supports it, you can try passing --windows-as-wsl"
149                );
150                log::warn!("###");
151                log::info!("");
152            }
153            skipped_jobs.insert(idx);
154            continue;
155        }
156
157        let nodes = {
158            let mut resolved_local_steps = Vec::new();
159
160            let (mut output_graph, _, err_unreachable_nodes) =
161                crate::flow_resolver::stage1_dag::stage1_dag(
162                    FlowBackend::Local,
163                    platform,
164                    flow_arch,
165                    patches.clone(),
166                    root_nodes
167                        .clone()
168                        .into_iter()
169                        .map(|(node, requests)| (node, (true, requests)))
170                        .collect(),
171                    external_read_vars.clone(),
172                    Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
173                )?;
174
175            if err_unreachable_nodes.is_some() {
176                anyhow::bail!("detected unreachable nodes")
177            }
178
179            let output_order = petgraph::algo::toposort(&output_graph, None)
180                .map_err(|e| {
181                    format!(
182                        "includes node {}",
183                        output_graph[e.node_id()].0.node.modpath()
184                    )
185                })
186                .expect("runtime variables cannot introduce a DAG cycle");
187
188            for idx in output_order.into_iter().rev() {
189                let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
190
191                let (label, code, idx, can_merge) = match step {
192                    Step::Anchor { .. } => continue,
193                    Step::Rust {
194                        label,
195                        code,
196                        idx,
197                        can_merge,
198                    } => (label, code, idx, can_merge),
199                    Step::AdoYaml { .. } => {
200                        anyhow::bail!(
201                            "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
202                            node_handle.modpath()
203                        )
204                    }
205                    Step::GitHubYaml { .. } => {
206                        anyhow::bail!(
207                            "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
208                            node_handle.modpath()
209                        )
210                    }
211                };
212
213                resolved_local_steps.push(ResolvedRunnableStep {
214                    node_handle,
215                    label,
216                    code: code.lock().take().unwrap(),
217                    idx,
218                    can_merge,
219                });
220            }
221
222            resolved_local_steps
223        };
224
225        let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
226
227        for ResolvedJobUseParameter {
228            flowey_var,
229            pipeline_param_idx,
230        } in parameters_used
231        {
232            log::trace!(
233                "resolving parameter idx {}, flowey_var {:?}",
234                pipeline_param_idx,
235                flowey_var
236            );
237            let (desc, value) = match &parameters[*pipeline_param_idx] {
238                Parameter::Bool {
239                    name: _,
240                    description,
241                    kind: _,
242                    default,
243                } => (
244                    description,
245                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
246                ),
247                Parameter::String {
248                    name: _,
249                    description,
250                    kind: _,
251                    default,
252                    possible_values: _,
253                } => (
254                    description,
255                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
256                ),
257                Parameter::Num {
258                    name: _,
259                    description,
260                    kind: _,
261                    default,
262                    possible_values: _,
263                } => (
264                    description,
265                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
266                ),
267            };
268
269            let Some(value) = value else {
270                anyhow::bail!(
271                    "pipeline must specify default value for params when running locally. missing default for '{desc}'"
272                )
273            };
274
275            in_mem_var_db.set_var(flowey_var, false, value);
276        }
277
278        in_mem_var_db.set_var(
279            VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
280            false,
281            serde_json::to_string(&persist_dir).unwrap().into(),
282        );
283
284        for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
285            let path = out_dir.join("artifacts").join(name);
286            fs_err::create_dir_all(&path)?;
287
288            in_mem_var_db.set_var(
289                flowey_var,
290                false,
291                serde_json::to_string(&path).unwrap().into(),
292            );
293        }
294
295        if out_dir.join(".job_artifacts").exists() {
296            fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
297        }
298        fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
299
300        for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
301            let path = out_dir.join(".job_artifacts").join(name);
302            fs_err::create_dir_all(&path)?;
303            copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
304
305            in_mem_var_db.set_var(
306                flowey_var,
307                false,
308                serde_json::to_string(&path).unwrap().into(),
309            );
310        }
311
312        if out_dir.join(".work").exists() {
313            fs_err::remove_dir_all(out_dir.join(".work"))?;
314        }
315        fs_err::create_dir_all(out_dir.join(".work"))?;
316
317        if let Some(cond_param_idx) = cond_param_idx {
318            let Parameter::Bool {
319                name,
320                description: _,
321                kind: _,
322                default: _,
323            } = &parameters[cond_param_idx]
324            else {
325                panic!("cond param is guaranteed to be bool by type system")
326            };
327
328            // Vars should have had their default already applied, so this should never fail.
329            let (data, _secret) = in_mem_var_db.get_var(name);
330            let should_run: bool = serde_json::from_slice(&data).unwrap();
331
332            if !should_run {
333                log::warn!("job condition was false - skipping job...");
334                continue;
335            }
336        }
337
338        let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
339            &mut in_mem_var_db,
340            FlowBackend::Local,
341            platform,
342            flow_arch,
343        );
344
345        for ResolvedRunnableStep {
346            node_handle,
347            label,
348            code,
349            idx,
350            can_merge,
351        } in nodes
352        {
353            let node_working_dir = out_dir.join(".work").join(format!(
354                "{}_{}",
355                node_handle.modpath().replace("::", "__"),
356                idx
357            ));
358            if !node_working_dir.exists() {
359                fs_err::create_dir(&node_working_dir)?;
360            }
361            std::env::set_current_dir(node_working_dir)?;
362
363            if can_merge {
364                log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
365            } else {
366                log::info!(
367                    // green color
368                    "\x1B[0;32m=== {} ({}) ===\x1B[0m",
369                    label,
370                    node_handle.modpath(),
371                );
372            }
373            code(&mut runtime_services)?;
374            if can_merge {
375                log::debug!("done!");
376                log::debug!(""); // log a newline, for the pretty
377            } else {
378                log::info!("\x1B[0;32m=== done! ===\x1B[0m");
379                log::info!(""); // log a newline, for the pretty
380            }
381        }
382    }
383
384    Ok(())
385}
386
387fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
388    fs_err::create_dir_all(&dst)?;
389    for entry in fs_err::read_dir(src.as_ref())? {
390        let entry = entry?;
391        let ty = entry.file_type()?;
392        if ty.is_dir() {
393            copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
394        } else {
395            fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
396        }
397    }
398    Ok(())
399}