1use crate::cli::FlowBackendCli;
5use anyhow::Context;
6use flowey_core::node::FlowArch;
7use flowey_core::node::FlowBackend;
8use flowey_core::node::FlowPlatform;
9use flowey_core::node::GhOutput;
10use flowey_core::node::GhToRust;
11use flowey_core::node::NodeHandle;
12use flowey_core::node::RustToGh;
13use flowey_core::node::steps::rust::RustRuntimeServices;
14use flowey_core::node::user_facing::ClaimedGhParam;
15use flowey_core::node::user_facing::GhPermission;
16use flowey_core::node::user_facing::GhPermissionValue;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use serde::Deserialize;
20use serde::Serialize;
21use std::collections::BTreeMap;
22use std::path::PathBuf;
23
24pub fn construct_exec_snippet_cli(
25 flowey_bin: &str,
26 node_modpath: &str,
27 snippet_idx: usize,
28 job_idx: usize,
29) -> String {
30 format!(r#"{flowey_bin} e {job_idx} {node_modpath} {snippet_idx}"#)
31}
32
33#[derive(clap::Args)]
35pub struct ExecSnippet {
36 pub(crate) job_idx: usize,
38
39 node_modpath_and_snippet_idx: Vec<String>,
40
41 #[clap(long)]
43 dry_run: bool,
44}
45
46pub const VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR: &str = "_internal_WORKING_DIR";
47pub const VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR: &str = "_internal_PERSISTENT_STORAGE_DIR";
48
49impl ExecSnippet {
50 pub fn run(self) -> anyhow::Result<()> {
51 let Self {
52 node_modpath_and_snippet_idx,
53 job_idx,
54 dry_run,
55 } = self;
56
57 let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
58 let flow_arch = FlowArch::host(PipelineBackendHint::Local);
59
60 let mut runtime_var_db = super::var_db::open_var_db(job_idx)?;
61
62 let working_dir: PathBuf = {
63 let Some((working_dir, _)) =
64 runtime_var_db.try_get_var(VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR)
65 else {
66 anyhow::bail!("var db was not seeded with {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR}");
67 };
68 serde_json::from_slice::<String>(&working_dir)
69 .context(format!(
70 "found {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR} in db, but it wasn't a json string!"
71 ))?
72 .into()
73 };
74
75 let FloweyPipelineStaticDb {
76 flow_backend,
77 var_db_backend_kind: _,
78 job_reqs,
79 } = {
80 let current_exe = std::env::current_exe()
81 .context("failed to get path to current flowey executable")?;
82 let pipeline_static_db =
83 fs_err::File::open(current_exe.with_file_name("pipeline.json"))?;
84 serde_json::from_reader(pipeline_static_db)?
85 };
86
87 for [node_modpath, snippet_idx] in node_modpath_and_snippet_idx
88 .chunks_exact(2)
89 .map(|x| -> [String; 2] { x.to_vec().try_into().unwrap() })
90 {
91 let snippet_idx = snippet_idx.parse::<usize>().unwrap();
92
93 let raw_json_reqs: Vec<Box<[u8]>> = job_reqs
94 .get(&job_idx)
95 .context("invalid job_idx")?
96 .get(&node_modpath)
97 .context("pipeline db did not include data for specified node")?
98 .iter()
99 .map(|v| v.0.clone())
100 .collect::<Vec<_>>();
101
102 let Some(node_handle) = NodeHandle::try_from_modpath(&node_modpath) else {
103 anyhow::bail!("could not find node with that name")
104 };
105
106 let mut node = node_handle.new_erased_node();
107
108 {
110 let snippet_working_dir = working_dir.join(format!(
111 "{}_{}",
112 node_handle.modpath().replace("::", "__"),
113 snippet_idx
114 ));
115 if !snippet_working_dir.exists() {
116 fs_err::create_dir_all(&snippet_working_dir)?;
117 }
118 log::trace!(
119 "Setting current working directory from {:?} to {:?}",
120 std::env::current_dir()?,
121 snippet_working_dir
122 );
123 std::env::set_current_dir(snippet_working_dir)?;
124 }
125
126 let persistent_storage_dir_var = runtime_var_db
128 .try_get_var(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR)
129 .is_some()
130 .then_some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.to_owned());
131
132 let mut rust_runtime_services =
133 flowey_core::node::steps::rust::new_rust_runtime_services(
134 &mut runtime_var_db,
135 flow_backend.into(),
136 flow_platform,
137 flow_arch,
138 );
139
140 let mut ctx_backend = ExecSnippetCtx::new(
141 flow_backend.into(),
142 flow_platform,
143 flow_arch,
144 node_handle,
145 snippet_idx,
146 dry_run,
147 persistent_storage_dir_var,
148 &mut rust_runtime_services,
149 );
150
151 let mut ctx = flowey_core::node::new_node_ctx(&mut ctx_backend);
152 node.emit(raw_json_reqs.clone(), &mut ctx)?;
153
154 match ctx_backend.into_result() {
155 Some(res) => res?,
156 None => {
157 if dry_run {
158 } else {
160 anyhow::bail!("snippet wasn't run (invalid index)")
161 }
162 }
163 }
164 }
165
166 std::env::set_current_dir(working_dir)?;
168
169 Ok(())
170 }
171}
172
173pub struct ExecSnippetCtx<'a, 'b> {
174 flow_backend: FlowBackend,
175 flow_platform: FlowPlatform,
176 flow_arch: FlowArch,
177 node_handle: NodeHandle,
178 rust_runtime_services: &'a mut RustRuntimeServices<'b>,
179 idx_tracker: usize,
180 var_tracker: usize,
181 target_idx: usize,
182 dry_run: bool,
183 persistent_storage_dir_var: Option<String>,
184 result: Option<anyhow::Result<()>>,
185}
186
187impl<'a, 'b> ExecSnippetCtx<'a, 'b> {
188 pub fn new(
189 flow_backend: FlowBackend,
190 flow_platform: FlowPlatform,
191 flow_arch: FlowArch,
192 node_handle: NodeHandle,
193 target_idx: usize,
194 dry_run: bool,
195 persistent_storage_dir_var: Option<String>,
196 rust_runtime_services: &'a mut RustRuntimeServices<'b>,
197 ) -> Self {
198 Self {
199 flow_backend,
200 flow_platform,
201 flow_arch,
202 node_handle,
203 rust_runtime_services,
204 var_tracker: 0,
205 idx_tracker: 0,
206 target_idx,
207 dry_run,
208 persistent_storage_dir_var,
209 result: None,
210 }
211 }
212
213 pub fn into_result(self) -> Option<anyhow::Result<()>> {
214 self.result
215 }
216}
217
218impl flowey_core::node::NodeCtxBackend for ExecSnippetCtx<'_, '_> {
219 fn on_request(&mut self, _node_handle: NodeHandle, _req: anyhow::Result<Box<[u8]>>) {
220 }
222
223 fn on_new_var(&mut self) -> String {
224 let v = self.var_tracker;
225 self.var_tracker += 1;
226 format!("{}:{}", self.node_handle.modpath(), v)
227 }
228
229 fn on_claimed_runtime_var(&mut self, _var: &str, _is_read: bool) {
230 }
232
233 fn on_emit_rust_step(
234 &mut self,
235 label: &str,
236 _can_merge: bool,
237 code: Box<
238 dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
239 >,
240 ) {
241 if self.idx_tracker == self.target_idx {
242 let label = if !label.is_empty() {
243 label
244 } else {
245 "<unnamed snippet>"
246 };
247
248 self.result = Some(run_code(
249 self.flow_backend,
250 format!("{} ({})", label, self.node_handle.modpath()),
251 self.dry_run,
252 || code(self.rust_runtime_services),
253 ));
254 }
255 self.idx_tracker += 1;
256 }
257
258 fn on_emit_ado_step(
259 &mut self,
260 label: &str,
261 _yaml_snippet: Box<
262 dyn for<'a> FnOnce(
263 &'a mut flowey_core::node::user_facing::AdoStepServices<'_>,
264 ) -> String,
265 >,
266 code: Option<
267 Box<
268 dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
269 >,
270 >,
271 _condvar: Option<String>,
272 ) {
273 if self.idx_tracker == self.target_idx {
276 if let Some(code) = code {
277 self.result = Some(run_code(
278 self.flow_backend,
279 format!(
280 "(inline snippet) {} ({})",
281 label,
282 self.node_handle.modpath()
283 ),
284 self.dry_run,
285 || code(self.rust_runtime_services),
286 ));
287 }
288 }
289
290 self.idx_tracker += 1;
291 }
292
293 fn on_emit_gh_step(
294 &mut self,
295 _label: &str,
296 _uses: &str,
297 _with: BTreeMap<String, ClaimedGhParam>,
298 _condvar: Option<String>,
299 _outputs: BTreeMap<String, Vec<GhOutput>>,
300 _permissions: BTreeMap<GhPermission, GhPermissionValue>,
301 _gh_to_rust: Vec<GhToRust>,
302 _rust_to_gh: Vec<RustToGh>,
303 ) {
304 self.idx_tracker += 1;
305 }
306
307 fn on_emit_side_effect_step(&mut self) {
308 }
310
311 fn backend(&mut self) -> FlowBackend {
312 self.flow_backend
313 }
314
315 fn platform(&mut self) -> FlowPlatform {
316 self.flow_platform
317 }
318
319 fn arch(&mut self) -> FlowArch {
320 self.flow_arch
321 }
322
323 fn current_node(&self) -> NodeHandle {
324 self.node_handle
325 }
326
327 fn persistent_dir_path_var(&mut self) -> Option<String> {
328 self.persistent_storage_dir_var.clone()
329 }
330
331 fn on_unused_read_var(&mut self, _var: &str) {
332 }
334}
335
336#[derive(Serialize, Deserialize)]
337pub(crate) enum VarDbBackendKind {
338 Json,
339}
340
341#[derive(Serialize, Deserialize)]
342pub(crate) struct FloweyPipelineStaticDb {
343 pub flow_backend: FlowBackendCli,
344 pub var_db_backend_kind: VarDbBackendKind,
345 pub job_reqs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
346}
347
348#[derive(Serialize, Deserialize)]
351#[serde(transparent)]
352pub(crate) struct SerializedRequest(#[serde(with = "serialized_request")] pub Box<[u8]>);
353
354pub(crate) mod serialized_request {
355 use serde::Deserialize;
356 use serde::Deserializer;
357 use serde::Serializer;
358
359 #[expect(clippy::borrowed_box, reason = "required by serde")]
360 pub fn serialize<S: Serializer>(v: &Box<[u8]>, ser: S) -> Result<S::Ok, S::Error> {
361 ser.serialize_str(
362 &serde_json::to_string(&serde_json::from_slice::<serde_json::Value>(v).unwrap())
363 .unwrap(),
364 )
365 }
366
367 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Box<[u8]>, D::Error> {
368 let s: String = Deserialize::deserialize(d)?;
369 Ok(
370 serde_json::to_vec(&serde_json::from_str::<serde_json::Value>(&s).unwrap())
371 .unwrap()
372 .into(),
373 )
374 }
375}
376
377fn run_code(
378 flow_backend: FlowBackend,
379 label: impl std::fmt::Display,
380 dry_run: bool,
381 code: impl FnOnce() -> anyhow::Result<()>,
382) -> anyhow::Result<()> {
383 if matches!(flow_backend, FlowBackend::Ado) {
384 println!("##[group]=== {} ===", label)
385 } else {
386 log::info!("\x1B[0;32m=== {} ===\x1B[0m", label);
388 }
389
390 let result = if dry_run {
391 log::info!("...but not actually, because of --dry-run");
392 Ok(())
393 } else {
394 code()
395 };
396
397 log::info!("\x1B[0;32m=== done! ===\x1B[0m");
399
400 if matches!(flow_backend, FlowBackend::Ado) {
401 println!("##[endgroup]")
402 } else {
403 log::info!(""); }
405
406 result
407}