1use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use flowey_core::pipeline::internal::Parameter;
20use petgraph::prelude::NodeIndex;
21use petgraph::visit::EdgeRef;
22use std::collections::BTreeSet;
23use std::path::Path;
24use std::path::PathBuf;
25
26struct ResolvedRunnableStep {
27 node_handle: NodeHandle,
28 label: String,
29 code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
30 idx: usize,
31 can_merge: bool,
32}
33
34pub fn direct_run(
36 pipeline: ResolvedPipeline,
37 windows_as_wsl: bool,
38 out_dir: PathBuf,
39 persist_dir: PathBuf,
40) -> anyhow::Result<()> {
41 direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
42
43 if out_dir.join(".job_artifacts").exists() {
45 fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
46 }
47 if out_dir.join(".work").exists() {
48 fs_err::remove_dir_all(out_dir.join(".work"))?;
49 }
50
51 Ok(())
52}
53
54fn direct_run_do_work(
55 pipeline: ResolvedPipeline,
56 windows_as_wsl: bool,
57 out_dir: PathBuf,
58 persist_dir: PathBuf,
59) -> anyhow::Result<()> {
60 fs_err::create_dir_all(&out_dir)?;
61 let out_dir = std::path::absolute(out_dir)?;
62
63 fs_err::create_dir_all(&persist_dir)?;
64 let persist_dir = std::path::absolute(persist_dir)?;
65
66 let ResolvedPipeline {
67 graph,
68 order,
69 parameters,
70 ado_name: _,
71 ado_schedule_triggers: _,
72 ado_ci_triggers: _,
73 ado_pr_triggers: _,
74 ado_bootstrap_template: _,
75 ado_resources_repository: _,
76 ado_post_process_yaml_cb: _,
77 ado_variables: _,
78 ado_job_id_overrides: _,
79 gh_name: _,
80 gh_schedule_triggers: _,
81 gh_ci_triggers: _,
82 gh_pr_triggers: _,
83 gh_bootstrap_template: _,
84 } = pipeline;
85
86 let mut skipped_jobs = BTreeSet::new();
87
88 for idx in order {
89 let ResolvedPipelineJob {
90 ref root_nodes,
91 ref patches,
92 ref label,
93 platform,
94 arch,
95 cond_param_idx,
96 ado_pool: _,
97 ado_variables: _,
98 gh_override_if: _,
99 gh_global_env: _,
100 gh_pool: _,
101 gh_permissions: _,
102 ref external_read_vars,
103 ref parameters_used,
104 ref artifacts_used,
105 ref artifacts_published,
106 } = graph[idx];
107
108 log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
110 log::info!("");
111
112 if graph
113 .edges_directed(idx, petgraph::Direction::Incoming)
114 .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
115 {
116 log::error!("job depends on job that was skipped. skipping job...");
117 log::info!("");
118 skipped_jobs.insert(idx);
119 continue;
120 }
121
122 let flow_arch = FlowArch::host(PipelineBackendHint::Local);
123 match (arch, flow_arch) {
124 (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
125 _ => {
126 log::error!("mismatch between job arch and local arch. skipping job...");
127 continue;
128 }
129 }
130
131 let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
132 let platform_ok = match (platform, flow_platform) {
133 (FlowPlatform::Windows, FlowPlatform::Windows) => true,
134 (FlowPlatform::Windows, FlowPlatform::Linux(_)) if windows_as_wsl => true,
135 (FlowPlatform::Linux(_), FlowPlatform::Linux(_)) => true,
136 (FlowPlatform::MacOs, FlowPlatform::MacOs) => true,
137 _ => false,
138 };
139
140 if !platform_ok {
141 log::error!("mismatch between job platform and local platform. skipping job...");
142 log::info!("");
143 if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
144 log::warn!("###");
145 log::warn!("### NOTE: detected that you're running in WSL2");
146 log::warn!(
147 "### if the the pipeline supports it, you can try passing --windows-as-wsl"
148 );
149 log::warn!("###");
150 log::info!("");
151 }
152 skipped_jobs.insert(idx);
153 continue;
154 }
155
156 let nodes = {
157 let mut resolved_local_steps = Vec::new();
158
159 let (mut output_graph, _, err_unreachable_nodes) =
160 crate::flow_resolver::stage1_dag::stage1_dag(
161 FlowBackend::Local,
162 platform,
163 flow_arch,
164 patches.clone(),
165 root_nodes
166 .clone()
167 .into_iter()
168 .map(|(node, requests)| (node, (true, requests)))
169 .collect(),
170 external_read_vars.clone(),
171 Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
172 )?;
173
174 if err_unreachable_nodes.is_some() {
175 anyhow::bail!("detected unreachable nodes")
176 }
177
178 let output_order = petgraph::algo::toposort(&output_graph, None)
179 .map_err(|e| {
180 format!(
181 "includes node {}",
182 output_graph[e.node_id()].0.node.modpath()
183 )
184 })
185 .expect("runtime variables cannot introduce a DAG cycle");
186
187 for idx in output_order.into_iter().rev() {
188 let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
189
190 let (label, code, idx, can_merge) = match step {
191 Step::Anchor { .. } => continue,
192 Step::Rust {
193 label,
194 code,
195 idx,
196 can_merge,
197 } => (label, code, idx, can_merge),
198 Step::AdoYaml { .. } => {
199 anyhow::bail!(
200 "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
201 node_handle.modpath()
202 )
203 }
204 Step::GitHubYaml { .. } => {
205 anyhow::bail!(
206 "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
207 node_handle.modpath()
208 )
209 }
210 };
211
212 resolved_local_steps.push(ResolvedRunnableStep {
213 node_handle,
214 label,
215 code: code.lock().take().unwrap(),
216 idx,
217 can_merge,
218 });
219 }
220
221 resolved_local_steps
222 };
223
224 let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
225
226 for ResolvedJobUseParameter {
227 flowey_var,
228 pipeline_param_idx,
229 } in parameters_used
230 {
231 let (desc, value) = match ¶meters[*pipeline_param_idx] {
232 Parameter::Bool {
233 name: _,
234 description,
235 kind: _,
236 default,
237 } => (
238 description,
239 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
240 ),
241 Parameter::String {
242 name: _,
243 description,
244 kind: _,
245 default,
246 possible_values: _,
247 } => (
248 description,
249 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
250 ),
251 Parameter::Num {
252 name: _,
253 description,
254 kind: _,
255 default,
256 possible_values: _,
257 } => (
258 description,
259 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
260 ),
261 };
262
263 let Some(value) = value else {
264 anyhow::bail!(
265 "pipeline must specify default value for params when running locally. missing default for '{desc}'"
266 )
267 };
268
269 in_mem_var_db.set_var(flowey_var, false, value);
270 }
271
272 in_mem_var_db.set_var(
273 VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
274 false,
275 serde_json::to_string(&persist_dir).unwrap().into(),
276 );
277
278 for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
279 let path = out_dir.join("artifacts").join(name);
280 fs_err::create_dir_all(&path)?;
281
282 in_mem_var_db.set_var(
283 flowey_var,
284 false,
285 serde_json::to_string(&path).unwrap().into(),
286 );
287 }
288
289 if out_dir.join(".job_artifacts").exists() {
290 fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
291 }
292 fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
293
294 for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
295 let path = out_dir.join(".job_artifacts").join(name);
296 fs_err::create_dir_all(&path)?;
297 copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
298
299 in_mem_var_db.set_var(
300 flowey_var,
301 false,
302 serde_json::to_string(&path).unwrap().into(),
303 );
304 }
305
306 if out_dir.join(".work").exists() {
307 fs_err::remove_dir_all(out_dir.join(".work"))?;
308 }
309 fs_err::create_dir_all(out_dir.join(".work"))?;
310
311 let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
312 &mut in_mem_var_db,
313 FlowBackend::Local,
314 platform,
315 flow_arch,
316 );
317
318 if let Some(cond_param_idx) = cond_param_idx {
319 let Parameter::Bool {
320 name: _,
321 description: _,
322 kind: _,
323 default,
324 } = ¶meters[cond_param_idx]
325 else {
326 panic!("cond param is guaranteed to be bool by type system")
327 };
328
329 let Some(should_run) = default else {
330 anyhow::bail!(
331 "when running locally, job condition parameter must include a default value"
332 )
333 };
334
335 if !should_run {
336 log::warn!("job condition was false - skipping job...");
337 continue;
338 }
339 }
340
341 for ResolvedRunnableStep {
342 node_handle,
343 label,
344 code,
345 idx,
346 can_merge,
347 } in nodes
348 {
349 let node_working_dir = out_dir.join(".work").join(format!(
350 "{}_{}",
351 node_handle.modpath().replace("::", "__"),
352 idx
353 ));
354 if !node_working_dir.exists() {
355 fs_err::create_dir(&node_working_dir)?;
356 }
357 std::env::set_current_dir(node_working_dir)?;
358
359 if can_merge {
360 log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
361 } else {
362 log::info!(
363 "\x1B[0;32m=== {} ({}) ===\x1B[0m",
365 label,
366 node_handle.modpath(),
367 );
368 }
369 code(&mut runtime_services)?;
370 if can_merge {
371 log::debug!("done!");
372 log::debug!(""); } else {
374 log::info!("\x1B[0;32m=== done! ===\x1B[0m");
375 log::info!(""); }
377 }
378 }
379
380 Ok(())
381}
382
383fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
384 fs_err::create_dir_all(&dst)?;
385 for entry in fs_err::read_dir(src.as_ref())? {
386 let entry = entry?;
387 let ty = entry.file_type()?;
388 if ty.is_dir() {
389 copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
390 } else {
391 fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
392 }
393 }
394 Ok(())
395}