1use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
5use crate::flow_resolver::stage1_dag::OutputGraphEntry;
6use crate::flow_resolver::stage1_dag::Step;
7use crate::pipeline_resolver::generic::ResolvedJobArtifact;
8use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
9use crate::pipeline_resolver::generic::ResolvedPipeline;
10use crate::pipeline_resolver::generic::ResolvedPipelineJob;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowBackend;
13use flowey_core::node::FlowPlatform;
14use flowey_core::node::NodeHandle;
15use flowey_core::node::RuntimeVarDb;
16use flowey_core::node::steps::rust::RustRuntimeServices;
17use flowey_core::pipeline::internal::Parameter;
18use petgraph::prelude::NodeIndex;
19use petgraph::visit::EdgeRef;
20use std::collections::BTreeSet;
21use std::path::Path;
22use std::path::PathBuf;
23
/// A flowey node that has been resolved down to directly-runnable Rust steps
/// for the local backend.
pub struct ResolvedRunnableNode {
    // Handle identifying which node these steps were emitted by.
    pub node_handle: NodeHandle,
    // Runnable steps, as `(step index, step label, step closure)` tuples.
    // The closure is invoked once with the job's runtime services.
    pub steps: Vec<(
        usize,
        String,
        Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
    )>,
}
32
33pub fn direct_run(
35 pipeline: ResolvedPipeline,
36 windows_as_wsl: bool,
37 out_dir: PathBuf,
38 persist_dir: PathBuf,
39) -> anyhow::Result<()> {
40 direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;
41
42 if out_dir.join(".job_artifacts").exists() {
44 fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
45 }
46 if out_dir.join(".work").exists() {
47 fs_err::remove_dir_all(out_dir.join(".work"))?;
48 }
49
50 Ok(())
51}
52
53fn direct_run_do_work(
54 pipeline: ResolvedPipeline,
55 windows_as_wsl: bool,
56 out_dir: PathBuf,
57 persist_dir: PathBuf,
58) -> anyhow::Result<()> {
59 fs_err::create_dir_all(&out_dir)?;
60 let out_dir = std::path::absolute(out_dir)?;
61
62 fs_err::create_dir_all(&persist_dir)?;
63 let persist_dir = std::path::absolute(persist_dir)?;
64
65 let ResolvedPipeline {
66 graph,
67 order,
68 parameters,
69 ado_name: _,
70 ado_schedule_triggers: _,
71 ado_ci_triggers: _,
72 ado_pr_triggers: _,
73 ado_bootstrap_template: _,
74 ado_resources_repository: _,
75 ado_post_process_yaml_cb: _,
76 ado_variables: _,
77 ado_job_id_overrides: _,
78 gh_name: _,
79 gh_schedule_triggers: _,
80 gh_ci_triggers: _,
81 gh_pr_triggers: _,
82 gh_bootstrap_template: _,
83 } = pipeline;
84
85 let mut skipped_jobs = BTreeSet::new();
86
87 for idx in order {
88 let ResolvedPipelineJob {
89 ref root_nodes,
90 ref patches,
91 ref label,
92 platform,
93 arch,
94 cond_param_idx,
95 ado_pool: _,
96 ado_variables: _,
97 gh_override_if: _,
98 gh_global_env: _,
99 gh_pool: _,
100 gh_permissions: _,
101 ref external_read_vars,
102 ref parameters_used,
103 ref artifacts_used,
104 ref artifacts_published,
105 } = graph[idx];
106
107 log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
109 log::info!("");
110
111 if graph
112 .edges_directed(idx, petgraph::Direction::Incoming)
113 .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
114 {
115 log::error!("job depends on job that was skipped. skipping job...");
116 log::info!("");
117 skipped_jobs.insert(idx);
118 continue;
119 }
120
121 let flow_arch = if cfg!(target_arch = "x86_64") {
123 FlowArch::X86_64
124 } else if cfg!(target_arch = "aarch64") {
126 FlowArch::Aarch64
127 } else {
128 unreachable!("flowey only runs on X86_64 or Aarch64 at the moment")
129 };
130
131 match (arch, flow_arch) {
132 (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
133 _ => {
134 log::error!("mismatch between job arch and local arch. skipping job...");
135 continue;
136 }
137 }
138
139 let platform_ok = match platform {
140 FlowPlatform::Windows => cfg!(windows) || (cfg!(target_os = "linux") && windows_as_wsl),
141 FlowPlatform::Linux(_) => cfg!(target_os = "linux"),
142 FlowPlatform::MacOs => cfg!(target_os = "macos"),
143 platform => panic!("unknown platform {platform}"),
144 };
145
146 if !platform_ok {
147 log::error!("mismatch between job platform and local platform. skipping job...");
148 log::info!("");
149 if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
150 log::warn!("###");
151 log::warn!("### NOTE: detected that you're running in WSL2");
152 log::warn!(
153 "### if the the pipeline supports it, you can try passing --windows-as-wsl"
154 );
155 log::warn!("###");
156 log::info!("");
157 }
158 skipped_jobs.insert(idx);
159 continue;
160 }
161
162 let nodes = {
163 let mut resolved_local_nodes = Vec::new();
164
165 let (mut output_graph, _, err_unreachable_nodes) =
166 crate::flow_resolver::stage1_dag::stage1_dag(
167 FlowBackend::Local,
168 platform,
169 flow_arch,
170 patches.clone(),
171 root_nodes
172 .clone()
173 .into_iter()
174 .map(|(node, requests)| (node, (true, requests)))
175 .collect(),
176 external_read_vars.clone(),
177 Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
178 )?;
179
180 if err_unreachable_nodes.is_some() {
181 anyhow::bail!("detected unreachable nodes")
182 }
183
184 let output_order = petgraph::algo::toposort(&output_graph, None)
185 .map_err(|e| {
186 format!(
187 "includes node {}",
188 output_graph[e.node_id()].0.node.modpath()
189 )
190 })
191 .expect("runtime variables cannot introduce a DAG cycle");
192
193 for idx in output_order.into_iter().rev() {
194 let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();
195
196 let mut steps = Vec::new();
197 let (label, code, idx) = match step {
198 Step::Anchor { .. } => continue,
199 Step::Rust {
200 label,
201 code,
202 idx,
203 can_merge: _,
204 } => (label, code, idx),
205 Step::AdoYaml { .. } => {
206 anyhow::bail!(
207 "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
208 node_handle.modpath()
209 )
210 }
211 Step::GitHubYaml { .. } => {
212 anyhow::bail!(
213 "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
214 node_handle.modpath()
215 )
216 }
217 };
218 steps.push((idx, label, code.lock().take().unwrap()));
219
220 resolved_local_nodes.push(ResolvedRunnableNode { node_handle, steps })
221 }
222
223 resolved_local_nodes
224 };
225
226 let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();
227
228 for ResolvedJobUseParameter {
229 flowey_var,
230 pipeline_param_idx,
231 } in parameters_used
232 {
233 let (desc, value) = match ¶meters[*pipeline_param_idx] {
234 Parameter::Bool {
235 name: _,
236 description,
237 kind: _,
238 default,
239 } => (
240 description,
241 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
242 ),
243 Parameter::String {
244 name: _,
245 description,
246 kind: _,
247 default,
248 possible_values: _,
249 } => (
250 description,
251 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
252 ),
253 Parameter::Num {
254 name: _,
255 description,
256 kind: _,
257 default,
258 possible_values: _,
259 } => (
260 description,
261 default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
262 ),
263 };
264
265 let Some(value) = value else {
266 anyhow::bail!(
267 "pipeline must specify default value for params when running locally. missing default for '{desc}'"
268 )
269 };
270
271 in_mem_var_db.set_var(flowey_var, false, value);
272 }
273
274 in_mem_var_db.set_var(
275 VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
276 false,
277 serde_json::to_string(&persist_dir).unwrap().into(),
278 );
279
280 for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
281 let path = out_dir.join("artifacts").join(name);
282 fs_err::create_dir_all(&path)?;
283
284 in_mem_var_db.set_var(
285 flowey_var,
286 false,
287 serde_json::to_string(&path).unwrap().into(),
288 );
289 }
290
291 if out_dir.join(".job_artifacts").exists() {
292 fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
293 }
294 fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;
295
296 for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
297 let path = out_dir.join(".job_artifacts").join(name);
298 fs_err::create_dir_all(&path)?;
299 copy_dir_all(out_dir.join("artifacts").join(name), &path)?;
300
301 in_mem_var_db.set_var(
302 flowey_var,
303 false,
304 serde_json::to_string(&path).unwrap().into(),
305 );
306 }
307
308 if out_dir.join(".work").exists() {
309 fs_err::remove_dir_all(out_dir.join(".work"))?;
310 }
311 fs_err::create_dir_all(out_dir.join(".work"))?;
312
313 let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
314 &mut in_mem_var_db,
315 FlowBackend::Local,
316 platform,
317 flow_arch,
318 );
319
320 if let Some(cond_param_idx) = cond_param_idx {
321 let Parameter::Bool {
322 name: _,
323 description: _,
324 kind: _,
325 default,
326 } = ¶meters[cond_param_idx]
327 else {
328 panic!("cond param is guaranteed to be bool by type system")
329 };
330
331 let Some(should_run) = default else {
332 anyhow::bail!(
333 "when running locally, job condition parameter must include a default value"
334 )
335 };
336
337 if !should_run {
338 log::warn!("job condition was false - skipping job...");
339 continue;
340 }
341 }
342
343 for ResolvedRunnableNode { node_handle, steps } in nodes {
344 for (idx, label, code) in steps {
345 let node_working_dir = out_dir.join(".work").join(format!(
346 "{}_{}",
347 node_handle.modpath().replace("::", "__"),
348 idx
349 ));
350 if !node_working_dir.exists() {
351 fs_err::create_dir(&node_working_dir)?;
352 }
353 std::env::set_current_dir(node_working_dir)?;
354
355 log::info!(
356 "\x1B[0;32m=== {} ({}) ===\x1B[0m",
358 label,
359 node_handle.modpath(),
360 );
361 code(&mut runtime_services)?;
362 log::info!("\x1B[0;32m=== done! ===\x1B[0m");
363 log::info!(""); }
365 }
366 }
367
368 Ok(())
369}
370
371fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
372 fs_err::create_dir_all(&dst)?;
373 for entry in fs_err::read_dir(src.as_ref())? {
374 let entry = entry?;
375 let ty = entry.file_type()?;
376 if ty.is_dir() {
377 copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
378 } else {
379 fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
380 }
381 }
382 Ok(())
383}