1use crate::cli::FlowBackendCli;
5use anyhow::Context;
6use flowey_core::node::FlowArch;
7use flowey_core::node::FlowBackend;
8use flowey_core::node::FlowPlatform;
9use flowey_core::node::GhOutput;
10use flowey_core::node::GhToRust;
11use flowey_core::node::NodeHandle;
12use flowey_core::node::RustToGh;
13use flowey_core::node::steps::rust::RustRuntimeServices;
14use flowey_core::node::user_facing::ClaimedGhParam;
15use flowey_core::node::user_facing::GhPermission;
16use flowey_core::node::user_facing::GhPermissionValue;
17use serde::Deserialize;
18use serde::Serialize;
19use std::collections::BTreeMap;
20use std::path::PathBuf;
21
/// Builds the command line that re-invokes flowey to execute one snippet.
///
/// The result has the shape `<flowey_bin> e <job_idx> <node_modpath> <snippet_idx>`,
/// with the fields separated by single spaces. Note that `job_idx` comes
/// *before* the node/snippet pair in the output even though it is the last
/// parameter of this function.
pub fn construct_exec_snippet_cli(
    flowey_bin: &str,
    node_modpath: &str,
    snippet_idx: usize,
    job_idx: usize,
) -> String {
    let job = job_idx.to_string();
    let snippet = snippet_idx.to_string();
    [flowey_bin, "e", job.as_str(), node_modpath, snippet.as_str()].join(" ")
}
30
#[derive(clap::Args)]
// CLI arguments for executing one or more node snippets within a job.
//
// NOTE: plain `//` comments are used on purpose in this struct: clap turns
// `///` doc comments into help text, so adding them would change the CLI's
// user-visible output.
pub struct ExecSnippet {
    // Index of the job being run. `run` uses it to open the job's var db and
    // to look up the job's entries in the pipeline static db.
    pub(crate) job_idx: usize,

    // Flat list of alternating `<node_modpath> <snippet_idx>` values; `run`
    // consumes it two elements at a time.
    node_modpath_and_snippet_idx: Vec<String>,

    // When set, the selected snippet's code is not actually executed (see
    // `run_code`).
    #[clap(long)]
    dry_run: bool,
}
43
/// Var db key holding the flowey working directory.
///
/// `ExecSnippet::run` requires this key to be present and expects its value
/// to be a JSON-encoded string.
pub const VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR: &str = "_internal_WORKING_DIR";
/// Var db key holding the persistent storage directory.
///
/// This key is optional: `ExecSnippet::run` only checks whether it is present
/// and, if so, hands the key name to the node context.
pub const VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR: &str = "_internal_PERSISTENT_STORAGE_DIR";
46
47impl ExecSnippet {
48 pub fn run(self) -> anyhow::Result<()> {
49 let Self {
50 node_modpath_and_snippet_idx,
51 job_idx,
52 dry_run,
53 } = self;
54
55 let mut runtime_var_db = super::var_db::open_var_db(job_idx)?;
56
57 let working_dir: PathBuf = {
58 let Some((working_dir, _)) =
59 runtime_var_db.try_get_var(VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR)
60 else {
61 anyhow::bail!("var db was not seeded with {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR}");
62 };
63 serde_json::from_slice::<String>(&working_dir)
64 .context(format!(
65 "found {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR} in db, but it wasn't a json string!"
66 ))?
67 .into()
68 };
69
70 let FloweyPipelineStaticDb {
71 flow_backend,
72 var_db_backend_kind: _,
73 job_reqs,
74 job_configs,
75 job_command_wrappers,
76 job_platforms,
77 job_archs,
78 } = {
79 let current_exe = std::env::current_exe()
80 .context("failed to get path to current flowey executable")?;
81 let pipeline_static_db =
82 fs_err::File::open(current_exe.with_file_name("pipeline.json"))?;
83 serde_json::from_reader(pipeline_static_db)?
84 };
85
86 let flow_platform = *job_platforms
87 .get(&job_idx)
88 .context("invalid job_idx: missing platform")?;
89 let flow_arch = *job_archs
90 .get(&job_idx)
91 .context("invalid job_idx: missing arch")?;
92
93 let command_wrapper = job_command_wrappers.get(&job_idx).cloned();
94
95 for [node_modpath, snippet_idx] in node_modpath_and_snippet_idx
96 .chunks_exact(2)
97 .map(|x| -> [String; 2] { x.to_vec().try_into().unwrap() })
98 {
99 let snippet_idx = snippet_idx.parse::<usize>().unwrap();
100
101 let raw_json_reqs: Vec<Box<[u8]>> = job_reqs
102 .get(&job_idx)
103 .context("invalid job_idx")?
104 .get(&node_modpath)
105 .context("pipeline db did not include data for specified node")?
106 .iter()
107 .map(|v| v.0.clone())
108 .collect::<Vec<_>>();
109
110 let raw_config_bytes: Vec<Box<[u8]>> = job_configs
111 .get(&job_idx)
112 .and_then(|m| m.get(&node_modpath))
113 .map(|v| v.iter().map(|v| v.0.clone()).collect())
114 .unwrap_or_default();
115
116 let Some(node_handle) = NodeHandle::try_from_modpath(&node_modpath) else {
117 anyhow::bail!("could not find node with that name")
118 };
119
120 let mut node = node_handle.new_erased_node();
121
122 {
124 let snippet_working_dir = working_dir.join(format!(
125 "{}_{}",
126 node_handle.modpath().replace("::", "__"),
127 snippet_idx
128 ));
129 if !snippet_working_dir.exists() {
130 fs_err::create_dir_all(&snippet_working_dir)?;
131 }
132 log::trace!(
133 "Setting current working directory from {:?} to {:?}",
134 std::env::current_dir()?,
135 snippet_working_dir
136 );
137 std::env::set_current_dir(snippet_working_dir)?;
138 }
139
140 let persistent_storage_dir_var = runtime_var_db
142 .try_get_var(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR)
143 .is_some()
144 .then_some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.to_owned());
145
146 let mut rust_runtime_services =
147 flowey_core::node::steps::rust::new_rust_runtime_services(
148 &mut runtime_var_db,
149 flow_backend.into(),
150 flow_platform,
151 flow_arch,
152 )?;
153
154 if let Some(ref wrapper) = command_wrapper {
155 rust_runtime_services.sh.set_wrapper(Some(wrapper.clone()));
156 }
157
158 let mut ctx_backend = ExecSnippetCtx::new(
159 flow_backend.into(),
160 flow_platform,
161 flow_arch,
162 node_handle,
163 snippet_idx,
164 dry_run,
165 persistent_storage_dir_var,
166 &mut rust_runtime_services,
167 );
168
169 let mut ctx = flowey_core::node::new_node_ctx(&mut ctx_backend);
170 node.emit(raw_config_bytes, raw_json_reqs.clone(), &mut ctx)?;
171
172 match ctx_backend.into_result() {
173 Some(res) => res?,
174 None => {
175 if dry_run {
176 } else {
178 anyhow::bail!("snippet wasn't run (invalid index)")
179 }
180 }
181 }
182 }
183
184 std::env::set_current_dir(working_dir)?;
186
187 Ok(())
188 }
189}
190
/// [`flowey_core::node::NodeCtxBackend`] implementation that re-emits a node
/// and executes only the step whose index matches `target_idx`.
pub struct ExecSnippetCtx<'a, 'b> {
    /// Backend reported to the node via `backend()` and passed to `run_code`.
    flow_backend: FlowBackend,
    /// Platform reported to the node via `platform()`.
    flow_platform: FlowPlatform,
    /// Architecture reported to the node via `arch()`.
    flow_arch: FlowArch,
    /// The node being emitted; its modpath is used in labels and var names.
    node_handle: NodeHandle,
    /// Runtime services handed to the snippet's code when it is executed.
    rust_runtime_services: &'a mut RustRuntimeServices<'b>,
    /// Number of Rust / ADO / GitHub steps emitted so far.
    idx_tracker: usize,
    /// Number of vars handed out so far by `on_new_var`.
    var_tracker: usize,
    /// Index of the step that should actually be executed.
    target_idx: usize,
    /// When set, the target step's code is skipped (see `run_code`).
    dry_run: bool,
    /// Name of the var holding the persistent storage dir, if one is available.
    persistent_storage_dir_var: Option<String>,
    /// Outcome of the target step; stays `None` if no step's code was run.
    result: Option<anyhow::Result<()>>,
}
204
205impl<'a, 'b> ExecSnippetCtx<'a, 'b> {
206 pub fn new(
207 flow_backend: FlowBackend,
208 flow_platform: FlowPlatform,
209 flow_arch: FlowArch,
210 node_handle: NodeHandle,
211 target_idx: usize,
212 dry_run: bool,
213 persistent_storage_dir_var: Option<String>,
214 rust_runtime_services: &'a mut RustRuntimeServices<'b>,
215 ) -> Self {
216 Self {
217 flow_backend,
218 flow_platform,
219 flow_arch,
220 node_handle,
221 rust_runtime_services,
222 var_tracker: 0,
223 idx_tracker: 0,
224 target_idx,
225 dry_run,
226 persistent_storage_dir_var,
227 result: None,
228 }
229 }
230
231 pub fn into_result(self) -> Option<anyhow::Result<()>> {
232 self.result
233 }
234}
235
236impl flowey_core::node::NodeCtxBackend for ExecSnippetCtx<'_, '_> {
237 fn on_request(&mut self, _node_handle: NodeHandle, _req: anyhow::Result<Box<[u8]>>) {
238 }
240
241 fn on_config(&mut self, _node_handle: NodeHandle, _config: anyhow::Result<Box<[u8]>>) {
242 }
244
245 fn on_new_var(&mut self) -> String {
246 let v = self.var_tracker;
247 self.var_tracker += 1;
248 format!("{}:{}", self.node_handle.modpath(), v)
249 }
250
251 fn on_claimed_runtime_var(&mut self, _var: &str, _is_read: bool) {
252 }
254
255 fn on_emit_rust_step(
256 &mut self,
257 label: &str,
258 _can_merge: bool,
259 code: Box<
260 dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
261 >,
262 ) {
263 if self.idx_tracker == self.target_idx {
264 let label = if !label.is_empty() {
265 label
266 } else {
267 "<unnamed snippet>"
268 };
269
270 self.result = Some(run_code(
271 self.flow_backend,
272 format!("{} ({})", label, self.node_handle.modpath()),
273 self.dry_run,
274 || code(self.rust_runtime_services),
275 ));
276 }
277 self.idx_tracker += 1;
278 }
279
280 fn on_emit_ado_step(
281 &mut self,
282 label: &str,
283 _yaml_snippet: Box<
284 dyn for<'a> FnOnce(
285 &'a mut flowey_core::node::user_facing::AdoStepServices<'_>,
286 ) -> String,
287 >,
288 code: Option<
289 Box<
290 dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
291 >,
292 >,
293 _condvar: Option<String>,
294 ) {
295 if self.idx_tracker == self.target_idx {
298 if let Some(code) = code {
299 self.result = Some(run_code(
300 self.flow_backend,
301 format!(
302 "(inline snippet) {} ({})",
303 label,
304 self.node_handle.modpath()
305 ),
306 self.dry_run,
307 || code(self.rust_runtime_services),
308 ));
309 }
310 }
311
312 self.idx_tracker += 1;
313 }
314
315 fn on_emit_gh_step(
316 &mut self,
317 _label: &str,
318 _uses: &str,
319 _with: BTreeMap<String, ClaimedGhParam>,
320 _condvar: Option<String>,
321 _outputs: BTreeMap<String, Vec<GhOutput>>,
322 _permissions: BTreeMap<GhPermission, GhPermissionValue>,
323 _gh_to_rust: Vec<GhToRust>,
324 _rust_to_gh: Vec<RustToGh>,
325 ) {
326 self.idx_tracker += 1;
327 }
328
329 fn on_emit_side_effect_step(&mut self) {
330 }
332
333 fn backend(&mut self) -> FlowBackend {
334 self.flow_backend
335 }
336
337 fn platform(&mut self) -> FlowPlatform {
338 self.flow_platform
339 }
340
341 fn arch(&mut self) -> FlowArch {
342 self.flow_arch
343 }
344
345 fn current_node(&self) -> NodeHandle {
346 self.node_handle
347 }
348
349 fn persistent_dir_path_var(&mut self) -> Option<String> {
350 self.persistent_storage_dir_var.clone()
351 }
352
353 fn on_unused_read_var(&mut self, _var: &str) {
354 }
356}
357
/// Kind of var db backend recorded in [`FloweyPipelineStaticDb`].
#[derive(Serialize, Deserialize)]
pub(crate) enum VarDbBackendKind {
    /// Presumably a JSON-backed var db -- confirm against the `var_db` module.
    Json,
}
362
/// Static, per-pipeline data that `ExecSnippet::run` deserializes from the
/// `pipeline.json` file located next to the flowey executable.
///
/// All `job_*` maps are keyed by job index.
#[derive(Serialize, Deserialize)]
pub(crate) struct FloweyPipelineStaticDb {
    /// Backend the pipeline targets.
    pub flow_backend: FlowBackendCli,
    /// Which kind of var db the pipeline uses.
    pub var_db_backend_kind: VarDbBackendKind,
    /// Per job: serialized requests for each node, keyed by node modpath.
    pub job_reqs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
    /// Per job: serialized config for each node, keyed by node modpath.
    /// Treated as empty when the field is absent from the file.
    #[serde(default)]
    pub job_configs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
    /// Per job: optional command wrapper installed on the shell before a
    /// snippet runs.
    pub job_command_wrappers: BTreeMap<usize, flowey_core::shell::CommandWrapperKind>,
    /// Per job: the platform the job runs on.
    pub job_platforms: BTreeMap<usize, FlowPlatform>,
    /// Per job: the architecture the job runs on.
    pub job_archs: BTreeMap<usize, FlowArch>,
}
374
/// Raw JSON bytes of a single serialized request / config entry.
///
/// (De)serialization goes through the `serialized_request` module, which
/// stores the bytes as a string containing JSON text.
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub(crate) struct SerializedRequest(#[serde(with = "serialized_request")] pub Box<[u8]>);
380
381pub(crate) mod serialized_request {
382 use serde::Deserialize;
383 use serde::Deserializer;
384 use serde::Serializer;
385
386 #[expect(clippy::borrowed_box, reason = "required by serde")]
387 pub fn serialize<S: Serializer>(v: &Box<[u8]>, ser: S) -> Result<S::Ok, S::Error> {
388 ser.serialize_str(
389 &serde_json::to_string(&serde_json::from_slice::<serde_json::Value>(v).unwrap())
390 .unwrap(),
391 )
392 }
393
394 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Box<[u8]>, D::Error> {
395 let s: String = Deserialize::deserialize(d)?;
396 Ok(
397 serde_json::to_vec(&serde_json::from_str::<serde_json::Value>(&s).unwrap())
398 .unwrap()
399 .into(),
400 )
401 }
402}
403
404fn run_code(
405 flow_backend: FlowBackend,
406 label: impl std::fmt::Display,
407 dry_run: bool,
408 code: impl FnOnce() -> anyhow::Result<()>,
409) -> anyhow::Result<()> {
410 if matches!(flow_backend, FlowBackend::Ado) {
411 println!("##[group]=== {} ===", label)
412 } else {
413 log::info!("\x1B[0;32m=== {} ===\x1B[0m", label);
415 }
416
417 let result = if dry_run {
418 log::info!("...but not actually, because of --dry-run");
419 Ok(())
420 } else {
421 code()
422 };
423
424 log::info!("\x1B[0;32m=== done! ===\x1B[0m");
426
427 if matches!(flow_backend, FlowBackend::Ado) {
428 println!("##[endgroup]")
429 } else {
430 log::info!(""); }
432
433 result
434}