flowey_cli/pipeline_resolver/
direct_run.rs1use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::internal::Parameter;
18use petgraph::prelude::NodeIndex;
19use petgraph::visit::EdgeRef;
20use std::collections::BTreeSet;
21use std::path::Path;
22use std::path::PathBuf;
23
24struct ResolvedRunnableStep {
25 node_handle: NodeHandle,
26 label: String,
27 code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
28 idx: usize,
29 can_merge: bool,
30}
31
32pub fn direct_run(
34 pipeline: ResolvedPipeline,
35 windows_as_wsl: bool,
36 out_dir: PathBuf,
37 persist_dir: PathBuf,
38) -> anyhow::Result<()> {
39 direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
40
41 if out_dir.join(".job_artifacts").exists() {
43 fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
44 }
45 if out_dir.join(".work").exists() {
46 fs_err::remove_dir_all(out_dir.join(".work"))?;
47 }
48
49 Ok(())
50}
51
52fn direct_run_do_work(
53 pipeline: ResolvedPipeline,
54 windows_as_wsl: bool,
55 out_dir: PathBuf,
56 persist_dir: PathBuf,
57) -> anyhow::Result<()> {
58 fs_err::create_dir_all(&out_dir)?;
59 let out_dir = std::path::absolute(out_dir)?;
60
61 fs_err::create_dir_all(&persist_dir)?;
62 let persist_dir = std::path::absolute(persist_dir)?;
63
64 let ResolvedPipeline {
65 graph,
66 order,
67 parameters,
68 ado_name: _,
69 ado_schedule_triggers: _,
70 ado_ci_triggers: _,
71 ado_pr_triggers: _,
72 ado_bootstrap_template: _,
73 ado_resources_repository: _,
74 ado_post_process_yaml_cb: _,
75 ado_variables: _,
76 ado_job_id_overrides: _,
77 gh_name: _,
78 gh_schedule_triggers: _,
79 gh_ci_triggers: _,
80 gh_pr_triggers: _,
81 gh_bootstrap_template: _,
82 } = pipeline;
83
84 let mut skipped_jobs = BTreeSet::new();
85
86 for idx in order {
87 let ResolvedPipelineJob {
88 ref root_nodes,
89 ref patches,
90 ref label,
91 platform,
92 arch,
93 cond_param_idx,
94 ado_pool: _,
95 ado_variables: _,
96 gh_override_if: _,
97 gh_global_env: _,
98 gh_pool: _,
99 gh_permissions: _,
100 ref external_read_vars,
101 ref parameters_used,
102 ref artifacts_used,
103 ref artifacts_published,
104 } = graph[idx];
105
106 log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
108 log::info!("");
109
110 if graph
111 .edges_directed(idx, petgraph::Direction::Incoming)
112 .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
113 {
114 log::error!("job depends on job that was skipped. skipping job...");
115 log::info!("");
116 skipped_jobs.insert(idx);
117 continue;
118 }
119
120 let flow_arch = if cfg!(target_arch = "x86_64") {
122 FlowArch::X86_64
123 } else if cfg!(target_arch = "aarch64") {
125 FlowArch::Aarch64
126 } else {
127 unreachable!("flowey only runs on X86_64 or Aarch64 at the moment")
128 };
129
130 match (arch, flow_arch) {
131 (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
132 _ => {
133 log::error!("mismatch between job arch and local arch. skipping job...");
134 continue;
135 }
136 }
137
138 let platform_ok = match platform {
139 FlowPlatform::Windows => cfg!(windows) || (cfg!(target_os = "linux") && windows_as_wsl),
140 FlowPlatform::Linux(_) => cfg!(target_os = "linux"),
141 FlowPlatform::MacOs => cfg!(target_os = "macos"),
142 platform => panic!("unknown platform {platform}"),
143 };
144
145 if !platform_ok {
146 log::error!("mismatch between job platform and local platform. skipping job...");
147 log::info!("");
148 if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
149 log::warn!("###");
150 log::warn!("### NOTE: detected that you're running in WSL2");
151 log::warn!(
152 "### if the the pipeline supports it, you can try passing --windows-as-wsl"
153 );
154 log::warn!("###");
155 log::info!("");
156 }
157 skipped_jobs.insert(idx);
158 continue;
159 }
160
161 let nodes = {
162 let mut resolved_local_steps = Vec::new();
163
164 let (mut output_graph, _, err_unreachable_nodes) =
165 crate::flow_resolver::stage1_dag::stage1_dag(
166 FlowBackend::Local,
167 platform,
168 flow_arch,
169 patches.clone(),
170 root_nodes
171 .clone()
172 .into_iter()
173 .map(|(node, requests)| (node, (true, requests)))
174 .collect(),
175 external_read_vars.clone(),
176 Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
177 )?;
178
179 if err_unreachable_nodes.is_some() {
180 anyhow::bail!("detected unreachable nodes")
181 }
182
183 let output_order = petgraph::algo::toposort(&output_graph, None)
184 .map_err(|e| {
185 format!(
186 "includes node {}",
187 output_graph[e.node_id()].0.node.modpath()
188 )
189 })
190 .expect("runtime variables cannot introduce a DAG cycle");
191
192 for idx in output_order.into_iter().rev() {
193 let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
194
195 let (label, code, idx, can_merge) = match step {
196 Step::Anchor { .. } => continue,
197 Step::Rust {
198 label,
199 code,
200 idx,
201 can_merge,
202 } => (label, code, idx, can_merge),
203 Step::AdoYaml { .. } => {
204 anyhow::bail!(
205 "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
206 node_handle.modpath()
207 )
208 }
209 Step::GitHubYaml { .. } => {
210 anyhow::bail!(
211 "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
212 node_handle.modpath()
213 )
214 }
215 };
216
217 resolved_local_steps.push(ResolvedRunnableStep {
218 node_handle,
219 label,
220 code: code.lock().take().unwrap(),
221 idx,
222 can_merge,
223 });
224 }
225
226 resolved_local_steps
227 };
228
229 let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
230
231 for ResolvedJobUseParameter {
232 flowey_var,
233 pipeline_param_idx,
234 } in parameters_used
235 {
236 let (desc, value) = match ¶meters[*pipeline_param_idx] {
237 Parameter::Bool {
238 name: _,
239 description,
240 kind: _,
241 default,
242 } => (
243 description,
244 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
245 ),
246 Parameter::String {
247 name: _,
248 description,
249 kind: _,
250 default,
251 possible_values: _,
252 } => (
253 description,
254 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
255 ),
256 Parameter::Num {
257 name: _,
258 description,
259 kind: _,
260 default,
261 possible_values: _,
262 } => (
263 description,
264 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
265 ),
266 };
267
268 let Some(value) = value else {
269 anyhow::bail!(
270 "pipeline must specify default value for params when running locally. missing default for '{desc}'"
271 )
272 };
273
274 in_mem_var_db.set_var(flowey_var, false, value);
275 }
276
277 in_mem_var_db.set_var(
278 VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
279 false,
280 serde_json::to_string(&persist_dir).unwrap().into(),
281 );
282
283 for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
284 let path = out_dir.join("artifacts").join(name);
285 fs_err::create_dir_all(&path)?;
286
287 in_mem_var_db.set_var(
288 flowey_var,
289 false,
290 serde_json::to_string(&path).unwrap().into(),
291 );
292 }
293
294 if out_dir.join(".job_artifacts").exists() {
295 fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
296 }
297 fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
298
299 for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
300 let path = out_dir.join(".job_artifacts").join(name);
301 fs_err::create_dir_all(&path)?;
302 copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
303
304 in_mem_var_db.set_var(
305 flowey_var,
306 false,
307 serde_json::to_string(&path).unwrap().into(),
308 );
309 }
310
311 if out_dir.join(".work").exists() {
312 fs_err::remove_dir_all(out_dir.join(".work"))?;
313 }
314 fs_err::create_dir_all(out_dir.join(".work"))?;
315
316 let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
317 &mut in_mem_var_db,
318 FlowBackend::Local,
319 platform,
320 flow_arch,
321 );
322
323 if let Some(cond_param_idx) = cond_param_idx {
324 let Parameter::Bool {
325 name: _,
326 description: _,
327 kind: _,
328 default,
329 } = ¶meters[cond_param_idx]
330 else {
331 panic!("cond param is guaranteed to be bool by type system")
332 };
333
334 let Some(should_run) = default else {
335 anyhow::bail!(
336 "when running locally, job condition parameter must include a default value"
337 )
338 };
339
340 if !should_run {
341 log::warn!("job condition was false - skipping job...");
342 continue;
343 }
344 }
345
346 for ResolvedRunnableStep {
347 node_handle,
348 label,
349 code,
350 idx,
351 can_merge,
352 } in nodes
353 {
354 let node_working_dir = out_dir.join(".work").join(format!(
355 "{}_{}",
356 node_handle.modpath().replace("::", "__"),
357 idx
358 ));
359 if !node_working_dir.exists() {
360 fs_err::create_dir(&node_working_dir)?;
361 }
362 std::env::set_current_dir(node_working_dir)?;
363
364 if can_merge {
365 log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
366 } else {
367 log::info!(
368 "\x1B[0;32m=== {} ({}) ===\x1B[0m",
370 label,
371 node_handle.modpath(),
372 );
373 }
374 code(&mut runtime_services)?;
375 if can_merge {
376 log::debug!("done!");
377 log::debug!(""); } else {
379 log::info!("\x1B[0;32m=== done! ===\x1B[0m");
380 log::info!(""); }
382 }
383 }
384
385 Ok(())
386}
387
388fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
389 fs_err::create_dir_all(&dst)?;
390 for entry in fs_err::read_dir(src.as_ref())? {
391 let entry = entry?;
392 let ty = entry.file_type()?;
393 if ty.is_dir() {
394 copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
395 } else {
396 fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
397 }
398 }
399 Ok(())
400}