1use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use flowey_core::pipeline::internal::Parameter;
20use petgraph::prelude::NodeIndex;
21use petgraph::visit::EdgeRef;
22use std::collections::BTreeSet;
23use std::path::Path;
24use std::path::PathBuf;
25
/// A single Rust step of a job, fully resolved and ready to execute
/// in-process.
struct ResolvedRunnableStep {
    /// Handle of the flowey node that emitted this step.
    node_handle: NodeHandle,
    /// Human-readable label, used when logging step progress.
    label: String,
    /// The step's body, invoked with mutable access to the in-memory
    /// runtime services.
    code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
    /// Per-node step index; combined with the node's modpath to give each
    /// step a unique working directory under `.work`.
    idx: usize,
    /// When set, the step is considered "minor" and is logged at debug
    /// (rather than info) level.
    can_merge: bool,
}
33
34pub fn direct_run(
36 pipeline: ResolvedPipeline,
37 windows_as_wsl: bool,
38 out_dir: PathBuf,
39 persist_dir: PathBuf,
40) -> anyhow::Result<()> {
41 direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
42
43 if out_dir.join(".job_artifacts").exists() {
45 fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
46 }
47 if out_dir.join(".work").exists() {
48 fs_err::remove_dir_all(out_dir.join(".work"))?;
49 }
50
51 Ok(())
52}
53
/// The workhorse behind [`direct_run`]: executes every job of `pipeline`
/// in-process, in the pipeline's topological `order`.
///
/// Jobs whose arch/platform don't match the local machine, whose condition
/// parameter is false, or which depend on a previously skipped job are
/// skipped (and recorded so dependents skip too) rather than treated as
/// errors. Returns an error as soon as any step of a runnable job fails.
fn direct_run_do_work(
    pipeline: ResolvedPipeline,
    windows_as_wsl: bool,
    out_dir: PathBuf,
    persist_dir: PathBuf,
) -> anyhow::Result<()> {
    // Normalize both directories to absolute paths up-front, since step
    // execution below changes the process's current directory.
    fs_err::create_dir_all(&out_dir)?;
    let out_dir = std::path::absolute(out_dir)?;

    fs_err::create_dir_all(&persist_dir)?;
    let persist_dir = std::path::absolute(persist_dir)?;

    // Exhaustive destructure, so that adding a new pipeline field forces a
    // decision here. All ADO/GitHub backend-specific fields are irrelevant
    // for a local run.
    let ResolvedPipeline {
        graph,
        order,
        parameters,
        ado_name: _,
        ado_schedule_triggers: _,
        ado_ci_triggers: _,
        ado_pr_triggers: _,
        ado_bootstrap_template: _,
        ado_resources_repository: _,
        ado_post_process_yaml_cb: _,
        ado_variables: _,
        ado_job_id_overrides: _,
        gh_name: _,
        gh_schedule_triggers: _,
        gh_ci_triggers: _,
        gh_pr_triggers: _,
        gh_bootstrap_template: _,
    } = pipeline;

    // Jobs that were skipped for any reason; consulted so that downstream
    // jobs depending on them are skipped as well.
    let mut skipped_jobs = BTreeSet::new();

    for idx in order {
        // Exhaustive destructure of the job, mirroring the pipeline-level
        // destructure above. Backend-only fields are ignored.
        let ResolvedPipelineJob {
            ref root_nodes,
            ref root_configs,
            ref patches,
            ref label,
            platform,
            arch,
            cond_param_idx,
            timeout_minutes: _,
            ref command_wrapper,
            ado_pool: _,
            ado_variables: _,
            gh_override_if: _,
            gh_global_env: _,
            gh_pool: _,
            gh_permissions: _,
            ref external_read_vars,
            ref parameters_used,
            ref artifacts_used,
            ref artifacts_published,
        } = graph[idx];

        // Yellow job banner.
        log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
        log::info!("");

        // Skip jobs with a (transitively) skipped upstream dependency.
        if graph
            .edges_directed(idx, petgraph::Direction::Incoming)
            .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
        {
            log::error!("job depends on job that was skipped. skipping job...");
            log::info!("");
            skipped_jobs.insert(idx);
            continue;
        }

        // The job's declared arch must match the local host arch exactly.
        let flow_arch = FlowArch::host(PipelineBackendHint::Local);
        match (arch, flow_arch) {
            (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
            _ => {
                log::error!("mismatch between job arch and local arch. skipping job...");
                skipped_jobs.insert(idx);
                continue;
            }
        }

        // Platform must match, with one escape hatch: a Windows job may run
        // on a Linux host when `--windows-as-wsl` was requested.
        let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
        let platform_ok = match (platform, flow_platform) {
            (FlowPlatform::Windows, FlowPlatform::Windows) => true,
            (FlowPlatform::Windows, FlowPlatform::Linux(_)) if windows_as_wsl => true,
            (FlowPlatform::Linux(_), FlowPlatform::Linux(_)) => true,
            (FlowPlatform::MacOs, FlowPlatform::MacOs) => true,
            _ => false,
        };

        if !platform_ok {
            log::error!("mismatch between job platform and local platform. skipping job...");
            log::info!("");
            // Helpful hint for the common WSL2 case.
            if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
                log::warn!("###");
                log::warn!("### NOTE: detected that you're running in WSL2");
                log::warn!(
                    "### if the the pipeline supports it, you can try passing --windows-as-wsl"
                );
                log::warn!("###");
                log::info!("");
            }
            skipped_jobs.insert(idx);
            continue;
        }

        // When running a Windows job under WSL, resolve nodes against the
        // actual (Linux) host platform rather than the declared one.
        let runtime_platform = if windows_as_wsl
            && matches!(platform, FlowPlatform::Windows)
            && matches!(flow_platform, FlowPlatform::Linux(_))
        {
            flow_platform
        } else {
            platform
        };

        // Resolve the job's node graph into a flat, ordered list of
        // runnable Rust steps.
        let nodes = {
            let mut resolved_local_steps = Vec::new();

            let crate::flow_resolver::stage1_dag::Stage1DagOutput {
                mut output_graph,
                found_unreachable_nodes,
                ..
            } = crate::flow_resolver::stage1_dag::stage1_dag(
                FlowBackend::Local,
                runtime_platform,
                arch,
                patches.clone(),
                root_nodes
                    .clone()
                    .into_iter()
                    // Root nodes are always marked "explicitly requested".
                    .map(|(node, requests)| (node, (true, requests)))
                    .collect(),
                root_configs.clone(),
                external_read_vars.clone(),
                Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
            )?;

            if found_unreachable_nodes {
                anyhow::bail!("detected unreachable nodes")
            }

            let output_order = petgraph::algo::toposort(&output_graph, None)
                .map_err(|e| {
                    format!(
                        "includes node {}",
                        output_graph[e.node_id()].0.node.modpath()
                    )
                })
                .expect("runtime variables cannot introduce a DAG cycle");

            // NOTE: iterated in *reverse* topological order — execution
            // order is the reverse of the output graph's edge direction.
            for idx in output_order.into_iter().rev() {
                // Each entry is consumed (taken) exactly once.
                let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();

                let (label, code, idx, can_merge) = match step {
                    // Anchors are ordering-only; nothing to run.
                    Step::Anchor { .. } => continue,
                    Step::Rust {
                        label,
                        code,
                        idx,
                        can_merge,
                    } => (label, code, idx, can_merge),
                    // A node emitting backend YAML during a local run is a
                    // bug in that node, not in the pipeline.
                    Step::AdoYaml { .. } => {
                        anyhow::bail!(
                            "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
                            node_handle.modpath()
                        )
                    }
                    Step::GitHubYaml { .. } => {
                        anyhow::bail!(
                            "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
                            node_handle.modpath()
                        )
                    }
                };

                resolved_local_steps.push(ResolvedRunnableStep {
                    node_handle,
                    label,
                    // The closure is stored behind a lock; take sole
                    // ownership of it here.
                    code: code.lock().take().unwrap(),
                    idx,
                    can_merge,
                });
            }

            resolved_local_steps
        };

        // Fresh per-job variable database, seeded below with parameters,
        // the persistent-storage dir, and artifact paths.
        let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();

        for ResolvedJobUseParameter {
            flowey_var,
            pipeline_param_idx,
        } in parameters_used
        {
            log::trace!(
                "resolving parameter idx {}, flowey_var {:?}",
                pipeline_param_idx,
                flowey_var
            );
            // Local runs have no way to prompt for parameter values, so the
            // declared default (JSON-encoded) is used for every kind.
            let (desc, value) = match &parameters[*pipeline_param_idx] {
                Parameter::Bool {
                    name: _,
                    description,
                    kind: _,
                    default,
                } => (
                    description,
                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
                ),
                Parameter::String {
                    name: _,
                    description,
                    kind: _,
                    default,
                    possible_values: _,
                } => (
                    description,
                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
                ),
                Parameter::Num {
                    name: _,
                    description,
                    kind: _,
                    default,
                    possible_values: _,
                } => (
                    description,
                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
                ),
            };

            let Some(value) = value else {
                anyhow::bail!(
                    "pipeline must specify default value for params when running locally. missing default for '{desc}'"
                )
            };

            in_mem_var_db.set_var(flowey_var, false, value);
        }

        // Seed the well-known persistent-storage-dir var (JSON-encoded
        // path, not a secret).
        in_mem_var_db.set_var(
            VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
            false,
            serde_json::to_string(&persist_dir).unwrap().into(),
        );

        // Artifacts this job publishes land directly under
        // `<out_dir>/artifacts/<name>`.
        for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
            let path = out_dir.join("artifacts").join(name);
            fs_err::create_dir_all(&path)?;

            in_mem_var_db.set_var(
                flowey_var,
                false,
                serde_json::to_string(&path).unwrap().into(),
            );
        }

        // Recreate `.job_artifacts` from scratch for this job...
        if out_dir.join(".job_artifacts").exists() {
            fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
        }
        fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;

        // ...and populate it with copies of the artifacts this job
        // consumes (copied so steps can't mutate the published originals).
        for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
            let path = out_dir.join(".job_artifacts").join(name);
            fs_err::create_dir_all(&path)?;
            copy_dir_all(out_dir.join("artifacts").join(name), &path)?;

            in_mem_var_db.set_var(
                flowey_var,
                false,
                serde_json::to_string(&path).unwrap().into(),
            );
        }

        // Fresh `.work` scratch dir for this job's per-step working dirs.
        if out_dir.join(".work").exists() {
            fs_err::remove_dir_all(out_dir.join(".work"))?;
        }
        fs_err::create_dir_all(out_dir.join(".work"))?;

        // Evaluate the job's condition parameter (if any), now that the
        // var db has been seeded with parameter values.
        if let Some(cond_param_idx) = cond_param_idx {
            let Parameter::Bool {
                name,
                description: _,
                kind: _,
                default: _,
            } = &parameters[cond_param_idx]
            else {
                panic!("cond param is guaranteed to be bool by type system")
            };

            let (data, _secret) = in_mem_var_db.get_var(name);
            let should_run: bool = serde_json::from_slice(&data).unwrap();

            if !should_run {
                log::warn!("job condition was false - skipping job...");
                skipped_jobs.insert(idx);
                continue;
            }
        }

        let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
            &mut in_mem_var_db,
            FlowBackend::Local,
            runtime_platform,
            arch,
        )?;

        // Optional wrapper prefixed onto shell commands (e.g. for WSL
        // interop). TODO(review): confirm exact wrapper semantics with
        // `sh.set_wrapper`'s docs — not visible from this file.
        if let Some(wrapper) = command_wrapper {
            runtime_services.sh.set_wrapper(Some(wrapper.clone()));
        }

        for ResolvedRunnableStep {
            node_handle,
            label,
            code,
            idx,
            can_merge,
        } in nodes
        {
            // Unique working dir per (node, step idx), e.g.
            // `.work/my__node__path_0`.
            let node_working_dir = out_dir.join(".work").join(format!(
                "{}_{}",
                node_handle.modpath().replace("::", "__"),
                idx
            ));
            if !node_working_dir.exists() {
                fs_err::create_dir(&node_working_dir)?;
            }

            // Keep the process cwd and the shell-services cwd in sync.
            std::env::set_current_dir(node_working_dir.clone())?;
            runtime_services.sh.change_dir(node_working_dir);

            // Minor (mergeable) steps log at debug; major steps get a
            // green banner at info.
            if can_merge {
                log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
            } else {
                log::info!(
                    "\x1B[0;32m=== {} ({}) ===\x1B[0m",
                    label,
                    node_handle.modpath(),
                );
            }
            code(&mut runtime_services)?;
            if can_merge {
                log::debug!("done!");
                log::debug!("");
            } else {
                log::info!("\x1B[0;32m=== done! ===\x1B[0m");
                log::info!("");
            }
        }

        // Restore cwd so the next job (and the caller) start from a known
        // directory.
        std::env::set_current_dir(&out_dir)?;
    }

    Ok(())
}
416
417fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
418 fs_err::create_dir_all(&dst)?;
419 for entry in fs_err::read_dir(src.as_ref())? {
420 let entry = entry?;
421 let ty = entry.file_type()?;
422 if ty.is_dir() {
423 copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
424 } else {
425 fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
426 }
427 }
428 Ok(())
429}