flowey_cli/pipeline_resolver/
direct_run.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::internal::Parameter;
18use petgraph::prelude::NodeIndex;
19use petgraph::visit::EdgeRef;
20use std::collections::BTreeSet;
21use std::path::Path;
22use std::path::PathBuf;
23
24struct ResolvedRunnableStep {
25    node_handle: NodeHandle,
26    label: String,
27    code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
28    idx: usize,
29    can_merge: bool,
30}
31
32/// Directly run the pipeline using flowey
33pub fn direct_run(
34    pipeline: ResolvedPipeline,
35    windows_as_wsl: bool,
36    out_dir: PathBuf,
37    persist_dir: PathBuf,
38) -> anyhow::Result<()> {
39    direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
40
41    // cleanup
42    if out_dir.join(".job_artifacts").exists() {
43        fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
44    }
45    if out_dir.join(".work").exists() {
46        fs_err::remove_dir_all(out_dir.join(".work"))?;
47    }
48
49    Ok(())
50}
51
52fn direct_run_do_work(
53    pipeline: ResolvedPipeline,
54    windows_as_wsl: bool,
55    out_dir: PathBuf,
56    persist_dir: PathBuf,
57) -> anyhow::Result<()> {
58    fs_err::create_dir_all(&out_dir)?;
59    let out_dir = std::path::absolute(out_dir)?;
60
61    fs_err::create_dir_all(&persist_dir)?;
62    let persist_dir = std::path::absolute(persist_dir)?;
63
64    let ResolvedPipeline {
65        graph,
66        order,
67        parameters,
68        ado_name: _,
69        ado_schedule_triggers: _,
70        ado_ci_triggers: _,
71        ado_pr_triggers: _,
72        ado_bootstrap_template: _,
73        ado_resources_repository: _,
74        ado_post_process_yaml_cb: _,
75        ado_variables: _,
76        ado_job_id_overrides: _,
77        gh_name: _,
78        gh_schedule_triggers: _,
79        gh_ci_triggers: _,
80        gh_pr_triggers: _,
81        gh_bootstrap_template: _,
82    } = pipeline;
83
84    let mut skipped_jobs = BTreeSet::new();
85
86    for idx in order {
87        let ResolvedPipelineJob {
88            ref root_nodes,
89            ref patches,
90            ref label,
91            platform,
92            arch,
93            cond_param_idx,
94            ado_pool: _,
95            ado_variables: _,
96            gh_override_if: _,
97            gh_global_env: _,
98            gh_pool: _,
99            gh_permissions: _,
100            ref external_read_vars,
101            ref parameters_used,
102            ref artifacts_used,
103            ref artifacts_published,
104        } = graph[idx];
105
106        // orange color
107        log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
108        log::info!("");
109
110        if graph
111            .edges_directed(idx, petgraph::Direction::Incoming)
112            .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
113        {
114            log::error!("job depends on job that was skipped. skipping job...");
115            log::info!("");
116            skipped_jobs.insert(idx);
117            continue;
118        }
119
120        // xtask-fmt allow-target-arch oneoff-flowey
121        let flow_arch = if cfg!(target_arch = "x86_64") {
122            FlowArch::X86_64
123        // xtask-fmt allow-target-arch oneoff-flowey
124        } else if cfg!(target_arch = "aarch64") {
125            FlowArch::Aarch64
126        } else {
127            unreachable!("flowey only runs on X86_64 or Aarch64 at the moment")
128        };
129
130        match (arch, flow_arch) {
131            (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
132            _ => {
133                log::error!("mismatch between job arch and local arch. skipping job...");
134                continue;
135            }
136        }
137
138        let platform_ok = match platform {
139            FlowPlatform::Windows => cfg!(windows) || (cfg!(target_os = "linux") && windows_as_wsl),
140            FlowPlatform::Linux(_) => cfg!(target_os = "linux"),
141            FlowPlatform::MacOs => cfg!(target_os = "macos"),
142            platform => panic!("unknown platform {platform}"),
143        };
144
145        if !platform_ok {
146            log::error!("mismatch between job platform and local platform. skipping job...");
147            log::info!("");
148            if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
149                log::warn!("###");
150                log::warn!("### NOTE: detected that you're running in WSL2");
151                log::warn!(
152                    "###       if the the pipeline supports it, you can try passing --windows-as-wsl"
153                );
154                log::warn!("###");
155                log::info!("");
156            }
157            skipped_jobs.insert(idx);
158            continue;
159        }
160
161        let nodes = {
162            let mut resolved_local_steps = Vec::new();
163
164            let (mut output_graph, _, err_unreachable_nodes) =
165                crate::flow_resolver::stage1_dag::stage1_dag(
166                    FlowBackend::Local,
167                    platform,
168                    flow_arch,
169                    patches.clone(),
170                    root_nodes
171                        .clone()
172                        .into_iter()
173                        .map(|(node, requests)| (node, (true, requests)))
174                        .collect(),
175                    external_read_vars.clone(),
176                    Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
177                )?;
178
179            if err_unreachable_nodes.is_some() {
180                anyhow::bail!("detected unreachable nodes")
181            }
182
183            let output_order = petgraph::algo::toposort(&output_graph, None)
184                .map_err(|e| {
185                    format!(
186                        "includes node {}",
187                        output_graph[e.node_id()].0.node.modpath()
188                    )
189                })
190                .expect("runtime variables cannot introduce a DAG cycle");
191
192            for idx in output_order.into_iter().rev() {
193                let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
194
195                let (label, code, idx, can_merge) = match step {
196                    Step::Anchor { .. } => continue,
197                    Step::Rust {
198                        label,
199                        code,
200                        idx,
201                        can_merge,
202                    } => (label, code, idx, can_merge),
203                    Step::AdoYaml { .. } => {
204                        anyhow::bail!(
205                            "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
206                            node_handle.modpath()
207                        )
208                    }
209                    Step::GitHubYaml { .. } => {
210                        anyhow::bail!(
211                            "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
212                            node_handle.modpath()
213                        )
214                    }
215                };
216
217                resolved_local_steps.push(ResolvedRunnableStep {
218                    node_handle,
219                    label,
220                    code: code.lock().take().unwrap(),
221                    idx,
222                    can_merge,
223                });
224            }
225
226            resolved_local_steps
227        };
228
229        let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
230
231        for ResolvedJobUseParameter {
232            flowey_var,
233            pipeline_param_idx,
234        } in parameters_used
235        {
236            let (desc, value) = match &parameters[*pipeline_param_idx] {
237                Parameter::Bool {
238                    name: _,
239                    description,
240                    kind: _,
241                    default,
242                } => (
243                    description,
244                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
245                ),
246                Parameter::String {
247                    name: _,
248                    description,
249                    kind: _,
250                    default,
251                    possible_values: _,
252                } => (
253                    description,
254                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
255                ),
256                Parameter::Num {
257                    name: _,
258                    description,
259                    kind: _,
260                    default,
261                    possible_values: _,
262                } => (
263                    description,
264                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
265                ),
266            };
267
268            let Some(value) = value else {
269                anyhow::bail!(
270                    "pipeline must specify default value for params when running locally. missing default for '{desc}'"
271                )
272            };
273
274            in_mem_var_db.set_var(flowey_var, false, value);
275        }
276
277        in_mem_var_db.set_var(
278            VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
279            false,
280            serde_json::to_string(&persist_dir).unwrap().into(),
281        );
282
283        for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
284            let path = out_dir.join("artifacts").join(name);
285            fs_err::create_dir_all(&path)?;
286
287            in_mem_var_db.set_var(
288                flowey_var,
289                false,
290                serde_json::to_string(&path).unwrap().into(),
291            );
292        }
293
294        if out_dir.join(".job_artifacts").exists() {
295            fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
296        }
297        fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
298
299        for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
300            let path = out_dir.join(".job_artifacts").join(name);
301            fs_err::create_dir_all(&path)?;
302            copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
303
304            in_mem_var_db.set_var(
305                flowey_var,
306                false,
307                serde_json::to_string(&path).unwrap().into(),
308            );
309        }
310
311        if out_dir.join(".work").exists() {
312            fs_err::remove_dir_all(out_dir.join(".work"))?;
313        }
314        fs_err::create_dir_all(out_dir.join(".work"))?;
315
316        let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
317            &mut in_mem_var_db,
318            FlowBackend::Local,
319            platform,
320            flow_arch,
321        );
322
323        if let Some(cond_param_idx) = cond_param_idx {
324            let Parameter::Bool {
325                name: _,
326                description: _,
327                kind: _,
328                default,
329            } = &parameters[cond_param_idx]
330            else {
331                panic!("cond param is guaranteed to be bool by type system")
332            };
333
334            let Some(should_run) = default else {
335                anyhow::bail!(
336                    "when running locally, job condition parameter must include a default value"
337                )
338            };
339
340            if !should_run {
341                log::warn!("job condition was false - skipping job...");
342                continue;
343            }
344        }
345
346        for ResolvedRunnableStep {
347            node_handle,
348            label,
349            code,
350            idx,
351            can_merge,
352        } in nodes
353        {
354            let node_working_dir = out_dir.join(".work").join(format!(
355                "{}_{}",
356                node_handle.modpath().replace("::", "__"),
357                idx
358            ));
359            if !node_working_dir.exists() {
360                fs_err::create_dir(&node_working_dir)?;
361            }
362            std::env::set_current_dir(node_working_dir)?;
363
364            if can_merge {
365                log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
366            } else {
367                log::info!(
368                    // green color
369                    "\x1B[0;32m=== {} ({}) ===\x1B[0m",
370                    label,
371                    node_handle.modpath(),
372                );
373            }
374            code(&mut runtime_services)?;
375            if can_merge {
376                log::debug!("done!");
377                log::debug!(""); // log a newline, for the pretty
378            } else {
379                log::info!("\x1B[0;32m=== done! ===\x1B[0m");
380                log::info!(""); // log a newline, for the pretty
381            }
382        }
383    }
384
385    Ok(())
386}
387
388fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
389    fs_err::create_dir_all(&dst)?;
390    for entry in fs_err::read_dir(src.as_ref())? {
391        let entry = entry?;
392        let ty = entry.file_type()?;
393        if ty.is_dir() {
394            copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
395        } else {
396            fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
397        }
398    }
399    Ok(())
400}