1use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use flowey_core::pipeline::internal::Parameter;
20use petgraph::prelude::NodeIndex;
21use petgraph::visit::EdgeRef;
22use std::collections::BTreeSet;
23use std::path::Path;
24use std::path::PathBuf;
25
/// A fully-resolved step from the output graph, ready to be executed
/// in-process as part of a local (direct) run.
struct ResolvedRunnableStep {
    /// Handle of the flowey node that emitted this step.
    node_handle: NodeHandle,
    /// Human-readable label for the step, used in log output.
    label: String,
    /// The step's code, invoked exactly once with the job's runtime services.
    code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
    /// Per-node step index; combined with the node's modpath to give each step
    /// a unique working directory under `.work`.
    idx: usize,
    /// When set, the step is considered "minor" and is logged at debug level
    /// rather than info.
    can_merge: bool,
}
33
34pub fn direct_run(
36 pipeline: ResolvedPipeline,
37 windows_as_wsl: bool,
38 out_dir: PathBuf,
39 persist_dir: PathBuf,
40) -> anyhow::Result<()> {
41 direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
42
43 if out_dir.join(".job_artifacts").exists() {
45 fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
46 }
47 if out_dir.join(".work").exists() {
48 fs_err::remove_dir_all(out_dir.join(".work"))?;
49 }
50
51 Ok(())
52}
53
/// Core implementation of `direct_run`: resolves and executes every job of the
/// pipeline, in dependency order, in-process on the local machine.
///
/// Jobs are skipped (and their skip propagated to downstream jobs) when their
/// arch/platform doesn't match the host (modulo `windows_as_wsl`), or when
/// their condition parameter evaluates to false.
fn direct_run_do_work(
    pipeline: ResolvedPipeline,
    windows_as_wsl: bool,
    out_dir: PathBuf,
    persist_dir: PathBuf,
) -> anyhow::Result<()> {
    // Normalize both dirs to absolute paths up-front, since the process's
    // current directory is changed per-step further down.
    fs_err::create_dir_all(&out_dir)?;
    let out_dir = std::path::absolute(out_dir)?;

    fs_err::create_dir_all(&persist_dir)?;
    let persist_dir = std::path::absolute(persist_dir)?;

    // Exhaustive destructure: adding a field to `ResolvedPipeline` forces this
    // code to acknowledge it. The ADO- and GitHub-specific fields are
    // irrelevant for a local run.
    let ResolvedPipeline {
        graph,
        order,
        parameters,
        ado_name: _,
        ado_schedule_triggers: _,
        ado_ci_triggers: _,
        ado_pr_triggers: _,
        ado_bootstrap_template: _,
        ado_resources_repository: _,
        ado_post_process_yaml_cb: _,
        ado_variables: _,
        ado_job_id_overrides: _,
        gh_name: _,
        gh_schedule_triggers: _,
        gh_ci_triggers: _,
        gh_pr_triggers: _,
        gh_bootstrap_template: _,
    } = pipeline;

    // Jobs that could not (or should not) run; anything downstream of one of
    // these gets skipped as well.
    let mut skipped_jobs = BTreeSet::new();

    for idx in order {
        // Exhaustive destructure, same rationale as above; backend-specific
        // job settings are unused locally.
        let ResolvedPipelineJob {
            ref root_nodes,
            ref patches,
            ref label,
            platform,
            arch,
            cond_param_idx,
            timeout_minutes: _,
            ado_pool: _,
            ado_variables: _,
            gh_override_if: _,
            gh_global_env: _,
            gh_pool: _,
            gh_permissions: _,
            ref external_read_vars,
            ref parameters_used,
            ref artifacts_used,
            ref artifacts_published,
        } = graph[idx];

        // Yellow job banner (ANSI color escapes).
        log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
        log::info!("");

        // If any upstream job was skipped, this job cannot run either.
        //
        // NOTE(review): `NodeIndex::from(e.source().index() as u32)` is a
        // roundtrip that looks equivalent to plain `e.source()` — presumably
        // pinning the index width; confirm before simplifying.
        if graph
            .edges_directed(idx, petgraph::Direction::Incoming)
            .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
        {
            log::error!("job depends on job that was skipped. skipping job...");
            log::info!("");
            skipped_jobs.insert(idx);
            continue;
        }

        // Jobs only run on a matching host architecture — no emulation.
        let flow_arch = FlowArch::host(PipelineBackendHint::Local);
        match (arch, flow_arch) {
            (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
            _ => {
                log::error!("mismatch between job arch and local arch. skipping job...");
                skipped_jobs.insert(idx);
                continue;
            }
        }

        // The platform must match as well, with one exception: Windows jobs
        // may run from within WSL2 when the user opts in via `windows_as_wsl`.
        let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
        let platform_ok = match (platform, flow_platform) {
            (FlowPlatform::Windows, FlowPlatform::Windows) => true,
            (FlowPlatform::Windows, FlowPlatform::Linux(_)) if windows_as_wsl => true,
            (FlowPlatform::Linux(_), FlowPlatform::Linux(_)) => true,
            (FlowPlatform::MacOs, FlowPlatform::MacOs) => true,
            _ => false,
        };

        if !platform_ok {
            log::error!("mismatch between job platform and local platform. skipping job...");
            log::info!("");
            // Helpful hint for the common "running under WSL2" case.
            // NOTE(review): "the the" typo in the user-facing string below
            // (left as-is here; fixing strings is out of scope for a doc pass).
            if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
                log::warn!("###");
                log::warn!("### NOTE: detected that you're running in WSL2");
                log::warn!(
                    "### if the the pipeline supports it, you can try passing --windows-as-wsl"
                );
                log::warn!("###");
                log::info!("");
            }
            skipped_jobs.insert(idx);
            continue;
        }

        // Resolve this job's flow into a flat, ordered list of runnable Rust
        // steps.
        let nodes = {
            let mut resolved_local_steps = Vec::new();

            let (mut output_graph, _, err_unreachable_nodes) =
                crate::flow_resolver::stage1_dag::stage1_dag(
                    FlowBackend::Local,
                    platform,
                    flow_arch,
                    patches.clone(),
                    // all root nodes are marked as explicitly requested
                    root_nodes
                        .clone()
                        .into_iter()
                        .map(|(node, requests)| (node, (true, requests)))
                        .collect(),
                    external_read_vars.clone(),
                    // seed var for the persistent storage dir; its value is
                    // set in the in-memory var db below
                    Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
                )?;

            if err_unreachable_nodes.is_some() {
                anyhow::bail!("detected unreachable nodes")
            }

            // A cycle at this point would be a flowey bug, hence the expect
            // (the map_err only serves to name the offending node on panic).
            let output_order = petgraph::algo::toposort(&output_graph, None)
                .map_err(|e| {
                    format!(
                        "includes node {}",
                        output_graph[e.node_id()].0.node.modpath()
                    )
                })
                .expect("runtime variables cannot introduce a DAG cycle");

            // Steps execute in *reverse* topological order — presumably the
            // output graph's edges point from dependents to their
            // dependencies (TODO confirm against stage1_dag's edge direction).
            for idx in output_order.into_iter().rev() {
                // Each entry is consumed exactly once, so taking it out of
                // the graph node is safe.
                let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();

                let (label, code, idx, can_merge) = match step {
                    // anchors are structural markers, not runnable code
                    Step::Anchor { .. } => continue,
                    Step::Rust {
                        label,
                        code,
                        idx,
                        can_merge,
                    } => (label, code, idx, can_merge),
                    // A node emitting backend-specific YAML while resolving
                    // for the Local backend is a bug in that node.
                    Step::AdoYaml { .. } => {
                        anyhow::bail!(
                            "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
                            node_handle.modpath()
                        )
                    }
                    Step::GitHubYaml { .. } => {
                        anyhow::bail!(
                            "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
                            node_handle.modpath()
                        )
                    }
                };

                resolved_local_steps.push(ResolvedRunnableStep {
                    node_handle,
                    label,
                    // the closure is stored behind a lock as an Option;
                    // taking it asserts it hasn't already been consumed
                    code: code.lock().take().unwrap(),
                    idx,
                    can_merge,
                });
            }

            resolved_local_steps
        };

        // Local runs back all flowey vars with a fresh in-memory var db,
        // seeded below with parameter defaults and artifact paths.
        let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();

        // Seed each parameter the job uses from its declared default.
        for ResolvedJobUseParameter {
            flowey_var,
            pipeline_param_idx,
        } in parameters_used
        {
            log::trace!(
                "resolving parameter idx {}, flowey_var {:?}",
                pipeline_param_idx,
                flowey_var
            );
            // All three parameter kinds reduce to (description, serialized
            // default-if-any).
            let (desc, value) = match &parameters[*pipeline_param_idx] {
                Parameter::Bool {
                    name: _,
                    description,
                    kind: _,
                    default,
                } => (
                    description,
                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
                ),
                Parameter::String {
                    name: _,
                    description,
                    kind: _,
                    default,
                    possible_values: _,
                } => (
                    description,
                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
                ),
                Parameter::Num {
                    name: _,
                    description,
                    kind: _,
                    default,
                    possible_values: _,
                } => (
                    description,
                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
                ),
            };

            // There is no interactive way to supply parameter values during a
            // local run, so a missing default is a hard error.
            let Some(value) = value else {
                anyhow::bail!(
                    "pipeline must specify default value for params when running locally. missing default for '{desc}'"
                )
            };

            in_mem_var_db.set_var(flowey_var, false, value);
        }

        // Back the persistent-storage seed var (declared during stage1
        // resolution above) with the absolute persist dir.
        in_mem_var_db.set_var(
            VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
            false,
            serde_json::to_string(&persist_dir).unwrap().into(),
        );

        // Published artifacts land in `<out_dir>/artifacts/<name>`; the var
        // db maps each artifact's flowey var to that path.
        for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
            let path = out_dir.join("artifacts").join(name);
            fs_err::create_dir_all(&path)?;

            in_mem_var_db.set_var(
                flowey_var,
                false,
                serde_json::to_string(&path).unwrap().into(),
            );
        }

        // `.job_artifacts` is a per-job scratch copy of consumed artifacts;
        // reset it so each job starts from a clean slate.
        if out_dir.join(".job_artifacts").exists() {
            fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
        }
        fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;

        // Copy each consumed artifact from the shared `artifacts` dir (where
        // the producing job published it) into this job's `.job_artifacts`.
        for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
            let path = out_dir.join(".job_artifacts").join(name);
            fs_err::create_dir_all(&path)?;
            copy_dir_all(out_dir.join("artifacts").join(name), &path)?;

            in_mem_var_db.set_var(
                flowey_var,
                false,
                serde_json::to_string(&path).unwrap().into(),
            );
        }

        // `.work` holds the per-step working directories; also reset per job.
        if out_dir.join(".work").exists() {
            fs_err::remove_dir_all(out_dir.join(".work"))?;
        }
        fs_err::create_dir_all(out_dir.join(".work"))?;

        // Honor the job's condition parameter, if one was declared.
        if let Some(cond_param_idx) = cond_param_idx {
            let Parameter::Bool {
                name,
                description: _,
                kind: _,
                default: _,
            } = &parameters[cond_param_idx]
            else {
                panic!("cond param is guaranteed to be bool by type system")
            };

            // NOTE(review): the lookup is keyed by the parameter's `name`,
            // whereas `parameters_used` above seeds vars keyed by
            // `flowey_var` — presumably cond-param names double as var db
            // keys; confirm.
            let (data, _secret) = in_mem_var_db.get_var(name);
            let should_run: bool = serde_json::from_slice(&data).unwrap();

            if !should_run {
                log::warn!("job condition was false - skipping job...");
                skipped_jobs.insert(idx);
                continue;
            }
        }

        let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
            &mut in_mem_var_db,
            FlowBackend::Local,
            platform,
            flow_arch,
        );

        // Execute each resolved step in order, each from its own working dir.
        for ResolvedRunnableStep {
            node_handle,
            label,
            code,
            idx,
            can_merge,
        } in nodes
        {
            // e.g. `.work/flowey_lib__some_node_0`
            let node_working_dir = out_dir.join(".work").join(format!(
                "{}_{}",
                node_handle.modpath().replace("::", "__"),
                idx
            ));
            if !node_working_dir.exists() {
                fs_err::create_dir(&node_working_dir)?;
            }
            std::env::set_current_dir(node_working_dir)?;

            // "minor" (mergeable) steps only log at debug level; major steps
            // get a green banner.
            if can_merge {
                log::debug!("minor step: {} ({})", label, node_handle.modpath(),);
            } else {
                log::info!(
                    "\x1B[0;32m=== {} ({}) ===\x1B[0m",
                    label,
                    node_handle.modpath(),
                );
            }
            code(&mut runtime_services)?;
            if can_merge {
                log::debug!("done!");
                log::debug!("");
            } else {
                log::info!("\x1B[0;32m=== done! ===\x1B[0m");
                log::info!("");
            }
        }

        // Restore a stable cwd (the out dir) before moving on to the next job.
        std::env::set_current_dir(&out_dir)?;
    }

    Ok(())
}
391
392fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
393 fs_err::create_dir_all(&dst)?;
394 for entry in fs_err::read_dir(src.as_ref())? {
395 let entry = entry?;
396 let ty = entry.file_type()?;
397 if ty.is_dir() {
398 copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
399 } else {
400 fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
401 }
402 }
403 Ok(())
404}