flowey_core/pipeline/
artifact.rs1use anyhow::Context as _;
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use std::path::Path;
11use std::path::PathBuf;
12
/// A value that can be published to, and later resolved from, an artifact
/// directory.
///
/// The `publish` and `resolve` nodes in this file move implementors through
/// their JSON form (`into_json`): on publish the JSON must be an object whose
/// nested objects become directories and whose string values are paths of
/// files or directories to copy (see `json_to_fs`); on resolve the directory
/// tree is turned back into the same shape of JSON (see `fs_to_json`).
pub trait Artifact: Serialize + DeserializeOwned {
    /// Optional file name of a tar archive to wrap the artifact contents in.
    ///
    /// When `Some(name)`, `publish` archives the contents with `tar -acf` into
    /// `<artifact dir>/<name>` and `resolve` unpacks that file with `tar -xf`.
    /// When `None` (the default), contents are copied directly into, and read
    /// directly from, the artifact directory.
    const TAR_GZ_NAME: Option<&'static str> = None;
}
72
73fn json_to_fs(value: serde_json::Value, path: &Path) -> anyhow::Result<()> {
74 if let serde_json::Value::Object(map) = value {
75 json_to_fs_inner(map, path)
76 } else {
77 anyhow::bail!("expected JSON object");
78 }
79}
80
81fn json_to_fs_inner(
82 value: serde_json::Map<String, serde_json::Value>,
83 root: &Path,
84) -> anyhow::Result<()> {
85 for (key, value) in value {
86 let path = root.join(key);
87 match value {
88 serde_json::Value::Object(map) => {
89 fs_err::create_dir_all(&path)?;
90 json_to_fs_inner(map, &path)?;
91 }
92 serde_json::Value::String(src_path) => {
93 let src_path = Path::new(&src_path);
94 if src_path.is_dir() {
95 crate::util::copy_dir_all(src_path, &path)?;
96 fs_err::File::create(tag_path(path))?;
99 } else {
100 fs_err::copy(src_path, &path)?;
101 }
102 }
103 _ => {
104 anyhow::bail!("unsupported JSON value type");
105 }
106 }
107 }
108 Ok(())
109}
110
111fn fs_to_json(root: &Path) -> anyhow::Result<serde_json::Value> {
112 let mut map = serde_json::Map::new();
113 for entry in fs_err::read_dir(std::path::absolute(root)?)? {
114 let entry = entry?;
115 let path = entry.path();
116 let file_name = entry
117 .file_name()
118 .into_string()
119 .ok()
120 .context("non-utf8 filename")?;
121 let recurse = if path.is_dir() {
122 !tag_path(path.clone()).exists()
123 } else if is_tag_path(&path) {
124 continue;
125 } else {
126 false
127 };
128
129 let value = if recurse {
130 fs_to_json(&path)?
131 } else {
132 let path = path
133 .into_os_string()
134 .into_string()
135 .ok()
136 .context("non-utf8 path")?;
137 serde_json::Value::String(path)
138 };
139 map.insert(file_name, value);
140 }
141 Ok(serde_json::Value::Object(map))
142}
143
/// Returns `true` when the final component of `path` is a directory tag file
/// name, i.e. valid UTF-8 that starts with `.artifact-dir.`.
///
/// Paths without a final component, or whose final component is not valid
/// UTF-8, are never tag paths.
fn is_tag_path(path: &Path) -> bool {
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name.starts_with(".artifact-dir."),
        None => false,
    }
}
149
/// Returns the path of the tag file that marks `path` as a directory to be
/// treated as a single leaf value.
///
/// The tag file is a sibling of `path` whose name is the final component of
/// `path` prefixed with `.artifact-dir.`.
///
/// The name is assembled as an `OsString`, so a final component that is not
/// valid UTF-8 is handled instead of causing a panic.
///
/// # Panics
///
/// Panics if `path` has no final component (for example `/` or a path ending
/// in `..`).
fn tag_path(mut path: PathBuf) -> PathBuf {
    let mut tag_name = std::ffi::OsString::from(".artifact-dir.");
    tag_name.push(
        path.file_name()
            .expect("tag_path requires a path with a final component"),
    );
    path.set_file_name(tag_name);
    path
}
155
156#[expect(unsafe_code)]
158pub mod publish {
159 use super::Artifact;
160 use crate::flowey_request;
161 use crate::new_simple_flow_node;
162 use crate::node::ClaimVar;
163 use crate::node::ReadVar;
164 use crate::node::SideEffect;
165 use crate::node::SimpleFlowNode;
166 use crate::node::WriteVar;
167 use std::path::PathBuf;
168
169 new_simple_flow_node!(struct Node);
170
171 flowey_request! {
172 pub struct Request {
173 value: ReadVar<serde_json::Value>,
174 path: ReadVar<PathBuf>,
175 tar_gz_name: Option<String>,
176 done: WriteVar<SideEffect>,
177 }
178 }
179
180 impl Request {
181 pub fn new<T: Artifact>(
182 value: ReadVar<T>,
183 path: ReadVar<PathBuf>,
184 done: WriteVar<SideEffect>,
185 ) -> Self {
186 Self {
187 value: value.into_json(),
188 path,
189 tar_gz_name: T::TAR_GZ_NAME.map(ToOwned::to_owned),
190 done,
191 }
192 }
193 }
194
195 impl SimpleFlowNode for Node {
196 type Request = Request;
197
198 fn imports(_ctx: &mut crate::node::ImportCtx<'_>) {}
199
200 fn process_request(
201 request: Self::Request,
202 ctx: &mut crate::node::NodeCtx<'_>,
203 ) -> anyhow::Result<()> {
204 let Request {
205 value,
206 path,
207 tar_gz_name,
208 done,
209 } = request;
210
211 ctx.emit_minor_rust_step("🌼 copy artifact contents", |ctx| {
212 let path = path.claim(ctx);
213 let value = value.claim(ctx);
214 done.claim(ctx);
215 |rt| {
216 let path = rt.read(path);
217 let value = rt.read(value);
218 if let Some(tar_gz_name) = tar_gz_name {
219 super::json_to_fs(value, ".".as_ref())
220 .expect("failed to copy artifact contents");
221 let tar_gz_path = path.join(tar_gz_name);
222 let r = std::process::Command::new("tar")
223 .arg("-acf")
224 .arg(&tar_gz_path)
225 .arg(".")
226 .output()
227 .expect("failed to launch tar");
228 if !r.status.success() {
229 panic!("failed to archive artifact contents: {r:?}");
230 }
231 } else {
232 super::json_to_fs(value, &path).expect("failed to copy artifact contents");
233 }
234 }
235 });
236 Ok(())
237 }
238 }
239}
240
241#[expect(unsafe_code)]
243pub mod resolve {
244 use super::Artifact;
245 use crate::flowey_request;
246 use crate::new_simple_flow_node;
247 use crate::node::ClaimVar;
248 use crate::node::ReadVar;
249 use crate::node::SimpleFlowNode;
250 use crate::node::WriteVar;
251 use std::path::PathBuf;
252
253 new_simple_flow_node!(struct Node);
254
255 flowey_request! {
256 pub struct Request {
257 path: ReadVar<PathBuf>,
258 tar_gz_name: Option<String>,
259 result: WriteVar<serde_json::Value>,
260 }
261 }
262
263 impl Request {
264 pub fn new<T: Artifact>(path: ReadVar<PathBuf>, result: WriteVar<T>) -> Self {
265 Self {
266 path,
267 tar_gz_name: T::TAR_GZ_NAME.map(ToOwned::to_owned),
268 result: result.into_json(),
269 }
270 }
271 }
272
273 impl SimpleFlowNode for Node {
274 type Request = Request;
275
276 fn imports(_ctx: &mut crate::node::ImportCtx<'_>) {}
277
278 fn process_request(
279 request: Self::Request,
280 ctx: &mut crate::node::NodeCtx<'_>,
281 ) -> anyhow::Result<()> {
282 let Request {
283 path,
284 tar_gz_name,
285 result,
286 } = request;
287
288 ctx.emit_minor_rust_step("🌼 resolve artifact", |ctx| {
289 let path = path.claim(ctx);
290 let result = result.claim(ctx);
291 |rt| {
292 let path = rt.read(path);
293 let path = if let Some(tar_gz_name) = tar_gz_name {
294 let tar_gz_path = path.join(tar_gz_name);
295 let r = std::process::Command::new("tar")
296 .arg("-xf")
297 .arg(&tar_gz_path)
298 .output()
299 .expect("failed to launch tar");
300 if !r.status.success() {
301 panic!("failed to extract artifact contents: {r:?}");
302 }
303 ".".as_ref()
304 } else {
305 path.as_ref()
306 };
307 let value = super::fs_to_json(path).expect("failed to read artifact contents");
308 rt.write(result, &value);
309 }
310 });
311
312 Ok(())
313 }
314 }
315}
316
#[cfg(test)]
mod tests {
    use super::fs_to_json;
    use crate::pipeline::artifact::json_to_fs;
    use serde_json::Value;
    use std::path::Path;

    /// Rewrites every string inside `value` as the absolute form of that
    /// string joined onto `root`; arrays and objects are traversed, all other
    /// values are returned untouched.
    fn make_abs(root: &Path, value: Value) -> Value {
        match value {
            Value::String(rel) => {
                let abs = std::path::absolute(root.join(rel)).unwrap();
                Value::String(abs.into_os_string().into_string().ok().unwrap())
            }
            Value::Array(items) => {
                let mut out = Vec::with_capacity(items.len());
                for item in items {
                    out.push(make_abs(root, item));
                }
                Value::Array(out)
            }
            Value::Object(fields) => {
                let mut out = serde_json::Map::new();
                for (key, field) in fields {
                    out.insert(key, make_abs(root, field));
                }
                Value::Object(out)
            }
            other => other,
        }
    }

    /// An untagged directory becomes a nested object, while a tagged directory
    /// (`bar/quux`) is reported as a single path and its tag file is omitted.
    #[test]
    fn test_fs_to_json() {
        let tmp = tempfile::TempDir::new().unwrap();
        let root = tmp.path();

        for dir in ["bar", "bar/quux"] {
            fs_err::create_dir(root.join(dir)).unwrap();
        }
        for file in [
            "foo",
            "bar/baz",
            "bar/.artifact-dir.quux",
            "bar/quux/0",
            "bar/quux/1",
        ] {
            fs_err::write(root.join(file), "").unwrap();
        }

        let actual = fs_to_json(root).unwrap();
        let expected = make_abs(
            root,
            serde_json::json!({
                "foo": "foo",
                "bar": {
                    "baz": "bar/baz",
                    "quux": "bar/quux",
                }
            }),
        );
        assert_eq!(actual, expected);
    }

    /// File values are copied as files, directory values are copied as
    /// directories and get a tag file next to them.
    #[test]
    fn test_json_to_fs() {
        let src_file = tempfile::NamedTempFile::new().unwrap();
        let src_file_path = src_file.path().to_str().unwrap();

        let src_dir = tempfile::TempDir::new().unwrap();
        fs_err::create_dir(src_dir.path().join("bar")).unwrap();
        for file in ["foo", "bar/baz"] {
            fs_err::write(src_dir.path().join(file), "").unwrap();
        }
        let src_dir_path = src_dir.path().to_str().unwrap();

        let json = serde_json::json!({
            "foo": src_file_path,
            "bar": {
                "baz": src_file_path,
                "quux": src_dir_path,
            }
        });
        let out = tempfile::TempDir::new().unwrap();
        json_to_fs(json, out.path()).unwrap();

        // A trailing '/' marks an entry that must be a directory.
        for p in [
            "foo",
            "bar/",
            "bar/baz",
            "bar/quux/",
            "bar/.artifact-dir.quux",
        ] {
            let m = fs_err::metadata(out.path().join(p)).unwrap();
            if p.ends_with('/') {
                assert!(m.is_dir(), "file {p} is not a directory");
            } else {
                assert!(m.is_file(), "file {p} is not a file");
            }
        }
    }
}