flowey_cli/pipeline_resolver/
common_yaml.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Shared functionality for emitting a pipeline as ADO/GitHub YAML files
5
6use crate::cli::exec_snippet::FloweyPipelineStaticDb;
7use crate::cli::exec_snippet::SerializedRequest;
8use crate::cli::pipeline::CheckMode;
9use crate::pipeline_resolver::generic::ResolvedPipelineJob;
10use anyhow::Context;
11use flowey_core::node::FlowArch;
12use flowey_core::node::FlowPlatform;
13use petgraph::visit::EdgeRef;
14use serde::Serialize;
15use serde_yaml::Value;
16use std::collections::BTreeMap;
17use std::collections::BTreeSet;
18use std::io::Write;
19use std::path::Path;
20
21/// The output of resolving a flow into a sequence of YAML steps (shared by ADO
22/// and GitHub resolvers).
23pub(crate) struct ResolvedFlowSteps {
24    /// The resolved YAML steps for this job.
25    pub steps: Vec<Value>,
26    /// The serialized request database, keyed by node module path.
27    pub request_db: BTreeMap<String, Vec<SerializedRequest>>,
28    /// The serialized config database, keyed by node module path.
29    pub config_db: BTreeMap<String, Vec<SerializedRequest>>,
30}
31
/// Where a job obtains its copy of flowey from.
#[derive(Debug)]
pub(crate) enum FloweySource {
    /// This job bootstraps flowey itself. The `bool` records whether the
    /// bootstrapped flowey should also be published as an artifact for
    /// other nodes to consume.
    Bootstrap(String, bool),
    /// This job consumes a flowey artifact published by another job.
    Consume(String),
}
39
40/// each job has one of three "roles" when it comes to bootstrapping flowey:
41///
42/// 1. Build flowey
43/// 2. Building _and_ publishing flowey
44/// 3. Consuming a pre-built flowey
45///
46/// We _could_ just have every bootstrap job also publish flowey, but this
47/// will spam the artifact feed with artifacts no one will consume, which is
48/// wasteful.
49///
50/// META: why go through all this hassle anyways? i.e: why not just do
51/// something dead simple like:
52///
53/// - discover which platforms exist in the graph
54/// - have the first jobs of every pipeline be standalone "bootstrap flowey"
55///   jobs, which all subsequent jobs of a certain platform can take a dep on
56///
57/// well... it turns out that provisioning job runners is _sloooooow_,
58/// and having every single pipeline run these "bootstrap flowey" steps
59/// gating the rest of the "interesting" stuff would really stink.
60///
61/// i.e: it's better to do redundant flowey bootstraps if it means that we
62/// can avoid the extra time it takes to tear down + re-provision a worker.
63pub(crate) fn job_flowey_bootstrap_source(
64    graph: &petgraph::Graph<ResolvedPipelineJob, ()>,
65    order: &Vec<petgraph::prelude::NodeIndex>,
66) -> BTreeMap<petgraph::prelude::NodeIndex, FloweySource> {
67    let mut bootstrapped_flowey = BTreeMap::new();
68
69    // the first traversal builds a list of all ancestors of a give node
70    let mut ancestors = BTreeMap::<
71        petgraph::prelude::NodeIndex,
72        BTreeSet<(petgraph::prelude::NodeIndex, FlowPlatform, FlowArch)>,
73    >::new();
74    for idx in order {
75        for ancestor_idx in graph
76            .edges_directed(*idx, petgraph::Direction::Incoming)
77            .map(|e| e.source())
78        {
79            ancestors.entry(*idx).or_default().insert((
80                ancestor_idx,
81                graph[ancestor_idx].platform,
82                graph[ancestor_idx].arch,
83            ));
84
85            if let Some(set) = ancestors.get(&ancestor_idx).cloned() {
86                ancestors.get_mut(idx).unwrap().extend(&set);
87            }
88        }
89    }
90
91    // the second traversal assigns roles to each node
92    let mut floweyno = 0;
93    'outer: for idx in order {
94        let ancestors = ancestors.remove(idx).unwrap_or_default();
95
96        let mut elect_bootstrap = None;
97
98        for (ancestor_idx, platform, arch) in ancestors {
99            if platform != graph[*idx].platform || arch != graph[*idx].arch {
100                continue;
101            }
102
103            let role =
104                bootstrapped_flowey
105                    .get_mut(&ancestor_idx)
106                    .and_then(|existing| match existing {
107                        FloweySource::Bootstrap(s, true) => Some(FloweySource::Consume(s.clone())),
108                        FloweySource::Consume(s) => Some(FloweySource::Consume(s.clone())),
109                        // there is an ancestor that is building, but not
110                        // publishing. maybe they should get upgraded...
111                        FloweySource::Bootstrap(_, false) => {
112                            elect_bootstrap = Some(ancestor_idx);
113                            None
114                        }
115                    });
116
117            if let Some(role) = role {
118                bootstrapped_flowey.insert(*idx, role);
119                continue 'outer;
120            }
121        }
122
123        // if we got here, that means we couldn't find a valid ancestor.
124        //
125        // check if we can upgrade an existing ancestor vs. bootstrapping
126        // things ourselves
127        if let Some(elect_bootstrap) = elect_bootstrap {
128            let FloweySource::Bootstrap(s, publish) =
129                bootstrapped_flowey.get_mut(&elect_bootstrap).unwrap()
130            else {
131                unreachable!()
132            };
133
134            *publish = true;
135            let s = s.clone();
136
137            bootstrapped_flowey.insert(*idx, FloweySource::Consume(s));
138        } else {
139            // Having this extra unique `floweyno` per bootstrap is
140            // necessary since GitHub doesn't let you double-publish an
141            // artifact with the same name
142            floweyno += 1;
143            let platform = graph[*idx].platform;
144            let arch = graph[*idx].arch;
145            bootstrapped_flowey.insert(
146                *idx,
147                FloweySource::Bootstrap(
148                    format!("_internal-flowey-bootstrap-{arch}-{platform}-uid-{floweyno}"),
149                    false,
150                ),
151            );
152        }
153    }
154
155    bootstrapped_flowey
156}
157
158/// convert `pipeline` to YAML and `pipeline_static_db` to JSON.
159/// if `check` is `Some`, then we will compare the generated YAML and JSON
160/// against the contents of `check` and error if they don't match.
161/// if `check` is `None`, then we will write the generated YAML and JSON to
162/// `repo_root/pipeline_file.yaml` and `repo_root/pipeline_file.json` respectively.
163fn check_or_write_generated_yaml_and_json<T>(
164    pipeline: &T,
165    pipeline_static_db: &FloweyPipelineStaticDb,
166    mode: CheckMode,
167    repo_root: &Path,
168    pipeline_file: &Path,
169    ado_post_process_yaml_cb: Option<Box<dyn FnOnce(Value) -> Value>>,
170) -> anyhow::Result<()>
171where
172    T: Serialize,
173{
174    let generated_yaml =
175        serde_yaml::to_value(pipeline).context("while serializing pipeline yaml")?;
176    let generated_yaml = if let Some(ado_post_process_yaml_cb) = ado_post_process_yaml_cb {
177        ado_post_process_yaml_cb(generated_yaml)
178    } else {
179        generated_yaml
180    };
181
182    let generated_yaml =
183        serde_yaml::to_string(&generated_yaml).context("while emitting pipeline yaml")?;
184    let generated_yaml = format!(
185        r#"
186##############################
187# THIS FILE IS AUTOGENERATED #
188#    DO NOT MANUALLY EDIT    #
189##############################
190{generated_yaml}"#
191    );
192    let generated_yaml = generated_yaml.trim_start();
193
194    let generated_json =
195        serde_json::to_string_pretty(pipeline_static_db).context("while emitting pipeline json")?;
196
197    match mode {
198        CheckMode::Runtime(ref check_file) | CheckMode::Check(ref check_file) => {
199            let existing_yaml = fs_err::read_to_string(check_file)
200                .context("cannot check pipeline that doesn't exist!")?;
201
202            let yaml_out_of_date = existing_yaml != generated_yaml;
203
204            if yaml_out_of_date {
205                println!(
206                    "generated yaml {}:\n==========\n{generated_yaml}",
207                    generated_yaml.len()
208                );
209                println!(
210                    "existing yaml {}:\n==========\n{existing_yaml}",
211                    existing_yaml.len()
212                );
213            }
214
215            if yaml_out_of_date {
216                anyhow::bail!("checked in pipeline YAML is out of date! run `cargo xflowey regen`")
217            }
218
219            // Only write the JSON if we're in runtime mode, not in check mode
220            if let CheckMode::Runtime(_) = mode {
221                let mut f = fs_err::File::create(check_file.with_extension("json"))?;
222                f.write_all(generated_json.as_bytes())
223                    .context("while emitting pipeline database json")?;
224            }
225
226            Ok(())
227        }
228        CheckMode::None => {
229            let out_yaml_path = repo_root.join(pipeline_file);
230
231            let mut f = fs_err::File::create(out_yaml_path)?;
232            f.write_all(generated_yaml.as_bytes())
233                .context("while emitting pipeline yaml")?;
234
235            Ok(())
236        }
237    }
238}
239
240/// See [`check_or_write_generated_yaml_and_json`]
241pub(crate) fn check_generated_yaml_and_json<T>(
242    pipeline: &T,
243    pipeline_static_db: &FloweyPipelineStaticDb,
244    check: CheckMode,
245    repo_root: &Path,
246    pipeline_file: &Path,
247    ado_post_process_yaml_cb: Option<Box<dyn FnOnce(Value) -> Value>>,
248) -> anyhow::Result<()>
249where
250    T: Serialize,
251{
252    check_or_write_generated_yaml_and_json(
253        pipeline,
254        pipeline_static_db,
255        check,
256        repo_root,
257        pipeline_file,
258        ado_post_process_yaml_cb,
259    )
260}
261
262/// See [`check_or_write_generated_yaml_and_json`]
263pub(crate) fn write_generated_yaml_and_json<T>(
264    pipeline: &T,
265    pipeline_static_db: &FloweyPipelineStaticDb,
266    repo_root: &Path,
267    pipeline_file: &Path,
268    ado_post_process_yaml_cb: Option<Box<dyn FnOnce(Value) -> Value>>,
269) -> anyhow::Result<()>
270where
271    T: Serialize,
272{
273    check_or_write_generated_yaml_and_json(
274        pipeline,
275        pipeline_static_db,
276        CheckMode::None,
277        repo_root,
278        pipeline_file,
279        ado_post_process_yaml_cb,
280    )
281}
282
/// Accumulates consecutive bash commands so they can be emitted as a single
/// merged YAML step.
pub(crate) struct BashCommands {
    /// Commands buffered so far, awaiting a flush.
    commands: Vec<String>,
    /// Display label for the next emitted step, if one was provided.
    label: Option<String>,
    /// Whether every buffered command agreed to be merged.
    can_merge: bool,
    /// `true` → emit GitHub Actions step syntax; `false` → ADO syntax.
    github: bool,
}
290
291impl BashCommands {
292    pub fn new_github() -> Self {
293        Self {
294            commands: Vec::new(),
295            label: None,
296            can_merge: true,
297            github: true,
298        }
299    }
300
301    pub fn new_ado() -> Self {
302        Self {
303            commands: Vec::new(),
304            label: None,
305            can_merge: true,
306            github: false,
307        }
308    }
309
310    #[must_use]
311    pub fn push(
312        &mut self,
313        label: Option<String>,
314        can_merge: bool,
315        mut cmd: String,
316    ) -> Option<Value> {
317        let val = if !can_merge && !self.can_merge {
318            self.flush()
319        } else {
320            None
321        };
322        if !can_merge || self.label.is_none() {
323            self.label = label;
324        }
325        cmd.truncate(cmd.trim_end().len());
326        self.commands.push(cmd);
327        self.can_merge &= can_merge;
328        val
329    }
330
331    pub fn push_minor(&mut self, cmd: String) {
332        assert!(self.push(None, true, cmd).is_none());
333    }
334
335    #[must_use]
336    pub fn flush(&mut self) -> Option<Value> {
337        if self.commands.is_empty() {
338            return None;
339        }
340        let label = if self.commands.len() == 1 || !self.can_merge {
341            self.label.take()
342        } else {
343            None
344        };
345        let label = label.unwrap_or_else(|| "🦀 flowey rust steps".into());
346        let map = if self.github {
347            let commands = self.commands.join("\n");
348            serde_yaml::Mapping::from_iter([
349                ("name".into(), label.into()),
350                ("run".into(), commands.into()),
351                ("shell".into(), "bash".into()),
352            ])
353        } else {
354            let commands = if self.commands.len() == 1 {
355                self.commands.drain(..).next().unwrap()
356            } else {
357                // ADO doesn't automatically fail on error on multi-line scripts.
358                self.commands.insert(0, "set -e".into());
359                self.commands.join("\n")
360            };
361            serde_yaml::Mapping::from_iter([
362                ("bash".into(), commands.into()),
363                ("displayName".into(), label.into()),
364            ])
365        };
366        self.commands.clear();
367        self.can_merge = true;
368        Some(map.into())
369    }
370}