flowey_core/pipeline/
artifact.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Internal nodes for publishing (well, preparing to publish) and resolving
//! artifacts.
6
7use anyhow::Context as _;
8use serde::Serialize;
9use serde::de::DeserializeOwned;
10use std::path::Path;
11use std::path::PathBuf;
12
13/// A trait representing a collection of files that can be published to or
14/// resolved from a pipeline artifact.
15///
16/// This can be used with `publish_typed_artifact` and `resolve_typed_artifact`
17/// to publish or resolve artifacts between jobs in a pipeline in a structured
18/// way.
19///
20/// By implementing this trait, you are guaranteeing that the type serializes
21/// into JSON in a format reflecting a directory structure, where each key is a
22/// file name and each value is either a string containing the path to the file,
23/// or another JSON object representing a subdirectory.
24///
25/// For example, you might have Rust types like this:
26/// ```rust
27/// # use serde::{Serialize, Deserialize};
28/// # use std::path::PathBuf;
29/// #[derive(Serialize, Deserialize)]
30/// struct Artifact {
31///     #[serde(rename = "file.exe")]
32///     file: PathBuf,
33///     subdir: Option<Inner>,
34/// }
35///
36/// #[derive(Serialize, Deserialize)]
37/// struct Inner {
38///     #[serde(rename = "file2.exe")]
39///     file2: PathBuf,
40/// }
41/// ```
42///
43/// This would serialize into JSON like this:
44/// ```json
45/// {
46///    "file.exe": "path/to/file.exe",
47///   "subdir": {
48///       "file2.exe": "path/to/file2.exe"
49///   }
50/// }
51/// ```
52///
53/// Which would in turn reflect a directory structure like this:
54/// ```text
55/// - file.exe
56/// - subdir/
57///   - file2.exe
58/// ```
59pub trait Artifact: Serialize + DeserializeOwned {
60    /// If present, the published artifact should consist of a tar.gz file
61    /// containing the contents of the artifact.
62    ///
63    /// This is mostly useful for artifacts with lots of files. Some backends
64    /// (specifically Azure DevOps) apparently cannot cope with this.
65    ///
66    /// An alternate approach would be to detect this automatically, and/or to
67    /// only do it for the affected backends. Currently, we don't bother with
68    /// this complexity, preferring instead a predictable and consistent
69    /// approach.
70    const TAR_GZ_NAME: Option<&'static str> = None;
71}
72
73fn json_to_fs(value: serde_json::Value, path: &Path) -> anyhow::Result<()> {
74    if let serde_json::Value::Object(map) = value {
75        json_to_fs_inner(map, path)
76    } else {
77        anyhow::bail!("expected JSON object");
78    }
79}
80
81fn json_to_fs_inner(
82    value: serde_json::Map<String, serde_json::Value>,
83    root: &Path,
84) -> anyhow::Result<()> {
85    for (key, value) in value {
86        let path = root.join(key);
87        match value {
88            serde_json::Value::Object(map) => {
89                fs_err::create_dir_all(&path)?;
90                json_to_fs_inner(map, &path)?;
91            }
92            serde_json::Value::String(src_path) => {
93                let src_path = Path::new(&src_path);
94                if src_path.is_dir() {
95                    crate::util::copy_dir_all(src_path, &path)?;
96                    // Write a tag file so that `fs_to_json` knows that this is
97                    // an opaque directory.
98                    fs_err::File::create(tag_path(path))?;
99                } else {
100                    fs_err::copy(src_path, &path)?;
101                }
102            }
103            _ => {
104                anyhow::bail!("unsupported JSON value type");
105            }
106        }
107    }
108    Ok(())
109}
110
111fn fs_to_json(root: &Path) -> anyhow::Result<serde_json::Value> {
112    let mut map = serde_json::Map::new();
113    for entry in fs_err::read_dir(std::path::absolute(root)?)? {
114        let entry = entry?;
115        let path = entry.path();
116        let file_name = entry
117            .file_name()
118            .into_string()
119            .ok()
120            .context("non-utf8 filename")?;
121        let recurse = if path.is_dir() {
122            !tag_path(path.clone()).exists()
123        } else if is_tag_path(&path) {
124            continue;
125        } else {
126            false
127        };
128
129        let value = if recurse {
130            fs_to_json(&path)?
131        } else {
132            let path = path
133                .into_os_string()
134                .into_string()
135                .ok()
136                .context("non-utf8 path")?;
137            serde_json::Value::String(path)
138        };
139        map.insert(file_name, value);
140    }
141    Ok(serde_json::Value::Object(map))
142}
143
/// Returns `true` if the final component of `path` names a tag file, i.e. a
/// UTF-8 name starting with `.artifact-dir.`.
fn is_tag_path(path: &Path) -> bool {
    matches!(
        path.file_name().and_then(std::ffi::OsStr::to_str),
        Some(name) if name.starts_with(".artifact-dir.")
    )
}
149
/// Returns the path of the tag file that marks `path` as an opaque directory.
///
/// The tag file is a sibling of `path` whose name is `.artifact-dir.` followed
/// by `path`'s file name. The name is built as an OS string, so file names
/// that are not valid UTF-8 are handled too.
///
/// # Panics
///
/// Panics if `path` has no file name (e.g. it is `/` or ends in `..`).
fn tag_path(mut path: PathBuf) -> PathBuf {
    let mut tag_name = std::ffi::OsString::from(".artifact-dir.");
    tag_name.push(
        path.file_name()
            .expect("tag_path requires a path with a file name"),
    );
    path.set_file_name(tag_name);
    path
}
155
156// UNSAFETY: Needed to invoke new_simple_flow_node! in the same crate as it is defined.
157#[expect(unsafe_code)]
158pub mod publish {
159    use super::Artifact;
160    use crate::flowey_request;
161    use crate::new_simple_flow_node;
162    use crate::node::ClaimVar;
163    use crate::node::ReadVar;
164    use crate::node::SideEffect;
165    use crate::node::SimpleFlowNode;
166    use crate::node::WriteVar;
167    use std::path::PathBuf;
168
169    new_simple_flow_node!(struct Node);
170
171    flowey_request! {
172        pub struct Request {
173            value: ReadVar<serde_json::Value>,
174            path: ReadVar<PathBuf>,
175            tar_gz_name: Option<String>,
176            done: WriteVar<SideEffect>,
177        }
178    }
179
180    impl Request {
181        pub fn new<T: Artifact>(
182            value: ReadVar<T>,
183            path: ReadVar<PathBuf>,
184            done: WriteVar<SideEffect>,
185        ) -> Self {
186            Self {
187                value: value.into_json(),
188                path,
189                tar_gz_name: T::TAR_GZ_NAME.map(ToOwned::to_owned),
190                done,
191            }
192        }
193    }
194
195    impl SimpleFlowNode for Node {
196        type Request = Request;
197
198        fn imports(_ctx: &mut crate::node::ImportCtx<'_>) {}
199
200        fn process_request(
201            request: Self::Request,
202            ctx: &mut crate::node::NodeCtx<'_>,
203        ) -> anyhow::Result<()> {
204            let Request {
205                value,
206                path,
207                tar_gz_name,
208                done,
209            } = request;
210
211            ctx.emit_minor_rust_step("🌼 copy artifact contents", |ctx| {
212                let path = path.claim(ctx);
213                let value = value.claim(ctx);
214                done.claim(ctx);
215                |rt| {
216                    let path = rt.read(path);
217                    let value = rt.read(value);
218                    if let Some(tar_gz_name) = tar_gz_name {
219                        super::json_to_fs(value, ".".as_ref())
220                            .expect("failed to copy artifact contents");
221                        let tar_gz_path = path.join(tar_gz_name);
222                        let r = std::process::Command::new("tar")
223                            .arg("-acf")
224                            .arg(&tar_gz_path)
225                            .arg(".")
226                            .output()
227                            .expect("failed to launch tar");
228                        if !r.status.success() {
229                            panic!("failed to archive artifact contents: {r:?}");
230                        }
231                    } else {
232                        super::json_to_fs(value, &path).expect("failed to copy artifact contents");
233                    }
234                }
235            });
236            Ok(())
237        }
238    }
239}
240
241// UNSAFETY: Needed to invoke new_simple_flow_node! in the same crate as it is defined.
242#[expect(unsafe_code)]
243pub mod resolve {
244    use super::Artifact;
245    use crate::flowey_request;
246    use crate::new_simple_flow_node;
247    use crate::node::ClaimVar;
248    use crate::node::ReadVar;
249    use crate::node::SimpleFlowNode;
250    use crate::node::WriteVar;
251    use std::path::PathBuf;
252
253    new_simple_flow_node!(struct Node);
254
255    flowey_request! {
256        pub struct Request {
257            path: ReadVar<PathBuf>,
258            tar_gz_name: Option<String>,
259            result: WriteVar<serde_json::Value>,
260        }
261    }
262
263    impl Request {
264        pub fn new<T: Artifact>(path: ReadVar<PathBuf>, result: WriteVar<T>) -> Self {
265            Self {
266                path,
267                tar_gz_name: T::TAR_GZ_NAME.map(ToOwned::to_owned),
268                result: result.into_json(),
269            }
270        }
271    }
272
273    impl SimpleFlowNode for Node {
274        type Request = Request;
275
276        fn imports(_ctx: &mut crate::node::ImportCtx<'_>) {}
277
278        fn process_request(
279            request: Self::Request,
280            ctx: &mut crate::node::NodeCtx<'_>,
281        ) -> anyhow::Result<()> {
282            let Request {
283                path,
284                tar_gz_name,
285                result,
286            } = request;
287
288            ctx.emit_minor_rust_step("🌼 resolve artifact", |ctx| {
289                let path = path.claim(ctx);
290                let result = result.claim(ctx);
291                |rt| {
292                    let path = rt.read(path);
293                    let path = if let Some(tar_gz_name) = tar_gz_name {
294                        let tar_gz_path = path.join(tar_gz_name);
295                        let r = std::process::Command::new("tar")
296                            .arg("-xf")
297                            .arg(&tar_gz_path)
298                            .output()
299                            .expect("failed to launch tar");
300                        if !r.status.success() {
301                            panic!("failed to extract artifact contents: {r:?}");
302                        }
303                        ".".as_ref()
304                    } else {
305                        path.as_ref()
306                    };
307                    let value = super::fs_to_json(path).expect("failed to read artifact contents");
308                    rt.write(result, &value);
309                }
310            });
311
312            Ok(())
313        }
314    }
315}
316
#[cfg(test)]
mod tests {
    use super::fs_to_json;
    use super::json_to_fs;
    use serde_json::Value;
    use std::path::Path;

    /// Recursively rewrites every string in `value` into the absolute form of
    /// `root.join(string)`, matching the paths that `fs_to_json` reports.
    fn make_abs(root: &Path, value: Value) -> Value {
        match value {
            Value::String(v) => Value::String(
                std::path::absolute(root.join(v))
                    .unwrap()
                    .into_os_string()
                    .into_string()
                    .ok()
                    .unwrap(),
            ),
            Value::Array(values) => {
                Value::Array(values.into_iter().map(|v| make_abs(root, v)).collect())
            }
            Value::Object(map) => Value::Object(
                map.into_iter()
                    .map(|(k, v)| (k, make_abs(root, v)))
                    .collect(),
            ),
            v => v,
        }
    }

    #[test]
    fn test_fs_to_json() {
        let dir = tempfile::TempDir::new().unwrap();
        fs_err::write(dir.path().join("foo"), "").unwrap();
        fs_err::create_dir(dir.path().join("bar")).unwrap();
        fs_err::write(dir.path().join("bar/baz"), "").unwrap();
        fs_err::create_dir(dir.path().join("bar/quux")).unwrap();
        fs_err::write(dir.path().join("bar/.artifact-dir.quux"), "").unwrap();
        fs_err::write(dir.path().join("bar/quux/0"), "").unwrap();
        fs_err::write(dir.path().join("bar/quux/1"), "").unwrap();
        let json = fs_to_json(dir.path()).unwrap();
        let expected = make_abs(
            dir.path(),
            serde_json::json!({
                "foo": "foo",
                "bar": {
                    "baz": "bar/baz",
                    "quux": "bar/quux",
                }
            }),
        );
        assert_eq!(json, expected);
    }

    #[test]
    fn test_json_to_fs() {
        let f = tempfile::NamedTempFile::new().unwrap();
        let f_path = f.path().to_str().unwrap();

        let d = tempfile::TempDir::new().unwrap();
        fs_err::write(d.path().join("foo"), "").unwrap();
        fs_err::create_dir(d.path().join("bar")).unwrap();
        fs_err::write(d.path().join("bar/baz"), "").unwrap();
        let d_path = d.path().to_str().unwrap();

        let json = serde_json::json!({
            "foo": f_path,
            "bar": {
                "baz": f_path,
                "quux": d_path,
            }
        });
        let dir = tempfile::TempDir::new().unwrap();
        json_to_fs(json, dir.path()).unwrap();
        let assert_exists = |p: &str| {
            let is_dir = p.ends_with('/');
            let m = fs_err::metadata(dir.path().join(p)).unwrap();
            if is_dir {
                assert!(m.is_dir(), "file {p} is not a directory");
            } else {
                assert!(m.is_file(), "file {p} is not a file");
            }
        };
        assert_exists("foo");
        assert_exists("bar/");
        assert_exists("bar/baz");
        assert_exists("bar/quux/");
        assert_exists("bar/.artifact-dir.quux");
    }

    /// Writing an artifact with `json_to_fs` and reading it back with
    /// `fs_to_json` reproduces the same layout. The paths point into the
    /// written directory, and opaque directories stay opaque.
    #[test]
    fn test_round_trip() {
        let f = tempfile::NamedTempFile::new().unwrap();
        let f_path = f.path().to_str().unwrap();

        let d = tempfile::TempDir::new().unwrap();
        fs_err::write(d.path().join("inner"), "").unwrap();
        let d_path = d.path().to_str().unwrap();

        let json = serde_json::json!({
            "foo": f_path,
            "bar": {
                "baz": f_path,
                "quux": d_path,
            }
        });
        let dir = tempfile::TempDir::new().unwrap();
        json_to_fs(json, dir.path()).unwrap();

        let resolved = fs_to_json(dir.path()).unwrap();
        let expected = make_abs(
            dir.path(),
            serde_json::json!({
                "foo": "foo",
                "bar": {
                    "baz": "bar/baz",
                    "quux": "bar/quux",
                }
            }),
        );
        assert_eq!(resolved, expected);
        // The opaque directory's contents were copied, but not expanded into
        // the JSON.
        assert!(dir.path().join("bar/quux/inner").is_file());
    }
}