// flowey_cli/pipeline_resolver/direct_run.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use flowey_core::pipeline::internal::Parameter;
20use petgraph::prelude::NodeIndex;
21use petgraph::visit::EdgeRef;
22use std::collections::BTreeSet;
23use std::path::Path;
24use std::path::PathBuf;
25
/// A fully-resolved step that can be executed directly on the local machine.
struct ResolvedRunnableStep {
    // Handle of the flowey node that emitted this step (used for logging and
    // for naming the step's working directory).
    node_handle: NodeHandle,
    // Human-readable label printed when the step runs.
    label: String,
    // The step's executable payload, invoked once with the runtime services.
    code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
    // Index of the step within its emitting node; combined with the node
    // modpath to form a unique working-directory name.
    idx: usize,
    // When set, the step is considered "minor" and is logged at debug level
    // instead of as a standalone info-level section.
    can_merge: bool,
}
33
34/// Directly run the pipeline using flowey
35pub fn direct_run(
36    pipeline: ResolvedPipeline,
37    windows_as_wsl: bool,
38    out_dir: PathBuf,
39    persist_dir: PathBuf,
40) -> anyhow::Result<()> {
41    direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
42
43    // cleanup
44    if out_dir.join(".job_artifacts").exists() {
45        fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
46    }
47    if out_dir.join(".work").exists() {
48        fs_err::remove_dir_all(out_dir.join(".work"))?;
49    }
50
51    Ok(())
52}
53
54fn direct_run_do_work(
55    pipeline: ResolvedPipeline,
56    windows_as_wsl: bool,
57    out_dir: PathBuf,
58    persist_dir: PathBuf,
59) -> anyhow::Result<()> {
60    fs_err::create_dir_all(&out_dir)?;
61    let out_dir = std::path::absolute(out_dir)?;
62
63    fs_err::create_dir_all(&persist_dir)?;
64    let persist_dir = std::path::absolute(persist_dir)?;
65
66    let ResolvedPipeline {
67        graph,
68        order,
69        parameters,
70        ado_name: _,
71        ado_schedule_triggers: _,
72        ado_ci_triggers: _,
73        ado_pr_triggers: _,
74        ado_bootstrap_template: _,
75        ado_resources_repository: _,
76        ado_post_process_yaml_cb: _,
77        ado_variables: _,
78        ado_job_id_overrides: _,
79        gh_name: _,
80        gh_schedule_triggers: _,
81        gh_ci_triggers: _,
82        gh_pr_triggers: _,
83        gh_bootstrap_template: _,
84    } = pipeline;
85
86    let mut skipped_jobs = BTreeSet::new();
87
88    for idx in order {
89        let ResolvedPipelineJob {
90            ref root_nodes,
91            ref root_configs,
92            ref patches,
93            ref label,
94            platform,
95            arch,
96            cond_param_idx,
97            timeout_minutes: _,
98            ref command_wrapper,
99            ado_pool: _,
100            ado_variables: _,
101            gh_override_if: _,
102            gh_global_env: _,
103            gh_pool: _,
104            gh_permissions: _,
105            ref external_read_vars,
106            ref parameters_used,
107            ref artifacts_used,
108            ref artifacts_published,
109        } = graph[idx];
110
111        // orange color
112        log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
113        log::info!("");
114
115        if graph
116            .edges_directed(idx, petgraph::Direction::Incoming)
117            .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
118        {
119            log::error!("job depends on job that was skipped. skipping job...");
120            log::info!("");
121            skipped_jobs.insert(idx);
122            continue;
123        }
124
125        let flow_arch = FlowArch::host(PipelineBackendHint::Local);
126        match (arch, flow_arch) {
127            (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
128            _ => {
129                log::error!("mismatch between job arch and local arch. skipping job...");
130                skipped_jobs.insert(idx);
131                continue;
132            }
133        }
134
135        let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
136        let platform_ok = match (platform, flow_platform) {
137            (FlowPlatform::Windows, FlowPlatform::Windows) => true,
138            (FlowPlatform::Windows, FlowPlatform::Linux(_)) if windows_as_wsl => true,
139            (FlowPlatform::Linux(_), FlowPlatform::Linux(_)) => true,
140            (FlowPlatform::MacOs, FlowPlatform::MacOs) => true,
141            _ => false,
142        };
143
144        if !platform_ok {
145            log::error!("mismatch between job platform and local platform. skipping job...");
146            log::info!("");
147            if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
148                log::warn!("###");
149                log::warn!("### NOTE: detected that you're running in WSL2");
150                log::warn!(
151                    "###       if the the pipeline supports it, you can try passing --windows-as-wsl"
152                );
153                log::warn!("###");
154                log::info!("");
155            }
156            skipped_jobs.insert(idx);
157            continue;
158        }
159
160        // Use the job's declared platform for DAG resolution and runtime,
161        // except when --windows-as-wsl is active: in that case, the job
162        // declares Windows but we're actually running on Linux/WSL, so
163        // use the host platform instead.
164        let runtime_platform = if windows_as_wsl
165            && matches!(platform, FlowPlatform::Windows)
166            && matches!(flow_platform, FlowPlatform::Linux(_))
167        {
168            flow_platform
169        } else {
170            platform
171        };
172
173        let nodes = {
174            let mut resolved_local_steps = Vec::new();
175
176            let crate::flow_resolver::stage1_dag::Stage1DagOutput {
177                mut output_graph,
178                found_unreachable_nodes,
179                ..
180            } = crate::flow_resolver::stage1_dag::stage1_dag(
181                FlowBackend::Local,
182                runtime_platform,
183                arch,
184                patches.clone(),
185                root_nodes
186                    .clone()
187                    .into_iter()
188                    .map(|(node, requests)| (node, (true, requests)))
189                    .collect(),
190                root_configs.clone(),
191                external_read_vars.clone(),
192                Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
193            )?;
194
195            if found_unreachable_nodes {
196                anyhow::bail!("detected unreachable nodes")
197            }
198
199            let output_order = petgraph::algo::toposort(&output_graph, None)
200                .map_err(|e| {
201                    format!(
202                        "includes node {}",
203                        output_graph[e.node_id()].0.node.modpath()
204                    )
205                })
206                .expect("runtime variables cannot introduce a DAG cycle");
207
208            for idx in output_order.into_iter().rev() {
209                let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
210
211                let (label, code, idx, can_merge) = match step {
212                    Step::Anchor { .. } => continue,
213                    Step::Rust {
214                        label,
215                        code,
216                        idx,
217                        can_merge,
218                    } => (label, code, idx, can_merge),
219                    Step::AdoYaml { .. } => {
220                        anyhow::bail!(
221                            "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
222                            node_handle.modpath()
223                        )
224                    }
225                    Step::GitHubYaml { .. } => {
226                        anyhow::bail!(
227                            "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
228                            node_handle.modpath()
229                        )
230                    }
231                };
232
233                resolved_local_steps.push(ResolvedRunnableStep {
234                    node_handle,
235                    label,
236                    code: code.lock().take().unwrap(),
237                    idx,
238                    can_merge,
239                });
240            }
241
242            resolved_local_steps
243        };
244
245        let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
246
247        for ResolvedJobUseParameter {
248            flowey_var,
249            pipeline_param_idx,
250        } in parameters_used
251        {
252            log::trace!(
253                "resolving parameter idx {}, flowey_var {:?}",
254                pipeline_param_idx,
255                flowey_var
256            );
257            let (desc, value) = match &parameters[*pipeline_param_idx] {
258                Parameter::Bool {
259                    name: _,
260                    description,
261                    kind: _,
262                    default,
263                } => (
264                    description,
265                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
266                ),
267                Parameter::String {
268                    name: _,
269                    description,
270                    kind: _,
271                    default,
272                    possible_values: _,
273                } => (
274                    description,
275                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
276                ),
277                Parameter::Num {
278                    name: _,
279                    description,
280                    kind: _,
281                    default,
282                    possible_values: _,
283                } => (
284                    description,
285                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
286                ),
287            };
288
289            let Some(value) = value else {
290                anyhow::bail!(
291                    "pipeline must specify default value for params when running locally. missing default for '{desc}'"
292                )
293            };
294
295            in_mem_var_db.set_var(flowey_var, false, value);
296        }
297
298        in_mem_var_db.set_var(
299            VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
300            false,
301            serde_json::to_string(&persist_dir).unwrap().into(),
302        );
303
304        for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
305            let path = out_dir.join("artifacts").join(name);
306            fs_err::create_dir_all(&path)?;
307
308            in_mem_var_db.set_var(
309                flowey_var,
310                false,
311                serde_json::to_string(&path).unwrap().into(),
312            );
313        }
314
315        if out_dir.join(".job_artifacts").exists() {
316            fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
317        }
318        fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
319
320        for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
321            let path = out_dir.join(".job_artifacts").join(name);
322            fs_err::create_dir_all(&path)?;
323            copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
324
325            in_mem_var_db.set_var(
326                flowey_var,
327                false,
328                serde_json::to_string(&path).unwrap().into(),
329            );
330        }
331
332        if out_dir.join(".work").exists() {
333            fs_err::remove_dir_all(out_dir.join(".work"))?;
334        }
335        fs_err::create_dir_all(out_dir.join(".work"))?;
336
337        if let Some(cond_param_idx) = cond_param_idx {
338            let Parameter::Bool {
339                name,
340                description: _,
341                kind: _,
342                default: _,
343            } = &parameters[cond_param_idx]
344            else {
345                panic!("cond param is guaranteed to be bool by type system")
346            };
347
348            // Vars should have had their default already applied, so this should never fail.
349            let (data, _secret) = in_mem_var_db.get_var(name);
350            let should_run: bool = serde_json::from_slice(&data).unwrap();
351
352            if !should_run {
353                log::warn!("job condition was false - skipping job...");
354                skipped_jobs.insert(idx);
355                continue;
356            }
357        }
358
359        let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
360            &mut in_mem_var_db,
361            FlowBackend::Local,
362            runtime_platform,
363            arch,
364        )?;
365
366        if let Some(wrapper) = command_wrapper {
367            runtime_services.sh.set_wrapper(Some(wrapper.clone()));
368        }
369
370        for ResolvedRunnableStep {
371            node_handle,
372            label,
373            code,
374            idx,
375            can_merge,
376        } in nodes
377        {
378            let node_working_dir = out_dir.join(".work").join(format!(
379                "{}_{}",
380                node_handle.modpath().replace("::", "__"),
381                idx
382            ));
383            if !node_working_dir.exists() {
384                fs_err::create_dir(&node_working_dir)?;
385            }
386
387            std::env::set_current_dir(node_working_dir.clone())?;
388            runtime_services.sh.change_dir(node_working_dir);
389
390            if can_merge {
391                log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
392            } else {
393                log::info!(
394                    // green color
395                    "\x1B[0;32m=== {} ({}) ===\x1B[0m",
396                    label,
397                    node_handle.modpath(),
398                );
399            }
400            code(&mut runtime_services)?;
401            if can_merge {
402                log::debug!("done!");
403                log::debug!(""); // log a newline, for the pretty
404            } else {
405                log::info!("\x1B[0;32m=== done! ===\x1B[0m");
406                log::info!(""); // log a newline, for the pretty
407            }
408        }
409
410        // Leave the last node's working dir so it can be deleted by later steps
411        std::env::set_current_dir(&out_dir)?;
412    }
413
414    Ok(())
415}
416
417fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
418    fs_err::create_dir_all(&dst)?;
419    for entry in fs_err::read_dir(src.as_ref())? {
420        let entry = entry?;
421        let ty = entry.file_type()?;
422        if ty.is_dir() {
423            copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
424        } else {
425            fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
426        }
427    }
428    Ok(())
429}