flowey_cli/pipeline_resolver/
common_yaml.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Shared functionality for emitting a pipeline as ADO/GitHub YAML files
5
6use crate::cli::exec_snippet::FloweyPipelineStaticDb;
7use crate::cli::pipeline::CheckMode;
8use crate::pipeline_resolver::generic::ResolvedPipelineJob;
9use anyhow::Context;
10use flowey_core::node::FlowArch;
11use flowey_core::node::FlowPlatform;
12use petgraph::visit::EdgeRef;
13use serde::Serialize;
14use serde_yaml::Value;
15use std::collections::BTreeMap;
16use std::collections::BTreeSet;
17use std::io::Write;
18use std::path::Path;
19
/// How a pipeline job obtains the flowey binary it runs.
///
/// In both variants, the `String` payload is the artifact name used to
/// transfer the bootstrapped flowey binary between jobs.
#[derive(Debug)]
pub(crate) enum FloweySource {
    // bool indicates if this node should publish the flowey it bootstraps for
    // other nodes to consume
    Bootstrap(String, bool),
    // consume a flowey binary published by an ancestor job's `Bootstrap`
    Consume(String),
}
27
28/// each job has one of three "roles" when it comes to bootstrapping flowey:
29///
30/// 1. Build flowey
31/// 2. Building _and_ publishing flowey
32/// 3. Consuming a pre-built flowey
33///
34/// We _could_ just have every bootstrap job also publish flowey, but this
35/// will spam the artifact feed with artifacts no one will consume, which is
36/// wasteful.
37///
38/// META: why go through all this hassle anyways? i.e: why not just do
39/// something dead simple like:
40///
41/// - discover which platforms exist in the graph
42/// - have the first jobs of every pipeline be standalone "bootstrap flowey"
43///   jobs, which all subsequent jobs of a certain platform can take a dep on
44///
45/// well... it turns out that provisioning job runners is _sloooooow_,
46/// and having every single pipeline run these "bootstrap flowey" steps
47/// gating the rest of the "interesting" stuff would really stink.
48///
49/// i.e: it's better to do redundant flowey bootstraps if it means that we
50/// can avoid the extra time it takes to tear down + re-provision a worker.
51pub(crate) fn job_flowey_bootstrap_source(
52    graph: &petgraph::Graph<ResolvedPipelineJob, ()>,
53    order: &Vec<petgraph::prelude::NodeIndex>,
54) -> BTreeMap<petgraph::prelude::NodeIndex, FloweySource> {
55    let mut bootstrapped_flowey = BTreeMap::new();
56
57    // the first traversal builds a list of all ancestors of a give node
58    let mut ancestors = BTreeMap::<
59        petgraph::prelude::NodeIndex,
60        BTreeSet<(petgraph::prelude::NodeIndex, FlowPlatform, FlowArch)>,
61    >::new();
62    for idx in order {
63        for ancestor_idx in graph
64            .edges_directed(*idx, petgraph::Direction::Incoming)
65            .map(|e| e.source())
66        {
67            ancestors.entry(*idx).or_default().insert((
68                ancestor_idx,
69                graph[ancestor_idx].platform,
70                graph[ancestor_idx].arch,
71            ));
72
73            if let Some(set) = ancestors.get(&ancestor_idx).cloned() {
74                ancestors.get_mut(idx).unwrap().extend(&set);
75            }
76        }
77    }
78
79    // the second traversal assigns roles to each node
80    let mut floweyno = 0;
81    'outer: for idx in order {
82        let ancestors = ancestors.remove(idx).unwrap_or_default();
83
84        let mut elect_bootstrap = None;
85
86        for (ancestor_idx, platform, arch) in ancestors {
87            if platform != graph[*idx].platform || arch != graph[*idx].arch {
88                continue;
89            }
90
91            let role =
92                bootstrapped_flowey
93                    .get_mut(&ancestor_idx)
94                    .and_then(|existing| match existing {
95                        FloweySource::Bootstrap(s, true) => Some(FloweySource::Consume(s.clone())),
96                        FloweySource::Consume(s) => Some(FloweySource::Consume(s.clone())),
97                        // there is an ancestor that is building, but not
98                        // publishing. maybe they should get upgraded...
99                        FloweySource::Bootstrap(_, false) => {
100                            elect_bootstrap = Some(ancestor_idx);
101                            None
102                        }
103                    });
104
105            if let Some(role) = role {
106                bootstrapped_flowey.insert(*idx, role);
107                continue 'outer;
108            }
109        }
110
111        // if we got here, that means we couldn't find a valid ancestor.
112        //
113        // check if we can upgrade an existing ancestor vs. bootstrapping
114        // things ourselves
115        if let Some(elect_bootstrap) = elect_bootstrap {
116            let FloweySource::Bootstrap(s, publish) =
117                bootstrapped_flowey.get_mut(&elect_bootstrap).unwrap()
118            else {
119                unreachable!()
120            };
121
122            *publish = true;
123            let s = s.clone();
124
125            bootstrapped_flowey.insert(*idx, FloweySource::Consume(s));
126        } else {
127            // Having this extra unique `floweyno` per bootstrap is
128            // necessary since GitHub doesn't let you double-publish an
129            // artifact with the same name
130            floweyno += 1;
131            let platform = graph[*idx].platform;
132            let arch = graph[*idx].arch;
133            bootstrapped_flowey.insert(
134                *idx,
135                FloweySource::Bootstrap(
136                    format!("_internal-flowey-bootstrap-{arch}-{platform}-uid-{floweyno}"),
137                    false,
138                ),
139            );
140        }
141    }
142
143    bootstrapped_flowey
144}
145
146/// convert `pipeline` to YAML and `pipeline_static_db` to JSON.
147/// if `check` is `Some`, then we will compare the generated YAML and JSON
148/// against the contents of `check` and error if they don't match.
149/// if `check` is `None`, then we will write the generated YAML and JSON to
150/// `repo_root/pipeline_file.yaml` and `repo_root/pipeline_file.json` respectively.
151fn check_or_write_generated_yaml_and_json<T>(
152    pipeline: &T,
153    pipeline_static_db: &FloweyPipelineStaticDb,
154    mode: CheckMode,
155    repo_root: &Path,
156    pipeline_file: &Path,
157    ado_post_process_yaml_cb: Option<Box<dyn FnOnce(Value) -> Value>>,
158) -> anyhow::Result<()>
159where
160    T: Serialize,
161{
162    let generated_yaml =
163        serde_yaml::to_value(pipeline).context("while serializing pipeline yaml")?;
164    let generated_yaml = if let Some(ado_post_process_yaml_cb) = ado_post_process_yaml_cb {
165        ado_post_process_yaml_cb(generated_yaml)
166    } else {
167        generated_yaml
168    };
169
170    let generated_yaml =
171        serde_yaml::to_string(&generated_yaml).context("while emitting pipeline yaml")?;
172    let generated_yaml = format!(
173        r#"
174##############################
175# THIS FILE IS AUTOGENERATED #
176#    DO NOT MANUALLY EDIT    #
177##############################
178{generated_yaml}"#
179    );
180    let generated_yaml = generated_yaml.trim_start();
181
182    let generated_json =
183        serde_json::to_string_pretty(pipeline_static_db).context("while emitting pipeline json")?;
184
185    match mode {
186        CheckMode::Runtime(ref check_file) | CheckMode::Check(ref check_file) => {
187            let existing_yaml = fs_err::read_to_string(check_file)
188                .context("cannot check pipeline that doesn't exist!")?;
189
190            let yaml_out_of_date = existing_yaml != generated_yaml;
191
192            if yaml_out_of_date {
193                println!(
194                    "generated yaml {}:\n==========\n{generated_yaml}",
195                    generated_yaml.len()
196                );
197                println!(
198                    "existing yaml {}:\n==========\n{existing_yaml}",
199                    existing_yaml.len()
200                );
201            }
202
203            if yaml_out_of_date {
204                anyhow::bail!("checked in pipeline YAML is out of date! run `cargo xflowey regen`")
205            }
206
207            // Only write the JSON if we're in runtime mode, not in check mode
208            if let CheckMode::Runtime(_) = mode {
209                let mut f = fs_err::File::create(check_file.with_extension("json"))?;
210                f.write_all(generated_json.as_bytes())
211                    .context("while emitting pipeline database json")?;
212            }
213
214            Ok(())
215        }
216        CheckMode::None => {
217            let out_yaml_path = repo_root.join(pipeline_file);
218
219            let mut f = fs_err::File::create(out_yaml_path)?;
220            f.write_all(generated_yaml.as_bytes())
221                .context("while emitting pipeline yaml")?;
222
223            Ok(())
224        }
225    }
226}
227
/// See [`check_or_write_generated_yaml_and_json`]
///
/// Thin wrapper that forwards all arguments verbatim; `check` selects
/// check vs. runtime behavior (callers are expected to pass a non-`None`
/// mode here — use [`write_generated_yaml_and_json`] to write files).
pub(crate) fn check_generated_yaml_and_json<T>(
    pipeline: &T,
    pipeline_static_db: &FloweyPipelineStaticDb,
    check: CheckMode,
    repo_root: &Path,
    pipeline_file: &Path,
    ado_post_process_yaml_cb: Option<Box<dyn FnOnce(Value) -> Value>>,
) -> anyhow::Result<()>
where
    T: Serialize,
{
    check_or_write_generated_yaml_and_json(
        pipeline,
        pipeline_static_db,
        check,
        repo_root,
        pipeline_file,
        ado_post_process_yaml_cb,
    )
}
249
/// See [`check_or_write_generated_yaml_and_json`]
///
/// Thin wrapper that hard-codes [`CheckMode::None`], i.e. unconditionally
/// writes the generated YAML to `repo_root/pipeline_file`.
pub(crate) fn write_generated_yaml_and_json<T>(
    pipeline: &T,
    pipeline_static_db: &FloweyPipelineStaticDb,
    repo_root: &Path,
    pipeline_file: &Path,
    ado_post_process_yaml_cb: Option<Box<dyn FnOnce(Value) -> Value>>,
) -> anyhow::Result<()>
where
    T: Serialize,
{
    check_or_write_generated_yaml_and_json(
        pipeline,
        pipeline_static_db,
        CheckMode::None,
        repo_root,
        pipeline_file,
        ado_post_process_yaml_cb,
    )
}
270
/// Merges a list of bash commands into a single YAML step.
pub(crate) struct BashCommands {
    // commands accumulated since the last flush, in push order
    commands: Vec<String>,
    // display label for the next flushed step, if one was provided
    label: Option<String>,
    // true while every pending command agreed to be merged into one step
    can_merge: bool,
    // true => emit GitHub Actions step syntax; false => ADO step syntax
    github: bool,
}
278
279impl BashCommands {
280    pub fn new_github() -> Self {
281        Self {
282            commands: Vec::new(),
283            label: None,
284            can_merge: true,
285            github: true,
286        }
287    }
288
289    pub fn new_ado() -> Self {
290        Self {
291            commands: Vec::new(),
292            label: None,
293            can_merge: true,
294            github: false,
295        }
296    }
297
298    #[must_use]
299    pub fn push(
300        &mut self,
301        label: Option<String>,
302        can_merge: bool,
303        mut cmd: String,
304    ) -> Option<Value> {
305        let val = if !can_merge && !self.can_merge {
306            self.flush()
307        } else {
308            None
309        };
310        if !can_merge || self.label.is_none() {
311            self.label = label;
312        }
313        cmd.truncate(cmd.trim_end().len());
314        self.commands.push(cmd);
315        self.can_merge &= can_merge;
316        val
317    }
318
319    pub fn push_minor(&mut self, cmd: String) {
320        assert!(self.push(None, true, cmd).is_none());
321    }
322
323    #[must_use]
324    pub fn flush(&mut self) -> Option<Value> {
325        if self.commands.is_empty() {
326            return None;
327        }
328        let label = if self.commands.len() == 1 || !self.can_merge {
329            self.label.take()
330        } else {
331            None
332        };
333        let label = label.unwrap_or_else(|| "🦀 flowey rust steps".into());
334        let map = if self.github {
335            let commands = self.commands.join("\n");
336            serde_yaml::Mapping::from_iter([
337                ("name".into(), label.into()),
338                ("run".into(), commands.into()),
339                ("shell".into(), "bash".into()),
340            ])
341        } else {
342            let commands = if self.commands.len() == 1 {
343                self.commands.drain(..).next().unwrap()
344            } else {
345                // ADO doesn't automatically fail on error on multi-line scripts.
346                self.commands.insert(0, "set -e".into());
347                self.commands.join("\n")
348            };
349            serde_yaml::Mapping::from_iter([
350                ("bash".into(), commands.into()),
351                ("displayName".into(), label.into()),
352            ])
353        };
354        self.commands.clear();
355        self.can_merge = true;
356        Some(map.into())
357    }
358}