use crate::cli::exec_snippet::VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR;
use crate::flow_resolver::stage1_dag::OutputGraphEntry;
use crate::flow_resolver::stage1_dag::Step;
use crate::pipeline_resolver::generic::ResolvedJobArtifact;
use crate::pipeline_resolver::generic::ResolvedJobUseParameter;
use crate::pipeline_resolver::generic::ResolvedPipeline;
use crate::pipeline_resolver::generic::ResolvedPipelineJob;
use flowey_core::node::FlowArch;
use flowey_core::node::FlowBackend;
use flowey_core::node::FlowPlatform;
use flowey_core::node::NodeHandle;
use flowey_core::node::RuntimeVarDb;
use flowey_core::node::steps::rust::RustRuntimeServices;
use flowey_core::pipeline::HostExt;
use flowey_core::pipeline::PipelineBackendHint;
use flowey_core::pipeline::internal::Parameter;
use petgraph::prelude::NodeIndex;
use petgraph::visit::EdgeRef;
use std::collections::BTreeSet;
use std::path::Path;
use std::path::PathBuf;

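/// A single Rust step extracted from a resolved flow, ready to be run
/// directly on the local machine.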
struct ResolvedRunnableStep {
    node_handle: NodeHandle,
    label: String,
    code: Box<dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static>,
    idx: usize,
    can_merge: bool,
}

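/// Run the resolved pipeline directly on the local machine, writing artifacts
/// to `out_dir` and persistent storage to `persist_dir`.
///
/// The transient `.job_artifacts` and `.work` directories are cleaned up once
/// the run completes successfully.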
pub fn direct_run(
    pipeline: ResolvedPipeline,
    windows_as_wsl: bool,
    out_dir: PathBuf,
    persist_dir: PathBuf,
) -> anyhow::Result<()> {
    direct_run_do_work(pipeline, windows_as_wsl, out_dir.clone(), persist_dir)?;

    if out_dir.join(".job_artifacts").exists() {
        fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
    }
    if out_dir.join(".work").exists() {
        fs_err::remove_dir_all(out_dir.join(".work"))?;
    }

    Ok(())
}

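/// The main body of the local run: resolves each job's flow into runnable
/// steps and executes them in order, one job at a time.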
fn direct_run_do_work(
    pipeline: ResolvedPipeline,
    windows_as_wsl: bool,
    out_dir: PathBuf,
    persist_dir: PathBuf,
) -> anyhow::Result<()> {
    fs_err::create_dir_all(&out_dir)?;
    let out_dir = std::path::absolute(out_dir)?;

    fs_err::create_dir_all(&persist_dir)?;
    let persist_dir = std::path::absolute(persist_dir)?;

    let ResolvedPipeline {
        graph,
        order,
        parameters,
        ado_name: _,
        ado_schedule_triggers: _,
        ado_ci_triggers: _,
        ado_pr_triggers: _,
        ado_bootstrap_template: _,
        ado_resources_repository: _,
        ado_post_process_yaml_cb: _,
        ado_variables: _,
        ado_job_id_overrides: _,
        gh_name: _,
        gh_schedule_triggers: _,
        gh_ci_triggers: _,
        gh_pr_triggers: _,
        gh_bootstrap_template: _,
    } = pipeline;

    let mut skipped_jobs = BTreeSet::new();

    for idx in order {
        let ResolvedPipelineJob {
            ref root_nodes,
            ref patches,
            ref label,
            platform,
            arch,
            cond_param_idx,
            timeout_minutes: _,
            ado_pool: _,
            ado_variables: _,
            gh_override_if: _,
            gh_global_env: _,
            gh_pool: _,
            gh_permissions: _,
            ref external_read_vars,
            ref parameters_used,
            ref artifacts_used,
            ref artifacts_published,
        } = graph[idx];

        log::info!("\x1B[0;33m### job: {label} ###\x1B[0m");
        log::info!("");

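        // A job cannot run if any job it depends on was skipped.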
        if graph
            .edges_directed(idx, petgraph::Direction::Incoming)
            .any(|e| skipped_jobs.contains(&NodeIndex::from(e.source().index() as u32)))
        {
            log::error!("job depends on job that was skipped. skipping job...");
            log::info!("");
            skipped_jobs.insert(idx);
            continue;
        }

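        // Make sure this job can actually run on the local machine: the job's
        // arch must match the host arch, and its platform must match the host
        // platform (Windows jobs may optionally run under WSL2 when
        // `--windows-as-wsl` is passed).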
        let flow_arch = FlowArch::host(PipelineBackendHint::Local);
        match (arch, flow_arch) {
            (FlowArch::X86_64, FlowArch::X86_64) | (FlowArch::Aarch64, FlowArch::Aarch64) => (),
            _ => {
                log::error!("mismatch between job arch and local arch. skipping job...");
                continue;
            }
        }

        let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
        let platform_ok = match (platform, flow_platform) {
            (FlowPlatform::Windows, FlowPlatform::Windows) => true,
            (FlowPlatform::Windows, FlowPlatform::Linux(_)) if windows_as_wsl => true,
            (FlowPlatform::Linux(_), FlowPlatform::Linux(_)) => true,
            (FlowPlatform::MacOs, FlowPlatform::MacOs) => true,
            _ => false,
        };

        if !platform_ok {
            log::error!("mismatch between job platform and local platform. skipping job...");
            log::info!("");
            if crate::running_in_wsl() && matches!(platform, FlowPlatform::Windows) {
                log::warn!("###");
                log::warn!("### NOTE: detected that you're running in WSL2");
                log::warn!(
                    "### if the pipeline supports it, you can try passing --windows-as-wsl"
                );
                log::warn!("###");
                log::info!("");
            }
            skipped_jobs.insert(idx);
            continue;
        }

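        // Resolve the job's node graph into a flat, ordered list of runnable
        // Rust steps. Steps that emit ADO or GitHub YAML are errors here,
        // since this is a purely local run.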
        let nodes = {
            let mut resolved_local_steps = Vec::new();

            let (mut output_graph, _, err_unreachable_nodes) =
                crate::flow_resolver::stage1_dag::stage1_dag(
                    FlowBackend::Local,
                    platform,
                    flow_arch,
                    patches.clone(),
                    root_nodes
                        .clone()
                        .into_iter()
                        .map(|(node, requests)| (node, (true, requests)))
                        .collect(),
                    external_read_vars.clone(),
                    Some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.into()),
                )?;

            if err_unreachable_nodes.is_some() {
                anyhow::bail!("detected unreachable nodes")
            }

            let output_order = petgraph::algo::toposort(&output_graph, None)
                .map_err(|e| {
                    format!(
                        "includes node {}",
                        output_graph[e.node_id()].0.node.modpath()
                    )
                })
                .expect("runtime variables cannot introduce a DAG cycle");

            for idx in output_order.into_iter().rev() {
                let OutputGraphEntry { node_handle, step } = output_graph[idx].1.take().unwrap();

                let (label, code, idx, can_merge) = match step {
                    Step::Anchor { .. } => continue,
                    Step::Rust {
                        label,
                        code,
                        idx,
                        can_merge,
                    } => (label, code, idx, can_merge),
                    Step::AdoYaml { .. } => {
                        anyhow::bail!(
                            "{} emitted ADO YAML. Fix the node by checking `ctx.backend()` appropriately",
                            node_handle.modpath()
                        )
                    }
                    Step::GitHubYaml { .. } => {
                        anyhow::bail!(
                            "{} emitted GitHub YAML. Fix the node by checking `ctx.backend()` appropriately",
                            node_handle.modpath()
                        )
                    }
                };

                resolved_local_steps.push(ResolvedRunnableStep {
                    node_handle,
                    label,
                    code: code.lock().take().unwrap(),
                    idx,
                    can_merge,
                });
            }

            resolved_local_steps
        };

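        // Back the run with a fresh in-memory variable database for this job.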
        let mut in_mem_var_db = crate::var_db::in_memory::InMemoryVarDb::new();

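        // Seed pipeline parameters from their declared defaults. Local runs
        // don't prompt for parameter values, so a missing default is an error.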
        for ResolvedJobUseParameter {
            flowey_var,
            pipeline_param_idx,
        } in parameters_used
        {
            log::trace!(
                "resolving parameter idx {}, flowey_var {:?}",
                pipeline_param_idx,
                flowey_var
            );
            let (desc, value) = match &parameters[*pipeline_param_idx] {
                Parameter::Bool {
                    name: _,
                    description,
                    kind: _,
                    default,
                } => (
                    description,
                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
                ),
                Parameter::String {
                    name: _,
                    description,
                    kind: _,
                    default,
                    possible_values: _,
                } => (
                    description,
                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
                ),
                Parameter::Num {
                    name: _,
                    description,
                    kind: _,
                    default,
                    possible_values: _,
                } => (
                    description,
                    default.as_ref().map(|v| serde_json::to_vec(v).unwrap()),
                ),
            };

            let Some(value) = value else {
                anyhow::bail!(
                    "pipeline must specify default value for params when running locally. missing default for '{desc}'"
                )
            };

            in_mem_var_db.set_var(flowey_var, false, value);
        }

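        // Seed the persistent-storage directory variable that the flow was
        // resolved against.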
        in_mem_var_db.set_var(
            VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR,
            false,
            serde_json::to_string(&persist_dir).unwrap().into(),
        );

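        // Published artifacts are written directly into `<out_dir>/artifacts/<name>`.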
        for ResolvedJobArtifact { flowey_var, name } in artifacts_published {
            let path = out_dir.join("artifacts").join(name);
            fs_err::create_dir_all(&path)?;

            in_mem_var_db.set_var(
                flowey_var,
                false,
                serde_json::to_string(&path).unwrap().into(),
            );
        }

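        // Consumed artifacts are staged into a fresh `.job_artifacts` directory,
        // copied from `<out_dir>/artifacts/<name>` as populated by previously-run jobs.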
        if out_dir.join(".job_artifacts").exists() {
            fs_err::remove_dir_all(out_dir.join(".job_artifacts"))?;
        }
        fs_err::create_dir_all(out_dir.join(".job_artifacts"))?;

        for ResolvedJobArtifact { flowey_var, name } in artifacts_used {
            let path = out_dir.join(".job_artifacts").join(name);
            fs_err::create_dir_all(&path)?;
            copy_dir_all(out_dir.join("artifacts").join(name), &path)?;

            in_mem_var_db.set_var(
                flowey_var,
                false,
                serde_json::to_string(&path).unwrap().into(),
            );
        }

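        // Each job gets a fresh `.work` directory to hold per-step working dirs.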
        if out_dir.join(".work").exists() {
            fs_err::remove_dir_all(out_dir.join(".work"))?;
        }
        fs_err::create_dir_all(out_dir.join(".work"))?;

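        // Honor the job's condition parameter (if any): a false value skips
        // the job entirely.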
        if let Some(cond_param_idx) = cond_param_idx {
            let Parameter::Bool {
                name,
                description: _,
                kind: _,
                default: _,
            } = &parameters[cond_param_idx]
            else {
                panic!("cond param is guaranteed to be bool by type system")
            };

            let (data, _secret) = in_mem_var_db.get_var(name);
            let should_run: bool = serde_json::from_slice(&data).unwrap();

            if !should_run {
                log::warn!("job condition was false - skipping job...");
                continue;
            }
        }

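        // Construct the runtime services handed to each step's Rust closure.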
        let mut runtime_services = flowey_core::node::steps::rust::new_rust_runtime_services(
            &mut in_mem_var_db,
            FlowBackend::Local,
            platform,
            flow_arch,
        );

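        // Run each step in its own working directory under `.work`, named
        // after the node's module path and step index.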
        for ResolvedRunnableStep {
            node_handle,
            label,
            code,
            idx,
            can_merge,
        } in nodes
        {
            let node_working_dir = out_dir.join(".work").join(format!(
                "{}_{}",
                node_handle.modpath().replace("::", "__"),
                idx
            ));
            if !node_working_dir.exists() {
                fs_err::create_dir(&node_working_dir)?;
            }
            std::env::set_current_dir(node_working_dir)?;

            if can_merge {
                log::debug!("minor step: {} ({})", label, node_handle.modpath());
            } else {
                log::info!(
                    "\x1B[0;32m=== {} ({}) ===\x1B[0m",
                    label,
                    node_handle.modpath(),
                );
            }
            code(&mut runtime_services)?;
            if can_merge {
                log::debug!("done!");
                log::debug!("");
            } else {
                log::info!("\x1B[0;32m=== done! ===\x1B[0m");
                log::info!("");
            }
        }
    }

    Ok(())
}

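/// Recursively copy the contents of `src` into `dst`, creating `dst` if needed.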
fn copy_dir_all(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
    fs_err::create_dir_all(&dst)?;
    for entry in fs_err::read_dir(src.as_ref())? {
        let entry = entry?;
        let ty = entry.file_type()?;
        if ty.is_dir() {
            copy_dir_all(entry.path(), dst.as_ref().join(entry.file_name()))?;
        } else {
            fs_err::copy(entry.path(), dst.as_ref().join(entry.file_name()))?;
        }
    }
    Ok(())
}