flowey_cli/pipeline_resolver/
direct_run.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use flowey_core::pipeline::internal::Parameter;
20use petgraph::prelude::NodeIndex;
21use petgraph::visit::EdgeRef;
22use std::collections::BTreeSet;
23use std::path::Path;
24use std::path::PathBuf;
25
26struct ResolvedRunnableStep {
27    node_handle: NodeHandle,
28    label: String,
29    code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
30    idx: usize,
31    can_merge: bool,
32}
33
34/// Directly run the pipeline using flowey
35pub fn direct_run(
36    pipeline: ResolvedPipeline,
37    windows_as_wsl: bool,
38    out_dir: PathBuf,
39    persist_dir: PathBuf,
40) -> anyhow::Result<()> {
41    direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
42
43    // cleanup
44    if out_dir.join(".job_artifacts").exists() {
45        fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
46    }
47    if out_dir.join(".work").exists() {
48        fs_err::remove_dir_all(out_dir.join(".work"))?;
49    }
50
51    Ok(())
52}
53
54fn direct_run_do_work(
55    pipeline: ResolvedPipeline,
56    windows_as_wsl: bool,
57    out_dir: PathBuf,
58    persist_dir: PathBuf,
59) -> anyhow::Result<()> {
60    fs_err::create_dir_all(&out_dir)?;
61    let out_dir = std::path::absolute(out_dir)?;
62
63    fs_err::create_dir_all(&persist_dir)?;
64    let persist_dir = std::path::absolute(persist_dir)?;
65
66    let ResolvedPipeline {
67        graph,
68        order,
69        parameters,
70        ado_name: _,
71        ado_schedule_triggers: _,
72        ado_ci_triggers: _,
73        ado_pr_triggers: _,
74        ado_bootstrap_template: _,
75        ado_resources_repository: _,
76        ado_post_process_yaml_cb: _,
77        ado_variables: _,
78        ado_job_id_overrides: _,
79        gh_name: _,
80        gh_schedule_triggers: _,
81        gh_ci_triggers: _,
82        gh_pr_triggers: _,
83        gh_bootstrap_template: _,
84    } = pipeline;
85
86    let mut skipped_jobs = BTreeSet::new();
87
88    for idx in order {
89        let ResolvedPipelineJob {
90            ref root_nodes,
91            ref patches,
92            ref label,
93            platform,
94            arch,
95            cond_param_idx,
96            ado_pool: _,
97            ado_variables: _,
98            gh_override_if: _,
99            gh_global_env: _,
100            gh_pool: _,
101            gh_permissions: _,
102            ref external_read_vars,
103            ref parameters_used,
104            ref artifacts_used,
105            ref artifacts_published,
106        } = graph[idx];
107
108        // orange color
109        log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
110        log::info!("");
111
112        if graph
113            .edges_directed(idx, petgraph::Direction::Incoming)
114            .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
115        {
116            log::error!("job depends on job that was skipped. skipping job...");
117            log::info!("");
118            skipped_jobs.insert(idx);
119            continue;
120        }
121
122        let flow_arch = FlowArch::host(PipelineBackendHint::Local);
123        match (arch, flow_arch) {
124            (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
125            _ => {
126                log::error!("mismatch between job arch and local arch. skipping job...");
127                continue;
128            }
129        }
130
131        let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
132        let platform_ok = match (platform, flow_platform) {
133            (FlowPlatform::Windows, FlowPlatform::Windows) => true,
134            (FlowPlatform::Windows, FlowPlatform::Linux(_)) if windows_as_wsl => true,
135            (FlowPlatform::Linux(_), FlowPlatform::Linux(_)) => true,
136            (FlowPlatform::MacOs, FlowPlatform::MacOs) => true,
137            _ => false,
138        };
139
140        if !platform_ok {
141            log::error!("mismatch between job platform and local platform. skipping job...");
142            log::info!("");
143            if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
144                log::warn!("###");
145                log::warn!("### NOTE: detected that you're running in WSL2");
146                log::warn!(
147                    "###       if the the pipeline supports it, you can try passing --windows-as-wsl"
148                );
149                log::warn!("###");
150                log::info!("");
151            }
152            skipped_jobs.insert(idx);
153            continue;
154        }
155
156        let nodes = {
157            let mut resolved_local_steps = Vec::new();
158
159            let (mut output_graph, _, err_unreachable_nodes) =
160                crate::flow_resolver::stage1_dag::stage1_dag(
161                    FlowBackend::Local,
162                    platform,
163                    flow_arch,
164                    patches.clone(),
165                    root_nodes
166                        .clone()
167                        .into_iter()
168                        .map(|(node, requests)| (node, (true, requests)))
169                        .collect(),
170                    external_read_vars.clone(),
171                    Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
172                )?;
173
174            if err_unreachable_nodes.is_some() {
175                anyhow::bail!("detected unreachable nodes")
176            }
177
178            let output_order = petgraph::algo::toposort(&output_graph, None)
179                .map_err(|e| {
180                    format!(
181                        "includes node {}",
182                        output_graph[e.node_id()].0.node.modpath()
183                    )
184                })
185                .expect("runtime variables cannot introduce a DAG cycle");
186
187            for idx in output_order.into_iter().rev() {
188                let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
189
190                let (label, code, idx, can_merge) = match step {
191                    Step::Anchor { .. } => continue,
192                    Step::Rust {
193                        label,
194                        code,
195                        idx,
196                        can_merge,
197                    } => (label, code, idx, can_merge),
198                    Step::AdoYaml { .. } => {
199                        anyhow::bail!(
200                            "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
201                            node_handle.modpath()
202                        )
203                    }
204                    Step::GitHubYaml { .. } => {
205                        anyhow::bail!(
206                            "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
207                            node_handle.modpath()
208                        )
209                    }
210                };
211
212                resolved_local_steps.push(ResolvedRunnableStep {
213                    node_handle,
214                    label,
215                    code: code.lock().take().unwrap(),
216                    idx,
217                    can_merge,
218                });
219            }
220
221            resolved_local_steps
222        };
223
224        let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
225
226        for ResolvedJobUseParameter {
227            flowey_var,
228            pipeline_param_idx,
229        } in parameters_used
230        {
231            let (desc, value) = match &parameters[*pipeline_param_idx] {
232                Parameter::Bool {
233                    name: _,
234                    description,
235                    kind: _,
236                    default,
237                } => (
238                    description,
239                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
240                ),
241                Parameter::String {
242                    name: _,
243                    description,
244                    kind: _,
245                    default,
246                    possible_values: _,
247                } => (
248                    description,
249                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
250                ),
251                Parameter::Num {
252                    name: _,
253                    description,
254                    kind: _,
255                    default,
256                    possible_values: _,
257                } => (
258                    description,
259                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
260                ),
261            };
262
263            let Some(value) = value else {
264                anyhow::bail!(
265                    "pipeline must specify default value for params when running locally. missing default for '{desc}'"
266                )
267            };
268
269            in_mem_var_db.set_var(flowey_var, false, value);
270        }
271
272        in_mem_var_db.set_var(
273            VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
274            false,
275            serde_json::to_string(&persist_dir).unwrap().into(),
276        );
277
278        for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
279            let path = out_dir.join("artifacts").join(name);
280            fs_err::create_dir_all(&path)?;
281
282            in_mem_var_db.set_var(
283                flowey_var,
284                false,
285                serde_json::to_string(&path).unwrap().into(),
286            );
287        }
288
289        if out_dir.join(".job_artifacts").exists() {
290            fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
291        }
292        fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
293
294        for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
295            let path = out_dir.join(".job_artifacts").join(name);
296            fs_err::create_dir_all(&path)?;
297            copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
298
299            in_mem_var_db.set_var(
300                flowey_var,
301                false,
302                serde_json::to_string(&path).unwrap().into(),
303            );
304        }
305
306        if out_dir.join(".work").exists() {
307            fs_err::remove_dir_all(out_dir.join(".work"))?;
308        }
309        fs_err::create_dir_all(out_dir.join(".work"))?;
310
311        let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
312            &mut in_mem_var_db,
313            FlowBackend::Local,
314            platform,
315            flow_arch,
316        );
317
318        if let Some(cond_param_idx) = cond_param_idx {
319            let Parameter::Bool {
320                name: _,
321                description: _,
322                kind: _,
323                default,
324            } = &parameters[cond_param_idx]
325            else {
326                panic!("cond param is guaranteed to be bool by type system")
327            };
328
329            let Some(should_run) = default else {
330                anyhow::bail!(
331                    "when running locally, job condition parameter must include a default value"
332                )
333            };
334
335            if !should_run {
336                log::warn!("job condition was false - skipping job...");
337                continue;
338            }
339        }
340
341        for ResolvedRunnableStep {
342            node_handle,
343            label,
344            code,
345            idx,
346            can_merge,
347        } in nodes
348        {
349            let node_working_dir = out_dir.join(".work").join(format!(
350                "{}_{}",
351                node_handle.modpath().replace("::", "__"),
352                idx
353            ));
354            if !node_working_dir.exists() {
355                fs_err::create_dir(&node_working_dir)?;
356            }
357            std::env::set_current_dir(node_working_dir)?;
358
359            if can_merge {
360                log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
361            } else {
362                log::info!(
363                    // green color
364                    "\x1B[0;32m=== {} ({}) ===\x1B[0m",
365                    label,
366                    node_handle.modpath(),
367                );
368            }
369            code(&mut runtime_services)?;
370            if can_merge {
371                log::debug!("done!");
372                log::debug!(""); // log a newline, for the pretty
373            } else {
374                log::info!("\x1B[0;32m=== done! ===\x1B[0m");
375                log::info!(""); // log a newline, for the pretty
376            }
377        }
378    }
379
380    Ok(())
381}
382
383fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
384    fs_err::create_dir_all(&dst)?;
385    for entry in fs_err::read_dir(src.as_ref())? {
386        let entry = entry?;
387        let ty = entry.file_type()?;
388        if ty.is_dir() {
389            copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
390        } else {
391            fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
392        }
393    }
394    Ok(())
395}