1use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use flowey_core::pipeline::internal::Parameter;
20use petgraph::prelude::NodeIndex;
21use petgraph::visit::EdgeRef;
22use std::collections::BTreeSet;
23use std::path::Path;
24use std::path::PathBuf;
25
26struct ResolvedRunnableStep {
27 node_handle: NodeHandle,
28 label: String,
29 code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
30 idx: usize,
31 can_merge: bool,
32}
33
34pub fn direct_run(
36 pipeline: ResolvedPipeline,
37 windows_as_wsl: bool,
38 out_dir: PathBuf,
39 persist_dir: PathBuf,
40) -> anyhow::Result<()> {
41 direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
42
43 if out_dir.join(".job_artifacts").exists() {
45 fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
46 }
47 if out_dir.join(".work").exists() {
48 fs_err::remove_dir_all(out_dir.join(".work"))?;
49 }
50
51 Ok(())
52}
53
54fn direct_run_do_work(
55 pipeline: ResolvedPipeline,
56 windows_as_wsl: bool,
57 out_dir: PathBuf,
58 persist_dir: PathBuf,
59) -> anyhow::Result<()> {
60 fs_err::create_dir_all(&out_dir)?;
61 let out_dir = std::path::absolute(out_dir)?;
62
63 fs_err::create_dir_all(&persist_dir)?;
64 let persist_dir = std::path::absolute(persist_dir)?;
65
66 let ResolvedPipeline {
67 graph,
68 order,
69 parameters,
70 ado_name: _,
71 ado_schedule_triggers: _,
72 ado_ci_triggers: _,
73 ado_pr_triggers: _,
74 ado_bootstrap_template: _,
75 ado_resources_repository: _,
76 ado_post_process_yaml_cb: _,
77 ado_variables: _,
78 ado_job_id_overrides: _,
79 gh_name: _,
80 gh_schedule_triggers: _,
81 gh_ci_triggers: _,
82 gh_pr_triggers: _,
83 gh_bootstrap_template: _,
84 } = pipeline;
85
86 let mut skipped_jobs = BTreeSet::new();
87
88 for idx in order {
89 let ResolvedPipelineJob {
90 ref root_nodes,
91 ref patches,
92 ref label,
93 platform,
94 arch,
95 cond_param_idx,
96 timeout_minutes: _,
97 ref command_wrapper,
98 ado_pool: _,
99 ado_variables: _,
100 gh_override_if: _,
101 gh_global_env: _,
102 gh_pool: _,
103 gh_permissions: _,
104 ref external_read_vars,
105 ref parameters_used,
106 ref artifacts_used,
107 ref artifacts_published,
108 } = graph[idx];
109
110 log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
112 log::info!("");
113
114 if graph
115 .edges_directed(idx, petgraph::Direction::Incoming)
116 .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
117 {
118 log::error!("job depends on job that was skipped. skipping job...");
119 log::info!("");
120 skipped_jobs.insert(idx);
121 continue;
122 }
123
124 let flow_arch = FlowArch::host(PipelineBackendHint::Local);
125 match (arch, flow_arch) {
126 (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
127 _ => {
128 log::error!("mismatch between job arch and local arch. skipping job...");
129 skipped_jobs.insert(idx);
130 continue;
131 }
132 }
133
134 let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
135 let platform_ok = match (platform, flow_platform) {
136 (FlowPlatform::Windows, FlowPlatform::Windows) => true,
137 (FlowPlatform::Windows, FlowPlatform::Linux(_)) if windows_as_wsl => true,
138 (FlowPlatform::Linux(_), FlowPlatform::Linux(_)) => true,
139 (FlowPlatform::MacOs, FlowPlatform::MacOs) => true,
140 _ => false,
141 };
142
143 if !platform_ok {
144 log::error!("mismatch between job platform and local platform. skipping job...");
145 log::info!("");
146 if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
147 log::warn!("###");
148 log::warn!("### NOTE: detected that you're running in WSL2");
149 log::warn!(
150 "### if the the pipeline supports it, you can try passing --windows-as-wsl"
151 );
152 log::warn!("###");
153 log::info!("");
154 }
155 skipped_jobs.insert(idx);
156 continue;
157 }
158
159 let runtime_platform = if windows_as_wsl
164 && matches!(platform, FlowPlatform::Windows)
165 && matches!(flow_platform, FlowPlatform::Linux(_))
166 {
167 flow_platform
168 } else {
169 platform
170 };
171
172 let nodes = {
173 let mut resolved_local_steps = Vec::new();
174
175 let (mut output_graph, _, err_unreachable_nodes) =
176 crate::flow_resolver::stage1_dag::stage1_dag(
177 FlowBackend::Local,
178 runtime_platform,
179 arch,
180 patches.clone(),
181 root_nodes
182 .clone()
183 .into_iter()
184 .map(|(node, requests)| (node, (true, requests)))
185 .collect(),
186 external_read_vars.clone(),
187 Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
188 )?;
189
190 if err_unreachable_nodes.is_some() {
191 anyhow::bail!("detected unreachable nodes")
192 }
193
194 let output_order = petgraph::algo::toposort(&output_graph, None)
195 .map_err(|e| {
196 format!(
197 "includes node {}",
198 output_graph[e.node_id()].0.node.modpath()
199 )
200 })
201 .expect("runtime variables cannot introduce a DAG cycle");
202
203 for idx in output_order.into_iter().rev() {
204 let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
205
206 let (label, code, idx, can_merge) = match step {
207 Step::Anchor { .. } => continue,
208 Step::Rust {
209 label,
210 code,
211 idx,
212 can_merge,
213 } => (label, code, idx, can_merge),
214 Step::AdoYaml { .. } => {
215 anyhow::bail!(
216 "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
217 node_handle.modpath()
218 )
219 }
220 Step::GitHubYaml { .. } => {
221 anyhow::bail!(
222 "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
223 node_handle.modpath()
224 )
225 }
226 };
227
228 resolved_local_steps.push(ResolvedRunnableStep {
229 node_handle,
230 label,
231 code: code.lock().take().unwrap(),
232 idx,
233 can_merge,
234 });
235 }
236
237 resolved_local_steps
238 };
239
240 let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
241
242 for ResolvedJobUseParameter {
243 flowey_var,
244 pipeline_param_idx,
245 } in parameters_used
246 {
247 log::trace!(
248 "resolving parameter idx {}, flowey_var {:?}",
249 pipeline_param_idx,
250 flowey_var
251 );
252 let (desc, value) = match ¶meters[*pipeline_param_idx] {
253 Parameter::Bool {
254 name: _,
255 description,
256 kind: _,
257 default,
258 } => (
259 description,
260 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
261 ),
262 Parameter::String {
263 name: _,
264 description,
265 kind: _,
266 default,
267 possible_values: _,
268 } => (
269 description,
270 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
271 ),
272 Parameter::Num {
273 name: _,
274 description,
275 kind: _,
276 default,
277 possible_values: _,
278 } => (
279 description,
280 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
281 ),
282 };
283
284 let Some(value) = value else {
285 anyhow::bail!(
286 "pipeline must specify default value for params when running locally. missing default for '{desc}'"
287 )
288 };
289
290 in_mem_var_db.set_var(flowey_var, false, value);
291 }
292
293 in_mem_var_db.set_var(
294 VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
295 false,
296 serde_json::to_string(&persist_dir).unwrap().into(),
297 );
298
299 for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
300 let path = out_dir.join("artifacts").join(name);
301 fs_err::create_dir_all(&path)?;
302
303 in_mem_var_db.set_var(
304 flowey_var,
305 false,
306 serde_json::to_string(&path).unwrap().into(),
307 );
308 }
309
310 if out_dir.join(".job_artifacts").exists() {
311 fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
312 }
313 fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
314
315 for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
316 let path = out_dir.join(".job_artifacts").join(name);
317 fs_err::create_dir_all(&path)?;
318 copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
319
320 in_mem_var_db.set_var(
321 flowey_var,
322 false,
323 serde_json::to_string(&path).unwrap().into(),
324 );
325 }
326
327 if out_dir.join(".work").exists() {
328 fs_err::remove_dir_all(out_dir.join(".work"))?;
329 }
330 fs_err::create_dir_all(out_dir.join(".work"))?;
331
332 if let Some(cond_param_idx) = cond_param_idx {
333 let Parameter::Bool {
334 name,
335 description: _,
336 kind: _,
337 default: _,
338 } = ¶meters[cond_param_idx]
339 else {
340 panic!("cond param is guaranteed to be bool by type system")
341 };
342
343 let (data, _secret) = in_mem_var_db.get_var(name);
345 let should_run: bool = serde_json::from_slice(&data).unwrap();
346
347 if !should_run {
348 log::warn!("job condition was false - skipping job...");
349 skipped_jobs.insert(idx);
350 continue;
351 }
352 }
353
354 let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
355 &mut in_mem_var_db,
356 FlowBackend::Local,
357 runtime_platform,
358 arch,
359 )?;
360
361 if let Some(wrapper) = command_wrapper {
362 runtime_services.sh.set_wrapper(Some(wrapper.clone()));
363 }
364
365 for ResolvedRunnableStep {
366 node_handle,
367 label,
368 code,
369 idx,
370 can_merge,
371 } in nodes
372 {
373 let node_working_dir = out_dir.join(".work").join(format!(
374 "{}_{}",
375 node_handle.modpath().replace("::", "__"),
376 idx
377 ));
378 if !node_working_dir.exists() {
379 fs_err::create_dir(&node_working_dir)?;
380 }
381
382 std::env::set_current_dir(node_working_dir.clone())?;
383 runtime_services.sh.change_dir(node_working_dir);
384
385 if can_merge {
386 log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
387 } else {
388 log::info!(
389 "\x1B[0;32m=== {} ({}) ===\x1B[0m",
391 label,
392 node_handle.modpath(),
393 );
394 }
395 code(&mut runtime_services)?;
396 if can_merge {
397 log::debug!("done!");
398 log::debug!(""); } else {
400 log::info!("\x1B[0;32m=== done! ===\x1B[0m");
401 log::info!(""); }
403 }
404
405 std::env::set_current_dir(&out_dir)?;
407 }
408
409 Ok(())
410}
411
412fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
413 fs_err::create_dir_all(&dst)?;
414 for entry in fs_err::read_dir(src.as_ref())? {
415 let entry = entry?;
416 let ty = entry.file_type()?;
417 if ty.is_dir() {
418 copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
419 } else {
420 fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
421 }
422 }
423 Ok(())
424}