flowey_cli/pipeline_resolver/
direct_run.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::internal::Parameter;
18use petgraph::prelude::NodeIndex;
19use petgraph::visit::EdgeRef;
20use std::collections::BTreeSet;
21use std::path::Path;
22use std::path::PathBuf;
23
/// A flowey node whose Rust steps have been fully resolved and are ready to
/// be executed in-process by the local (direct-run) backend.
pub struct ResolvedRunnableNode {
    /// Handle identifying which flowey node these steps came from.
    pub node_handle: NodeHandle,
    /// Runnable steps for this node, as tuples of
    /// `(step index, human-readable label, one-shot closure executing the step)`.
    pub steps: Vec<(
        usize,
        String,
        Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
    )>,
}
32
33/// Directly run the pipeline using flowey
34pub fn direct_run(
35    pipeline: ResolvedPipeline,
36    windows_as_wsl: bool,
37    out_dir: PathBuf,
38    persist_dir: PathBuf,
39) -> anyhow::Result<()> {
40    direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
41
42    // cleanup
43    if out_dir.join(".job_artifacts").exists() {
44        fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
45    }
46    if out_dir.join(".work").exists() {
47        fs_err::remove_dir_all(out_dir.join(".work"))?;
48    }
49
50    Ok(())
51}
52
53fn direct_run_do_work(
54    pipeline: ResolvedPipeline,
55    windows_as_wsl: bool,
56    out_dir: PathBuf,
57    persist_dir: PathBuf,
58) -> anyhow::Result<()> {
59    fs_err::create_dir_all(&out_dir)?;
60    let out_dir = std::path::absolute(out_dir)?;
61
62    fs_err::create_dir_all(&persist_dir)?;
63    let persist_dir = std::path::absolute(persist_dir)?;
64
65    let ResolvedPipeline {
66        graph,
67        order,
68        parameters,
69        ado_name: _,
70        ado_schedule_triggers: _,
71        ado_ci_triggers: _,
72        ado_pr_triggers: _,
73        ado_bootstrap_template: _,
74        ado_resources_repository: _,
75        ado_post_process_yaml_cb: _,
76        ado_variables: _,
77        ado_job_id_overrides: _,
78        gh_name: _,
79        gh_schedule_triggers: _,
80        gh_ci_triggers: _,
81        gh_pr_triggers: _,
82        gh_bootstrap_template: _,
83    } = pipeline;
84
85    let mut skipped_jobs = BTreeSet::new();
86
87    for idx in order {
88        let ResolvedPipelineJob {
89            ref root_nodes,
90            ref patches,
91            ref label,
92            platform,
93            arch,
94            cond_param_idx,
95            ado_pool: _,
96            ado_variables: _,
97            gh_override_if: _,
98            gh_global_env: _,
99            gh_pool: _,
100            gh_permissions: _,
101            ref external_read_vars,
102            ref parameters_used,
103            ref artifacts_used,
104            ref artifacts_published,
105        } = graph[idx];
106
107        // orange color
108        log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
109        log::info!("");
110
111        if graph
112            .edges_directed(idx, petgraph::Direction::Incoming)
113            .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
114        {
115            log::error!("job depends on job that was skipped. skipping job...");
116            log::info!("");
117            skipped_jobs.insert(idx);
118            continue;
119        }
120
121        // xtask-fmt allow-target-arch oneoff-flowey
122        let flow_arch = if cfg!(target_arch = "x86_64") {
123            FlowArch::X86_64
124        // xtask-fmt allow-target-arch oneoff-flowey
125        } else if cfg!(target_arch = "aarch64") {
126            FlowArch::Aarch64
127        } else {
128            unreachable!("flowey only runs on X86_64 or Aarch64 at the moment")
129        };
130
131        match (arch, flow_arch) {
132            (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
133            _ => {
134                log::error!("mismatch between job arch and local arch. skipping job...");
135                continue;
136            }
137        }
138
139        let platform_ok = match platform {
140            FlowPlatform::Windows => cfg!(windows) || (cfg!(target_os = "linux") && windows_as_wsl),
141            FlowPlatform::Linux(_) => cfg!(target_os = "linux"),
142            FlowPlatform::MacOs => cfg!(target_os = "macos"),
143            platform => panic!("unknown platform {platform}"),
144        };
145
146        if !platform_ok {
147            log::error!("mismatch between job platform and local platform. skipping job...");
148            log::info!("");
149            if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
150                log::warn!("###");
151                log::warn!("### NOTE: detected that you're running in WSL2");
152                log::warn!(
153                    "###       if the the pipeline supports it, you can try passing --windows-as-wsl"
154                );
155                log::warn!("###");
156                log::info!("");
157            }
158            skipped_jobs.insert(idx);
159            continue;
160        }
161
162        let nodes = {
163            let mut resolved_local_nodes = Vec::new();
164
165            let (mut output_graph, _, err_unreachable_nodes) =
166                crate::flow_resolver::stage1_dag::stage1_dag(
167                    FlowBackend::Local,
168                    platform,
169                    flow_arch,
170                    patches.clone(),
171                    root_nodes
172                        .clone()
173                        .into_iter()
174                        .map(|(node, requests)| (node, (true, requests)))
175                        .collect(),
176                    external_read_vars.clone(),
177                    Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
178                )?;
179
180            if err_unreachable_nodes.is_some() {
181                anyhow::bail!("detected unreachable nodes")
182            }
183
184            let output_order = petgraph::algo::toposort(&output_graph, None)
185                .map_err(|e| {
186                    format!(
187                        "includes node {}",
188                        output_graph[e.node_id()].0.node.modpath()
189                    )
190                })
191                .expect("runtime variables cannot introduce a DAG cycle");
192
193            for idx in output_order.into_iter().rev() {
194                let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
195
196                let mut steps = Vec::new();
197                let (label, code, idx) = match step {
198                    Step::Anchor { .. } => continue,
199                    Step::Rust {
200                        label,
201                        code,
202                        idx,
203                        can_merge: _,
204                    } => (label, code, idx),
205                    Step::AdoYaml { .. } => {
206                        anyhow::bail!(
207                            "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
208                            node_handle.modpath()
209                        )
210                    }
211                    Step::GitHubYaml { .. } => {
212                        anyhow::bail!(
213                            "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
214                            node_handle.modpath()
215                        )
216                    }
217                };
218                steps.push((idx, label, code.lock().take().unwrap()));
219
220                resolved_local_nodes.push(ResolvedRunnableNode { node_handle, steps })
221            }
222
223            resolved_local_nodes
224        };
225
226        let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
227
228        for ResolvedJobUseParameter {
229            flowey_var,
230            pipeline_param_idx,
231        } in parameters_used
232        {
233            let (desc, value) = match &parameters[*pipeline_param_idx] {
234                Parameter::Bool {
235                    name: _,
236                    description,
237                    kind: _,
238                    default,
239                } => (
240                    description,
241                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
242                ),
243                Parameter::String {
244                    name: _,
245                    description,
246                    kind: _,
247                    default,
248                    possible_values: _,
249                } => (
250                    description,
251                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
252                ),
253                Parameter::Num {
254                    name: _,
255                    description,
256                    kind: _,
257                    default,
258                    possible_values: _,
259                } => (
260                    description,
261                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
262                ),
263            };
264
265            let Some(value) = value else {
266                anyhow::bail!(
267                    "pipeline must specify default value for params when running locally. missing default for '{desc}'"
268                )
269            };
270
271            in_mem_var_db.set_var(flowey_var, false, value);
272        }
273
274        in_mem_var_db.set_var(
275            VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
276            false,
277            serde_json::to_string(&persist_dir).unwrap().into(),
278        );
279
280        for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
281            let path = out_dir.join("artifacts").join(name);
282            fs_err::create_dir_all(&path)?;
283
284            in_mem_var_db.set_var(
285                flowey_var,
286                false,
287                serde_json::to_string(&path).unwrap().into(),
288            );
289        }
290
291        if out_dir.join(".job_artifacts").exists() {
292            fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
293        }
294        fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
295
296        for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
297            let path = out_dir.join(".job_artifacts").join(name);
298            fs_err::create_dir_all(&path)?;
299            copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
300
301            in_mem_var_db.set_var(
302                flowey_var,
303                false,
304                serde_json::to_string(&path).unwrap().into(),
305            );
306        }
307
308        if out_dir.join(".work").exists() {
309            fs_err::remove_dir_all(out_dir.join(".work"))?;
310        }
311        fs_err::create_dir_all(out_dir.join(".work"))?;
312
313        let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
314            &mut in_mem_var_db,
315            FlowBackend::Local,
316            platform,
317            flow_arch,
318        );
319
320        if let Some(cond_param_idx) = cond_param_idx {
321            let Parameter::Bool {
322                name: _,
323                description: _,
324                kind: _,
325                default,
326            } = &parameters[cond_param_idx]
327            else {
328                panic!("cond param is guaranteed to be bool by type system")
329            };
330
331            let Some(should_run) = default else {
332                anyhow::bail!(
333                    "when running locally, job condition parameter must include a default value"
334                )
335            };
336
337            if !should_run {
338                log::warn!("job condition was false - skipping job...");
339                continue;
340            }
341        }
342
343        for ResolvedRunnableNode { node_handle, steps } in nodes {
344            for (idx, label, code) in steps {
345                let node_working_dir = out_dir.join(".work").join(format!(
346                    "{}_{}",
347                    node_handle.modpath().replace("::", "__"),
348                    idx
349                ));
350                if !node_working_dir.exists() {
351                    fs_err::create_dir(&node_working_dir)?;
352                }
353                std::env::set_current_dir(node_working_dir)?;
354
355                log::info!(
356                    // green color
357                    "\x1B[0;32m=== {} ({}) ===\x1B[0m",
358                    label,
359                    node_handle.modpath(),
360                );
361                code(&mut runtime_services)?;
362                log::info!("\x1B[0;32m=== done! ===\x1B[0m");
363                log::info!(""); // log a newline, for the pretty
364            }
365        }
366    }
367
368    Ok(())
369}
370
371fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
372    fs_err::create_dir_all(&dst)?;
373    for entry in fs_err::read_dir(src.as_ref())? {
374        let entry = entry?;
375        let ty = entry.file_type()?;
376        if ty.is_dir() {
377            copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
378        } else {
379            fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
380        }
381    }
382    Ok(())
383}