1use crate::cli::FlowBackendCli;
5use anyhow::Context;
6use flowey_core::node::FlowArch;
7use flowey_core::node::FlowBackend;
8use flowey_core::node::FlowPlatform;
9use flowey_core::node::GhOutput;
10use flowey_core::node::GhToRust;
11use flowey_core::node::NodeHandle;
12use flowey_core::node::RustToGh;
13use flowey_core::node::steps::rust::RustRuntimeServices;
14use flowey_core::node::user_facing::ClaimedGhParam;
15use flowey_core::node::user_facing::GhPermission;
16use flowey_core::node::user_facing::GhPermissionValue;
17use serde::Deserialize;
18use serde::Serialize;
19use std::collections::BTreeMap;
20use std::path::PathBuf;
21
22pub fn construct_exec_snippet_cli(
23 flowey_bin: &str,
24 node_modpath: &str,
25 snippet_idx: usize,
26 job_idx: usize,
27) -> String {
28 format!(r#"{flowey_bin} e {job_idx} {node_modpath} {snippet_idx}"#)
29}
30
31#[derive(clap::Args)]
33pub struct ExecSnippet {
34 pub(crate) job_idx: usize,
36
37 node_modpath_and_snippet_idx: Vec<String>,
38
39 #[clap(long)]
41 dry_run: bool,
42}
43
44pub const VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR: &str = "_internal_WORKING_DIR";
45pub const VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR: &str = "_internal_PERSISTENT_STORAGE_DIR";
46
47impl ExecSnippet {
48 pub fn run(self) -> anyhow::Result<()> {
49 let Self {
50 node_modpath_and_snippet_idx,
51 job_idx,
52 dry_run,
53 } = self;
54
55 let mut runtime_var_db = super::var_db::open_var_db(job_idx)?;
56
57 let working_dir: PathBuf = {
58 let Some((working_dir, _)) =
59 runtime_var_db.try_get_var(VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR)
60 else {
61 anyhow::bail!("var db was not seeded with {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR}");
62 };
63 serde_json::from_slice::<String>(&working_dir)
64 .context(format!(
65 "found {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR} in db, but it wasn't a json string!"
66 ))?
67 .into()
68 };
69
70 let FloweyPipelineStaticDb {
71 flow_backend,
72 var_db_backend_kind: _,
73 job_reqs,
74 job_command_wrappers,
75 job_platforms,
76 job_archs,
77 } = {
78 let current_exe = std::env::current_exe()
79 .context("failed to get path to current flowey executable")?;
80 let pipeline_static_db =
81 fs_err::File::open(current_exe.with_file_name("pipeline.json"))?;
82 serde_json::from_reader(pipeline_static_db)?
83 };
84
85 let flow_platform = *job_platforms
86 .get(&job_idx)
87 .context("invalid job_idx: missing platform")?;
88 let flow_arch = *job_archs
89 .get(&job_idx)
90 .context("invalid job_idx: missing arch")?;
91
92 let command_wrapper = job_command_wrappers.get(&job_idx).cloned();
93
94 for [node_modpath, snippet_idx] in node_modpath_and_snippet_idx
95 .chunks_exact(2)
96 .map(|x| -> [String; 2] { x.to_vec().try_into().unwrap() })
97 {
98 let snippet_idx = snippet_idx.parse::<usize>().unwrap();
99
100 let raw_json_reqs: Vec<Box<[u8]>> = job_reqs
101 .get(&job_idx)
102 .context("invalid job_idx")?
103 .get(&node_modpath)
104 .context("pipeline db did not include data for specified node")?
105 .iter()
106 .map(|v| v.0.clone())
107 .collect::<Vec<_>>();
108
109 let Some(node_handle) = NodeHandle::try_from_modpath(&node_modpath) else {
110 anyhow::bail!("could not find node with that name")
111 };
112
113 let mut node = node_handle.new_erased_node();
114
115 {
117 let snippet_working_dir = working_dir.join(format!(
118 "{}_{}",
119 node_handle.modpath().replace("::", "__"),
120 snippet_idx
121 ));
122 if !snippet_working_dir.exists() {
123 fs_err::create_dir_all(&snippet_working_dir)?;
124 }
125 log::trace!(
126 "Setting current working directory from {:?} to {:?}",
127 std::env::current_dir()?,
128 snippet_working_dir
129 );
130 std::env::set_current_dir(snippet_working_dir)?;
131 }
132
133 let persistent_storage_dir_var = runtime_var_db
135 .try_get_var(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR)
136 .is_some()
137 .then_some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.to_owned());
138
139 let mut rust_runtime_services =
140 flowey_core::node::steps::rust::new_rust_runtime_services(
141 &mut runtime_var_db,
142 flow_backend.into(),
143 flow_platform,
144 flow_arch,
145 )?;
146
147 if let Some(ref wrapper) = command_wrapper {
148 rust_runtime_services.sh.set_wrapper(Some(wrapper.clone()));
149 }
150
151 let mut ctx_backend = ExecSnippetCtx::new(
152 flow_backend.into(),
153 flow_platform,
154 flow_arch,
155 node_handle,
156 snippet_idx,
157 dry_run,
158 persistent_storage_dir_var,
159 &mut rust_runtime_services,
160 );
161
162 let mut ctx = flowey_core::node::new_node_ctx(&mut ctx_backend);
163 node.emit(raw_json_reqs.clone(), &mut ctx)?;
164
165 match ctx_backend.into_result() {
166 Some(res) => res?,
167 None => {
168 if dry_run {
169 } else {
171 anyhow::bail!("snippet wasn't run (invalid index)")
172 }
173 }
174 }
175 }
176
177 std::env::set_current_dir(working_dir)?;
179
180 Ok(())
181 }
182}
183
184pub struct ExecSnippetCtx<'a, 'b> {
185 flow_backend: FlowBackend,
186 flow_platform: FlowPlatform,
187 flow_arch: FlowArch,
188 node_handle: NodeHandle,
189 rust_runtime_services: &'a mut RustRuntimeServices<'b>,
190 idx_tracker: usize,
191 var_tracker: usize,
192 target_idx: usize,
193 dry_run: bool,
194 persistent_storage_dir_var: Option<String>,
195 result: Option<anyhow::Result<()>>,
196}
197
198impl<'a, 'b> ExecSnippetCtx<'a, 'b> {
199 pub fn new(
200 flow_backend: FlowBackend,
201 flow_platform: FlowPlatform,
202 flow_arch: FlowArch,
203 node_handle: NodeHandle,
204 target_idx: usize,
205 dry_run: bool,
206 persistent_storage_dir_var: Option<String>,
207 rust_runtime_services: &'a mut RustRuntimeServices<'b>,
208 ) -> Self {
209 Self {
210 flow_backend,
211 flow_platform,
212 flow_arch,
213 node_handle,
214 rust_runtime_services,
215 var_tracker: 0,
216 idx_tracker: 0,
217 target_idx,
218 dry_run,
219 persistent_storage_dir_var,
220 result: None,
221 }
222 }
223
224 pub fn into_result(self) -> Option<anyhow::Result<()>> {
225 self.result
226 }
227}
228
229impl flowey_core::node::NodeCtxBackend for ExecSnippetCtx<'_, '_> {
230 fn on_request(&mut self, _node_handle: NodeHandle, _req: anyhow::Result<Box<[u8]>>) {
231 }
233
234 fn on_new_var(&mut self) -> String {
235 let v = self.var_tracker;
236 self.var_tracker += 1;
237 format!("{}:{}", self.node_handle.modpath(), v)
238 }
239
240 fn on_claimed_runtime_var(&mut self, _var: &str, _is_read: bool) {
241 }
243
244 fn on_emit_rust_step(
245 &mut self,
246 label: &str,
247 _can_merge: bool,
248 code: Box<
249 dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
250 >,
251 ) {
252 if self.idx_tracker == self.target_idx {
253 let label = if !label.is_empty() {
254 label
255 } else {
256 "<unnamed snippet>"
257 };
258
259 self.result = Some(run_code(
260 self.flow_backend,
261 format!("{} ({})", label, self.node_handle.modpath()),
262 self.dry_run,
263 || code(self.rust_runtime_services),
264 ));
265 }
266 self.idx_tracker += 1;
267 }
268
269 fn on_emit_ado_step(
270 &mut self,
271 label: &str,
272 _yaml_snippet: Box<
273 dyn for<'a> FnOnce(
274 &'a mut flowey_core::node::user_facing::AdoStepServices<'_>,
275 ) -> String,
276 >,
277 code: Option<
278 Box<
279 dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
280 >,
281 >,
282 _condvar: Option<String>,
283 ) {
284 if self.idx_tracker == self.target_idx {
287 if let Some(code) = code {
288 self.result = Some(run_code(
289 self.flow_backend,
290 format!(
291 "(inline snippet) {} ({})",
292 label,
293 self.node_handle.modpath()
294 ),
295 self.dry_run,
296 || code(self.rust_runtime_services),
297 ));
298 }
299 }
300
301 self.idx_tracker += 1;
302 }
303
304 fn on_emit_gh_step(
305 &mut self,
306 _label: &str,
307 _uses: &str,
308 _with: BTreeMap<String, ClaimedGhParam>,
309 _condvar: Option<String>,
310 _outputs: BTreeMap<String, Vec<GhOutput>>,
311 _permissions: BTreeMap<GhPermission, GhPermissionValue>,
312 _gh_to_rust: Vec<GhToRust>,
313 _rust_to_gh: Vec<RustToGh>,
314 ) {
315 self.idx_tracker += 1;
316 }
317
318 fn on_emit_side_effect_step(&mut self) {
319 }
321
322 fn backend(&mut self) -> FlowBackend {
323 self.flow_backend
324 }
325
326 fn platform(&mut self) -> FlowPlatform {
327 self.flow_platform
328 }
329
330 fn arch(&mut self) -> FlowArch {
331 self.flow_arch
332 }
333
334 fn current_node(&self) -> NodeHandle {
335 self.node_handle
336 }
337
338 fn persistent_dir_path_var(&mut self) -> Option<String> {
339 self.persistent_storage_dir_var.clone()
340 }
341
342 fn on_unused_read_var(&mut self, _var: &str) {
343 }
345}
346
347#[derive(Serialize, Deserialize)]
348pub(crate) enum VarDbBackendKind {
349 Json,
350}
351
352#[derive(Serialize, Deserialize)]
353pub(crate) struct FloweyPipelineStaticDb {
354 pub flow_backend: FlowBackendCli,
355 pub var_db_backend_kind: VarDbBackendKind,
356 pub job_reqs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
357 pub job_command_wrappers: BTreeMap<usize, flowey_core::shell::CommandWrapperKind>,
358 pub job_platforms: BTreeMap<usize, FlowPlatform>,
359 pub job_archs: BTreeMap<usize, FlowArch>,
360}
361
362#[derive(Serialize, Deserialize)]
365#[serde(transparent)]
366pub(crate) struct SerializedRequest(#[serde(with = "serialized_request")] pub Box<[u8]>);
367
368pub(crate) mod serialized_request {
369 use serde::Deserialize;
370 use serde::Deserializer;
371 use serde::Serializer;
372
373 #[expect(clippy::borrowed_box, reason = "required by serde")]
374 pub fn serialize<S: Serializer>(v: &Box<[u8]>, ser: S) -> Result<S::Ok, S::Error> {
375 ser.serialize_str(
376 &serde_json::to_string(&serde_json::from_slice::<serde_json::Value>(v).unwrap())
377 .unwrap(),
378 )
379 }
380
381 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Box<[u8]>, D::Error> {
382 let s: String = Deserialize::deserialize(d)?;
383 Ok(
384 serde_json::to_vec(&serde_json::from_str::<serde_json::Value>(&s).unwrap())
385 .unwrap()
386 .into(),
387 )
388 }
389}
390
391fn run_code(
392 flow_backend: FlowBackend,
393 label: impl std::fmt::Display,
394 dry_run: bool,
395 code: impl FnOnce() -> anyhow::Result<()>,
396) -> anyhow::Result<()> {
397 if matches!(flow_backend, FlowBackend::Ado) {
398 println!("##[group]=== {} ===", label)
399 } else {
400 log::info!("\x1B[0;32m=== {} ===\x1B[0m", label);
402 }
403
404 let result = if dry_run {
405 log::info!("...but not actually, because of --dry-run");
406 Ok(())
407 } else {
408 code()
409 };
410
411 log::info!("\x1B[0;32m=== done! ===\x1B[0m");
413
414 if matches!(flow_backend, FlowBackend::Ado) {
415 println!("##[endgroup]")
416 } else {
417 log::info!(""); }
419
420 result
421}