1use crate::cli::FlowBackendCli;
5use anyhow::Context;
6use flowey_core::node::FlowArch;
7use flowey_core::node::FlowBackend;
8use flowey_core::node::FlowPlatform;
9use flowey_core::node::GhOutput;
10use flowey_core::node::GhToRust;
11use flowey_core::node::NodeHandle;
12use flowey_core::node::RustToGh;
13use flowey_core::node::steps::rust::RustRuntimeServices;
14use flowey_core::node::user_facing::ClaimedGhParam;
15use flowey_core::node::user_facing::GhPermission;
16use flowey_core::node::user_facing::GhPermissionValue;
17use flowey_core::pipeline::HostExt;
18use flowey_core::pipeline::PipelineBackendHint;
19use serde::Deserialize;
20use serde::Serialize;
21use std::collections::BTreeMap;
22use std::path::PathBuf;
23
/// Builds the command line that re-invokes flowey to execute one snippet.
///
/// The resulting string has the shape
/// `<flowey_bin> e <job_idx> <node_modpath> <snippet_idx>`, i.e. the `e`
/// subcommand followed by the job index and a single
/// `<node_modpath> <snippet_idx>` pair. No quoting or escaping is applied to
/// any of the components.
pub fn construct_exec_snippet_cli(
    flowey_bin: &str,
    node_modpath: &str,
    snippet_idx: usize,
    job_idx: usize,
) -> String {
    let job = job_idx.to_string();
    let snippet = snippet_idx.to_string();
    [flowey_bin, "e", job.as_str(), node_modpath, snippet.as_str()].join(" ")
}
32
/// Arguments of the command that executes node snippets for a single job.
///
/// The matching command line is produced by `construct_exec_snippet_cli`.
#[derive(clap::Args)]
pub struct ExecSnippet {
    /// Index of the job; used to open the job's var db and to look up the
    /// job's requests in the pipeline db.
    pub(crate) job_idx: usize,

    /// Flat list of alternating `<node_modpath> <snippet_idx>` values,
    /// consumed two at a time.
    node_modpath_and_snippet_idx: Vec<String>,

    /// When set, the selected snippet's code is not actually run.
    #[clap(long)]
    dry_run: bool,
}
45
/// Var db key that must be seeded before `ExecSnippet::run` is invoked; its
/// value is read as a JSON string holding the base working directory.
pub const VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR: &str = "_internal_WORKING_DIR";
/// Var db key that may optionally be seeded; when present, its name is handed
/// to the node context as the persistent storage directory variable.
pub const VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR: &str = "_internal_PERSISTENT_STORAGE_DIR";
48
49impl ExecSnippet {
50 pub fn run(self) -> anyhow::Result<()> {
51 let Self {
52 node_modpath_and_snippet_idx,
53 job_idx,
54 dry_run,
55 } = self;
56
57 let flow_platform = FlowPlatform::host(PipelineBackendHint::Local);
58 let flow_arch = FlowArch::host(PipelineBackendHint::Local);
59
60 let mut runtime_var_db = super::var_db::open_var_db(job_idx)?;
61
62 let working_dir: PathBuf = {
63 let Some((working_dir, _)) =
64 runtime_var_db.try_get_var(VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR)
65 else {
66 anyhow::bail!("var db was not seeded with {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR}");
67 };
68 serde_json::from_slice::<String>(&working_dir)
69 .context(format!(
70 "found {VAR_DB_SEEDVAR_FLOWEY_WORKING_DIR} in db, but it wasn't a json string!"
71 ))?
72 .into()
73 };
74
75 let FloweyPipelineStaticDb {
76 flow_backend,
77 var_db_backend_kind: _,
78 job_reqs,
79 } = {
80 let current_exe = std::env::current_exe()
81 .context("failed to get path to current flowey executable")?;
82 let pipeline_static_db =
83 fs_err::File::open(current_exe.with_file_name("pipeline.json"))?;
84 serde_json::from_reader(pipeline_static_db)?
85 };
86
87 for [node_modpath, snippet_idx] in node_modpath_and_snippet_idx
88 .chunks_exact(2)
89 .map(|x| -> [String; 2] { x.to_vec().try_into().unwrap() })
90 {
91 let snippet_idx = snippet_idx.parse::<usize>().unwrap();
92
93 let raw_json_reqs: Vec<Box<[u8]>> = job_reqs
94 .get(&job_idx)
95 .context("invalid job_idx")?
96 .get(&node_modpath)
97 .context("pipeline db did not include data for specified node")?
98 .iter()
99 .map(|v| v.0.clone())
100 .collect::<Vec<_>>();
101
102 let Some(node_handle) = NodeHandle::try_from_modpath(&node_modpath) else {
103 anyhow::bail!("could not find node with that name")
104 };
105
106 let mut node = node_handle.new_erased_node();
107
108 {
110 let snippet_working_dir = working_dir.join(format!(
111 "{}_{}",
112 node_handle.modpath().replace("::", "__"),
113 snippet_idx
114 ));
115 if !snippet_working_dir.exists() {
116 fs_err::create_dir_all(&snippet_working_dir)?;
117 }
118 log::trace!(
119 "Setting current working directory from {:?} to {:?}",
120 std::env::current_dir()?,
121 snippet_working_dir
122 );
123 std::env::set_current_dir(snippet_working_dir)?;
124 }
125
126 let persistent_storage_dir_var = runtime_var_db
128 .try_get_var(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR)
129 .is_some()
130 .then_some(VAR_DB_SEEDVAR_FLOWEY_PERSISTENT_STORAGE_DIR.to_owned());
131
132 let mut rust_runtime_services =
133 flowey_core::node::steps::rust::new_rust_runtime_services(
134 &mut runtime_var_db,
135 flow_backend.into(),
136 flow_platform,
137 flow_arch,
138 );
139
140 let mut ctx_backend = ExecSnippetCtx::new(
141 flow_backend.into(),
142 flow_platform,
143 flow_arch,
144 node_handle,
145 snippet_idx,
146 dry_run,
147 persistent_storage_dir_var,
148 &mut rust_runtime_services,
149 );
150
151 let mut ctx = flowey_core::node::new_node_ctx(&mut ctx_backend);
152 node.emit(raw_json_reqs.clone(), &mut ctx)?;
153
154 match ctx_backend.into_result() {
155 Some(res) => res?,
156 None => {
157 if dry_run {
158 } else {
160 anyhow::bail!("snippet wasn't run (invalid index)")
161 }
162 }
163 }
164 }
165
166 Ok(())
167 }
168}
169
/// Node context backend that re-emits a node's steps and runs only the step
/// whose index equals `target_idx`.
pub struct ExecSnippetCtx<'a, 'b> {
    /// Backend reported via `backend()` and passed along when running code.
    flow_backend: FlowBackend,
    /// Platform reported via `platform()`.
    flow_platform: FlowPlatform,
    /// Architecture reported via `arch()`.
    flow_arch: FlowArch,
    /// Node whose steps are being emitted.
    node_handle: NodeHandle,
    /// Runtime services handed to the snippet's code when it is run.
    rust_runtime_services: &'a mut RustRuntimeServices<'b>,
    /// Index of the next step to be emitted; incremented once for every
    /// emitted Rust, ADO, and GH step.
    idx_tracker: usize,
    /// Counter used to generate a distinct name for each new variable.
    var_tracker: usize,
    /// Index of the step that should be run.
    target_idx: usize,
    /// When set, the target step's code is skipped instead of run.
    dry_run: bool,
    /// Name of the persistent storage dir variable, if one is available.
    persistent_storage_dir_var: Option<String>,
    /// Outcome of the target step; stays `None` if no step's code was run.
    result: Option<anyhow::Result<()>>,
}
183
184impl<'a, 'b> ExecSnippetCtx<'a, 'b> {
185 pub fn new(
186 flow_backend: FlowBackend,
187 flow_platform: FlowPlatform,
188 flow_arch: FlowArch,
189 node_handle: NodeHandle,
190 target_idx: usize,
191 dry_run: bool,
192 persistent_storage_dir_var: Option<String>,
193 rust_runtime_services: &'a mut RustRuntimeServices<'b>,
194 ) -> Self {
195 Self {
196 flow_backend,
197 flow_platform,
198 flow_arch,
199 node_handle,
200 rust_runtime_services,
201 var_tracker: 0,
202 idx_tracker: 0,
203 target_idx,
204 dry_run,
205 persistent_storage_dir_var,
206 result: None,
207 }
208 }
209
210 pub fn into_result(self) -> Option<anyhow::Result<()>> {
211 self.result
212 }
213}
214
215impl flowey_core::node::NodeCtxBackend for ExecSnippetCtx<'_, '_> {
216 fn on_request(&mut self, _node_handle: NodeHandle, _req: anyhow::Result<Box<[u8]>>) {
217 }
219
220 fn on_new_var(&mut self) -> String {
221 let v = self.var_tracker;
222 self.var_tracker += 1;
223 format!("{}:{}", self.node_handle.modpath(), v)
224 }
225
226 fn on_claimed_runtime_var(&mut self, _var: &str, _is_read: bool) {
227 }
229
230 fn on_emit_rust_step(
231 &mut self,
232 label: &str,
233 _can_merge: bool,
234 code: Box<
235 dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
236 >,
237 ) {
238 if self.idx_tracker == self.target_idx {
239 let label = if !label.is_empty() {
240 label
241 } else {
242 "<unnamed snippet>"
243 };
244
245 self.result = Some(run_code(
246 self.flow_backend,
247 format!("{} ({})", label, self.node_handle.modpath()),
248 self.dry_run,
249 || code(self.rust_runtime_services),
250 ));
251 }
252 self.idx_tracker += 1;
253 }
254
255 fn on_emit_ado_step(
256 &mut self,
257 label: &str,
258 _yaml_snippet: Box<
259 dyn for<'a> FnOnce(
260 &'a mut flowey_core::node::user_facing::AdoStepServices<'_>,
261 ) -> String,
262 >,
263 code: Option<
264 Box<
265 dyn for<'a> FnOnce(&'a mut RustRuntimeServices<'_>) -> anyhow::Result<()> + 'static,
266 >,
267 >,
268 _condvar: Option<String>,
269 ) {
270 if self.idx_tracker == self.target_idx {
273 if let Some(code) = code {
274 self.result = Some(run_code(
275 self.flow_backend,
276 format!(
277 "(inline snippet) {} ({})",
278 label,
279 self.node_handle.modpath()
280 ),
281 self.dry_run,
282 || code(self.rust_runtime_services),
283 ));
284 }
285 }
286
287 self.idx_tracker += 1;
288 }
289
290 fn on_emit_gh_step(
291 &mut self,
292 _label: &str,
293 _uses: &str,
294 _with: BTreeMap<String, ClaimedGhParam>,
295 _condvar: Option<String>,
296 _outputs: BTreeMap<String, Vec<GhOutput>>,
297 _permissions: BTreeMap<GhPermission, GhPermissionValue>,
298 _gh_to_rust: Vec<GhToRust>,
299 _rust_to_gh: Vec<RustToGh>,
300 ) {
301 self.idx_tracker += 1;
302 }
303
304 fn on_emit_side_effect_step(&mut self) {
305 }
307
308 fn backend(&mut self) -> FlowBackend {
309 self.flow_backend
310 }
311
312 fn platform(&mut self) -> FlowPlatform {
313 self.flow_platform
314 }
315
316 fn arch(&mut self) -> FlowArch {
317 self.flow_arch
318 }
319
320 fn current_node(&self) -> NodeHandle {
321 self.node_handle
322 }
323
324 fn persistent_dir_path_var(&mut self) -> Option<String> {
325 self.persistent_storage_dir_var.clone()
326 }
327
328 fn on_unused_read_var(&mut self, _var: &str) {
329 }
331}
332
/// Kind of var db backend recorded in the pipeline static db.
#[derive(Serialize, Deserialize)]
pub(crate) enum VarDbBackendKind {
    /// JSON-backed var db (the only kind defined here).
    Json,
}
337
/// Static, per-pipeline data that `ExecSnippet::run` loads from the
/// `pipeline.json` file located next to the flowey executable.
#[derive(Serialize, Deserialize)]
pub(crate) struct FloweyPipelineStaticDb {
    /// Backend the pipeline was generated for.
    pub flow_backend: FlowBackendCli,
    /// Kind of var db backend used by the pipeline.
    pub var_db_backend_kind: VarDbBackendKind,
    /// Serialized requests, keyed first by job index and then by node modpath.
    pub job_reqs: BTreeMap<usize, BTreeMap<String, Vec<SerializedRequest>>>,
}
344
/// A single request, held as raw JSON bytes.
///
/// Thanks to `#[serde(transparent)]` plus the `serialized_request` helpers,
/// the value is (de)serialized as one string containing the JSON text.
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub(crate) struct SerializedRequest(#[serde(with = "serialized_request")] pub Box<[u8]>);
350
351pub(crate) mod serialized_request {
352 use serde::Deserialize;
353 use serde::Deserializer;
354 use serde::Serializer;
355
356 #[expect(clippy::borrowed_box, reason = "required by serde")]
357 pub fn serialize<S: Serializer>(v: &Box<[u8]>, ser: S) -> Result<S::Ok, S::Error> {
358 ser.serialize_str(
359 &serde_json::to_string(&serde_json::from_slice::<serde_json::Value>(v).unwrap())
360 .unwrap(),
361 )
362 }
363
364 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Box<[u8]>, D::Error> {
365 let s: String = Deserialize::deserialize(d)?;
366 Ok(
367 serde_json::to_vec(&serde_json::from_str::<serde_json::Value>(&s).unwrap())
368 .unwrap()
369 .into(),
370 )
371 }
372}
373
374fn run_code(
375 flow_backend: FlowBackend,
376 label: impl std::fmt::Display,
377 dry_run: bool,
378 code: impl FnOnce() -> anyhow::Result<()>,
379) -> anyhow::Result<()> {
380 if matches!(flow_backend, FlowBackend::Ado) {
381 println!("##[group]=== {} ===", label)
382 } else {
383 log::info!("\x1B[0;32m=== {} ===\x1B[0m", label);
385 }
386
387 let result = if dry_run {
388 log::info!("...but not actually, because of --dry-run");
389 Ok(())
390 } else {
391 code()
392 };
393
394 log::info!("\x1B[0;32m=== done! ===\x1B[0m");
396
397 if matches!(flow_backend, FlowBackend::Ado) {
398 println!("##[endgroup]")
399 } else {
400 log::info!(""); }
402
403 result
404}