flowey_core/pipeline/
artifact.rs1use anyhow::Context as _;
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use std::path::Path;
11use std::path::PathBuf;
12
13pub trait Artifact: Serialize + DeserializeOwned {
60 const TAR_GZ_NAME: Option<&'static str> = None;
71}
72
73pub trait ArtifactType: Serialize + DeserializeOwned + Ord + Clone {
75 fn name(&self, flavor: Option<&str>, suffix: Option<&str>) -> String;
77}
78
79fn json_to_fs(value: serde_json::Value, path: &Path) -> anyhow::Result<()> {
80 if let serde_json::Value::Object(map) = value {
81 json_to_fs_inner(map, path)
82 } else {
83 anyhow::bail!("expected JSON object");
84 }
85}
86
87fn json_to_fs_inner(
88 value: serde_json::Map<String, serde_json::Value>,
89 root: &Path,
90) -> anyhow::Result<()> {
91 for (key, value) in value {
92 let path = root.join(key);
93 match value {
94 serde_json::Value::Object(map) => {
95 fs_err::create_dir_all(&path)?;
96 json_to_fs_inner(map, &path)?;
97 }
98 serde_json::Value::String(src_path) => {
99 let src_path = Path::new(&src_path);
100 if src_path.is_dir() {
101 crate::util::copy_dir_all(src_path, &path)?;
102 fs_err::File::create(tag_path(path))?;
105 } else {
106 fs_err::copy(src_path, &path)?;
107 }
108 }
109 _ => {
110 anyhow::bail!("unsupported JSON value type");
111 }
112 }
113 }
114 Ok(())
115}
116
117fn fs_to_json(root: &Path) -> anyhow::Result<serde_json::Value> {
118 let mut map = serde_json::Map::new();
119 for entry in fs_err::read_dir(std::path::absolute(root)?)? {
120 let entry = entry?;
121 let path = entry.path();
122 let file_name = entry
123 .file_name()
124 .into_string()
125 .ok()
126 .context("non-utf8 filename")?;
127 let recurse = if path.is_dir() {
128 !tag_path(path.clone()).exists()
129 } else if is_tag_path(&path) {
130 continue;
131 } else {
132 false
133 };
134
135 let value = if recurse {
136 fs_to_json(&path)?
137 } else {
138 let path = path
139 .into_os_string()
140 .into_string()
141 .ok()
142 .context("non-utf8 path")?;
143 serde_json::Value::String(path)
144 };
145 map.insert(file_name, value);
146 }
147 Ok(serde_json::Value::Object(map))
148}
149
150fn is_tag_path(path: &Path) -> bool {
151 path.file_name()
152 .and_then(|name| name.to_str())
153 .is_some_and(|name| name.starts_with(".artifact-dir."))
154}
155
156fn tag_path(mut path: PathBuf) -> PathBuf {
157 let file_name = path.file_name().unwrap().to_str().unwrap();
158 path.set_file_name(format!(".artifact-dir.{file_name}"));
159 path
160}
161
162#[expect(unsafe_code)]
164pub mod publish {
165 use super::Artifact;
166 use crate::flowey_request;
167 use crate::new_simple_flow_node;
168 use crate::node::ClaimVar;
169 use crate::node::ReadVar;
170 use crate::node::SideEffect;
171 use crate::node::SimpleFlowNode;
172 use crate::node::WriteVar;
173 use std::path::PathBuf;
174
175 new_simple_flow_node!(struct Node);
176
177 flowey_request! {
178 pub struct Request {
179 value: ReadVar<serde_json::Value>,
180 path: ReadVar<PathBuf>,
181 tar_gz_name: Option<String>,
182 done: WriteVar<SideEffect>,
183 }
184 }
185
186 impl Request {
187 pub fn new<T: Artifact>(
188 value: ReadVar<T>,
189 path: ReadVar<PathBuf>,
190 done: WriteVar<SideEffect>,
191 ) -> Self {
192 Self {
193 value: value.into_json(),
194 path,
195 tar_gz_name: T::TAR_GZ_NAME.map(ToOwned::to_owned),
196 done,
197 }
198 }
199 }
200
201 impl SimpleFlowNode for Node {
202 type Request = Request;
203
204 fn imports(_ctx: &mut crate::node::ImportCtx<'_>) {}
205
206 fn process_request(
207 request: Self::Request,
208 ctx: &mut crate::node::NodeCtx<'_>,
209 ) -> anyhow::Result<()> {
210 let Request {
211 value,
212 path,
213 tar_gz_name,
214 done,
215 } = request;
216
217 ctx.emit_minor_rust_step("🌼 copy artifact contents", |ctx| {
218 let path = path.claim(ctx);
219 let value = value.claim(ctx);
220 done.claim(ctx);
221 |rt| {
222 let path = rt.read(path);
223 let value = rt.read(value);
224 if let Some(tar_gz_name) = tar_gz_name {
225 super::json_to_fs(value, ".".as_ref())
226 .expect("failed to copy artifact contents");
227 let tar_gz_path = path.join(tar_gz_name);
228 let r = std::process::Command::new("tar")
229 .arg("-acf")
230 .arg(&tar_gz_path)
231 .arg(".")
232 .output()
233 .expect("failed to launch tar");
234 if !r.status.success() {
235 panic!("failed to archive artifact contents: {r:?}");
236 }
237 } else {
238 super::json_to_fs(value, &path).expect("failed to copy artifact contents");
239 }
240 }
241 });
242 Ok(())
243 }
244 }
245}
246
247#[expect(unsafe_code)]
249pub mod resolve {
250 use super::Artifact;
251 use crate::flowey_request;
252 use crate::new_simple_flow_node;
253 use crate::node::ClaimVar;
254 use crate::node::ReadVar;
255 use crate::node::SimpleFlowNode;
256 use crate::node::WriteVar;
257 use std::path::PathBuf;
258
259 new_simple_flow_node!(struct Node);
260
261 flowey_request! {
262 pub struct Request {
263 path: ReadVar<PathBuf>,
264 tar_gz_name: Option<String>,
265 result: WriteVar<serde_json::Value>,
266 }
267 }
268
269 impl Request {
270 pub fn new<T: Artifact>(path: ReadVar<PathBuf>, result: WriteVar<T>) -> Self {
271 Self {
272 path,
273 tar_gz_name: T::TAR_GZ_NAME.map(ToOwned::to_owned),
274 result: result.into_json(),
275 }
276 }
277 }
278
279 impl SimpleFlowNode for Node {
280 type Request = Request;
281
282 fn imports(_ctx: &mut crate::node::ImportCtx<'_>) {}
283
284 fn process_request(
285 request: Self::Request,
286 ctx: &mut crate::node::NodeCtx<'_>,
287 ) -> anyhow::Result<()> {
288 let Request {
289 path,
290 tar_gz_name,
291 result,
292 } = request;
293
294 ctx.emit_minor_rust_step("🌼 resolve artifact", |ctx| {
295 let path = path.claim(ctx);
296 let result = result.claim(ctx);
297 |rt| {
298 let path = rt.read(path);
299 let path = if let Some(tar_gz_name) = tar_gz_name {
300 let tar_gz_path = path.join(tar_gz_name);
301 let r = std::process::Command::new("tar")
302 .arg("-xf")
303 .arg(&tar_gz_path)
304 .output()
305 .expect("failed to launch tar");
306 if !r.status.success() {
307 panic!("failed to extract artifact contents: {r:?}");
308 }
309 ".".as_ref()
310 } else {
311 path.as_ref()
312 };
313 let value = super::fs_to_json(path).expect("failed to read artifact contents");
314 rt.write(result, &value);
315 }
316 });
317
318 Ok(())
319 }
320 }
321}
322
323#[cfg(test)]
324mod tests {
325 use super::fs_to_json;
326 use crate::pipeline::artifact::json_to_fs;
327 use serde_json::Value;
328 use std::path::Path;
329
330 fn make_abs(root: &Path, value: Value) -> Value {
331 match value {
332 Value::String(v) => Value::String(
333 std::path::absolute(root.join(v))
334 .unwrap()
335 .into_os_string()
336 .into_string()
337 .ok()
338 .unwrap(),
339 ),
340 Value::Array(values) => {
341 Value::Array(values.into_iter().map(|v| make_abs(root, v)).collect())
342 }
343 Value::Object(map) => Value::Object(
344 map.into_iter()
345 .map(|(k, v)| (k, make_abs(root, v)))
346 .collect(),
347 ),
348 v => v,
349 }
350 }
351
352 #[test]
353 fn test_fs_to_json() {
354 let dir = tempfile::TempDir::new().unwrap();
355 fs_err::write(dir.path().join("foo"), "").unwrap();
356 fs_err::create_dir(dir.path().join("bar")).unwrap();
357 fs_err::write(dir.path().join("bar/baz"), "").unwrap();
358 fs_err::create_dir(dir.path().join("bar/quux")).unwrap();
359 fs_err::write(dir.path().join("bar/.artifact-dir.quux"), "").unwrap();
360 fs_err::write(dir.path().join("bar/quux/0"), "").unwrap();
361 fs_err::write(dir.path().join("bar/quux/1"), "").unwrap();
362 let json = fs_to_json(dir.path()).unwrap();
363 let expected = make_abs(
364 dir.path(),
365 serde_json::json!({
366 "foo": "foo",
367 "bar": {
368 "baz": "bar/baz",
369 "quux": "bar/quux",
370 }
371 }),
372 );
373 assert_eq!(json, expected);
374 }
375
376 #[test]
377 fn test_json_to_fs() {
378 let f = tempfile::NamedTempFile::new().unwrap();
379 let f_path = f.path().to_str().unwrap();
380
381 let d = tempfile::TempDir::new().unwrap();
382 fs_err::write(d.path().join("foo"), "").unwrap();
383 fs_err::create_dir(d.path().join("bar")).unwrap();
384 fs_err::write(d.path().join("bar/baz"), "").unwrap();
385 let d_path = d.path().to_str().unwrap();
386
387 let json = serde_json::json!({
388 "foo": f_path,
389 "bar": {
390 "baz": f_path,
391 "quux": d_path,
392 }
393 });
394 let dir = tempfile::TempDir::new().unwrap();
395 json_to_fs(json, dir.path()).unwrap();
396 let assert_exists = |p: &str| {
397 let is_dir = p.ends_with('/');
398 let m = fs_err::metadata(dir.path().join(p)).unwrap();
399 if is_dir {
400 assert!(m.is_dir(), "file {p} is not a directory");
401 } else {
402 assert!(m.is_file(), "file {p} is not a file");
403 }
404 };
405 assert_exists("foo");
406 assert_exists("bar/");
407 assert_exists("bar/baz");
408 assert_exists("bar/quux/");
409 assert_exists("bar/.artifact-dir.quux");
410 }
411}