// flowey_cli/pipeline_resolver/direct_run.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use flowey_core::pipeline::internal::Parameter;
20use petgraph::prelude::NodeIndex;
21use petgraph::visit::EdgeRef;
22use std::collections::BTreeSet;
23use std::path::Path;
24use std::path::PathBuf;
25
/// A single runnable Rust step extracted from the resolved flow DAG, stored
/// in execution order.
struct ResolvedRunnableStep {
    // Handle of the flowey node that emitted this step (used for logging and
    // for naming the step's working directory).
    node_handle: NodeHandle,
    // Human-readable label printed when the step starts and finishes.
    label: String,
    // The step body itself, invoked once with mutable access to the runtime
    // services.
    code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
    // Index of the step within its emitting node; combined with the node
    // modpath to build a unique working-dir name.
    idx: usize,
    // When true, this is a "minor" step whose start/finish logging is demoted
    // from info to debug.
    can_merge: bool,
}
33
34/// Directly run the pipeline using flowey
35pub fn direct_run(
36    pipeline: ResolvedPipeline,
37    windows_as_wsl: bool,
38    out_dir: PathBuf,
39    persist_dir: PathBuf,
40) -> anyhow::Result<()> {
41    direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
42
43    // cleanup
44    if out_dir.join(".job_artifacts").exists() {
45        fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
46    }
47    if out_dir.join(".work").exists() {
48        fs_err::remove_dir_all(out_dir.join(".work"))?;
49    }
50
51    Ok(())
52}
53
54fn direct_run_do_work(
55    pipeline: ResolvedPipeline,
56    windows_as_wsl: bool,
57    out_dir: PathBuf,
58    persist_dir: PathBuf,
59) -> anyhow::Result<()> {
60    fs_err::create_dir_all(&out_dir)?;
61    let out_dir = std::path::absolute(out_dir)?;
62
63    fs_err::create_dir_all(&persist_dir)?;
64    let persist_dir = std::path::absolute(persist_dir)?;
65
66    let ResolvedPipeline {
67        graph,
68        order,
69        parameters,
70        ado_name: _,
71        ado_schedule_triggers: _,
72        ado_ci_triggers: _,
73        ado_pr_triggers: _,
74        ado_bootstrap_template: _,
75        ado_resources_repository: _,
76        ado_post_process_yaml_cb: _,
77        ado_variables: _,
78        ado_job_id_overrides: _,
79        gh_name: _,
80        gh_schedule_triggers: _,
81        gh_ci_triggers: _,
82        gh_pr_triggers: _,
83        gh_bootstrap_template: _,
84    } = pipeline;
85
86    let mut skipped_jobs = BTreeSet::new();
87
88    for idx in order {
89        let ResolvedPipelineJob {
90            ref root_nodes,
91            ref patches,
92            ref label,
93            platform,
94            arch,
95            cond_param_idx,
96            timeout_minutes: _,
97            ref command_wrapper,
98            ado_pool: _,
99            ado_variables: _,
100            gh_override_if: _,
101            gh_global_env: _,
102            gh_pool: _,
103            gh_permissions: _,
104            ref external_read_vars,
105            ref parameters_used,
106            ref artifacts_used,
107            ref artifacts_published,
108        } = graph[idx];
109
110        // orange color
111        log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
112        log::info!("");
113
114        if graph
115            .edges_directed(idx, petgraph::Direction::Incoming)
116            .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
117        {
118            log::error!("job depends on job that was skipped. skipping job...");
119            log::info!("");
120            skipped_jobs.insert(idx);
121            continue;
122        }
123
124        let flow_arch = FlowArch::host(PipelineBackendHint::Local);
125        match (arch, flow_arch) {
126            (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
127            _ => {
128                log::error!("mismatch between job arch and local arch. skipping job...");
129                skipped_jobs.insert(idx);
130                continue;
131            }
132        }
133
134        let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
135        let platform_ok = match (platform, flow_platform) {
136            (FlowPlatform::Windows, FlowPlatform::Windows) => true,
137            (FlowPlatform::Windows, FlowPlatform::Linux(_)) if windows_as_wsl => true,
138            (FlowPlatform::Linux(_), FlowPlatform::Linux(_)) => true,
139            (FlowPlatform::MacOs, FlowPlatform::MacOs) => true,
140            _ => false,
141        };
142
143        if !platform_ok {
144            log::error!("mismatch between job platform and local platform. skipping job...");
145            log::info!("");
146            if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
147                log::warn!("###");
148                log::warn!("### NOTE: detected that you're running in WSL2");
149                log::warn!(
150                    "###       if the the pipeline supports it, you can try passing --windows-as-wsl"
151                );
152                log::warn!("###");
153                log::info!("");
154            }
155            skipped_jobs.insert(idx);
156            continue;
157        }
158
159        // Use the job's declared platform for DAG resolution and runtime,
160        // except when --windows-as-wsl is active: in that case, the job
161        // declares Windows but we're actually running on Linux/WSL, so
162        // use the host platform instead.
163        let runtime_platform = if windows_as_wsl
164            && matches!(platform, FlowPlatform::Windows)
165            && matches!(flow_platform, FlowPlatform::Linux(_))
166        {
167            flow_platform
168        } else {
169            platform
170        };
171
172        let nodes = {
173            let mut resolved_local_steps = Vec::new();
174
175            let (mut output_graph, _, err_unreachable_nodes) =
176                crate::flow_resolver::stage1_dag::stage1_dag(
177                    FlowBackend::Local,
178                    runtime_platform,
179                    arch,
180                    patches.clone(),
181                    root_nodes
182                        .clone()
183                        .into_iter()
184                        .map(|(node, requests)| (node, (true, requests)))
185                        .collect(),
186                    external_read_vars.clone(),
187                    Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
188                )?;
189
190            if err_unreachable_nodes.is_some() {
191                anyhow::bail!("detected unreachable nodes")
192            }
193
194            let output_order = petgraph::algo::toposort(&output_graph, None)
195                .map_err(|e| {
196                    format!(
197                        "includes node {}",
198                        output_graph[e.node_id()].0.node.modpath()
199                    )
200                })
201                .expect("runtime variables cannot introduce a DAG cycle");
202
203            for idx in output_order.into_iter().rev() {
204                let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
205
206                let (label, code, idx, can_merge) = match step {
207                    Step::Anchor { .. } => continue,
208                    Step::Rust {
209                        label,
210                        code,
211                        idx,
212                        can_merge,
213                    } => (label, code, idx, can_merge),
214                    Step::AdoYaml { .. } => {
215                        anyhow::bail!(
216                            "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
217                            node_handle.modpath()
218                        )
219                    }
220                    Step::GitHubYaml { .. } => {
221                        anyhow::bail!(
222                            "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
223                            node_handle.modpath()
224                        )
225                    }
226                };
227
228                resolved_local_steps.push(ResolvedRunnableStep {
229                    node_handle,
230                    label,
231                    code: code.lock().take().unwrap(),
232                    idx,
233                    can_merge,
234                });
235            }
236
237            resolved_local_steps
238        };
239
240        let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
241
242        for ResolvedJobUseParameter {
243            flowey_var,
244            pipeline_param_idx,
245        } in parameters_used
246        {
247            log::trace!(
248                "resolving parameter idx {}, flowey_var {:?}",
249                pipeline_param_idx,
250                flowey_var
251            );
252            let (desc, value) = match &parameters[*pipeline_param_idx] {
253                Parameter::Bool {
254                    name: _,
255                    description,
256                    kind: _,
257                    default,
258                } => (
259                    description,
260                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
261                ),
262                Parameter::String {
263                    name: _,
264                    description,
265                    kind: _,
266                    default,
267                    possible_values: _,
268                } => (
269                    description,
270                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
271                ),
272                Parameter::Num {
273                    name: _,
274                    description,
275                    kind: _,
276                    default,
277                    possible_values: _,
278                } => (
279                    description,
280                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
281                ),
282            };
283
284            let Some(value) = value else {
285                anyhow::bail!(
286                    "pipeline must specify default value for params when running locally. missing default for '{desc}'"
287                )
288            };
289
290            in_mem_var_db.set_var(flowey_var, false, value);
291        }
292
293        in_mem_var_db.set_var(
294            VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
295            false,
296            serde_json::to_string(&persist_dir).unwrap().into(),
297        );
298
299        for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
300            let path = out_dir.join("artifacts").join(name);
301            fs_err::create_dir_all(&path)?;
302
303            in_mem_var_db.set_var(
304                flowey_var,
305                false,
306                serde_json::to_string(&path).unwrap().into(),
307            );
308        }
309
310        if out_dir.join(".job_artifacts").exists() {
311            fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
312        }
313        fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
314
315        for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
316            let path = out_dir.join(".job_artifacts").join(name);
317            fs_err::create_dir_all(&path)?;
318            copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
319
320            in_mem_var_db.set_var(
321                flowey_var,
322                false,
323                serde_json::to_string(&path).unwrap().into(),
324            );
325        }
326
327        if out_dir.join(".work").exists() {
328            fs_err::remove_dir_all(out_dir.join(".work"))?;
329        }
330        fs_err::create_dir_all(out_dir.join(".work"))?;
331
332        if let Some(cond_param_idx) = cond_param_idx {
333            let Parameter::Bool {
334                name,
335                description: _,
336                kind: _,
337                default: _,
338            } = &parameters[cond_param_idx]
339            else {
340                panic!("cond param is guaranteed to be bool by type system")
341            };
342
343            // Vars should have had their default already applied, so this should never fail.
344            let (data, _secret) = in_mem_var_db.get_var(name);
345            let should_run: bool = serde_json::from_slice(&data).unwrap();
346
347            if !should_run {
348                log::warn!("job condition was false - skipping job...");
349                skipped_jobs.insert(idx);
350                continue;
351            }
352        }
353
354        let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
355            &mut in_mem_var_db,
356            FlowBackend::Local,
357            runtime_platform,
358            arch,
359        )?;
360
361        if let Some(wrapper) = command_wrapper {
362            runtime_services.sh.set_wrapper(Some(wrapper.clone()));
363        }
364
365        for ResolvedRunnableStep {
366            node_handle,
367            label,
368            code,
369            idx,
370            can_merge,
371        } in nodes
372        {
373            let node_working_dir = out_dir.join(".work").join(format!(
374                "{}_{}",
375                node_handle.modpath().replace("::", "__"),
376                idx
377            ));
378            if !node_working_dir.exists() {
379                fs_err::create_dir(&node_working_dir)?;
380            }
381
382            std::env::set_current_dir(node_working_dir.clone())?;
383            runtime_services.sh.change_dir(node_working_dir);
384
385            if can_merge {
386                log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
387            } else {
388                log::info!(
389                    // green color
390                    "\x1B[0;32m=== {} ({}) ===\x1B[0m",
391                    label,
392                    node_handle.modpath(),
393                );
394            }
395            code(&mut runtime_services)?;
396            if can_merge {
397                log::debug!("done!");
398                log::debug!(""); // log a newline, for the pretty
399            } else {
400                log::info!("\x1B[0;32m=== done! ===\x1B[0m");
401                log::info!(""); // log a newline, for the pretty
402            }
403        }
404
405        // Leave the last node's working dir so it can be deleted by later steps
406        std::env::set_current_dir(&out_dir)?;
407    }
408
409    Ok(())
410}
411
412fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
413    fs_err::create_dir_all(&dst)?;
414    for entry in fs_err::read_dir(src.as_ref())? {
415        let entry = entry?;
416        let ty = entry.file_type()?;
417        if ty.is_dir() {
418            copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
419        } else {
420            fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
421        }
422    }
423    Ok(())
424}