flowey_cli/pipeline_resolver/generic.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use anyhow::Context;
use flowey_core::node::FlowArch;
use flowey_core::node::FlowPlatform;
use flowey_core::node::NodeHandle;
use flowey_core::node::user_facing::GhPermission;
use flowey_core::node::user_facing::GhPermissionValue;
use flowey_core::patch::ResolvedPatches;
use flowey_core::pipeline::AdoCiTriggers;
use flowey_core::pipeline::AdoPrTriggers;
use flowey_core::pipeline::AdoScheduleTriggers;
use flowey_core::pipeline::GhCiTriggers;
use flowey_core::pipeline::GhPrTriggers;
use flowey_core::pipeline::GhRunner;
use flowey_core::pipeline::GhScheduleTriggers;
use flowey_core::pipeline::Pipeline;
use flowey_core::pipeline::internal::AdoPool;
use flowey_core::pipeline::internal::ArtifactMeta;
use flowey_core::pipeline::internal::InternalAdoResourcesRepository;
use flowey_core::pipeline::internal::Parameter;
use flowey_core::pipeline::internal::ParameterMeta;
use flowey_core::pipeline::internal::PipelineFinalized;
use flowey_core::pipeline::internal::PipelineJobMetadata;
use std::collections::BTreeMap;
use std::collections::BTreeSet;

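/// A pipeline whose jobs have been resolved into a dependency graph, along
/// with the pipeline-wide metadata (parameters, triggers, ADO / GitHub
/// backend settings) needed to emit it.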
pub struct ResolvedPipeline {
    pub graph: petgraph::Graph<ResolvedPipelineJob, ()>,
    pub order: Vec<petgraph::prelude::NodeIndex>,
    pub parameters: Vec<Parameter>,
    pub ado_schedule_triggers: Vec<AdoScheduleTriggers>,
    pub ado_name: Option<String>,
    pub ado_ci_triggers: Option<AdoCiTriggers>,
    pub ado_pr_triggers: Option<AdoPrTriggers>,
    pub ado_bootstrap_template: String,
    pub ado_resources_repository: Vec<InternalAdoResourcesRepository>,
    pub ado_post_process_yaml_cb: Option<Box<dyn FnOnce(serde_yaml::Value) -> serde_yaml::Value>>,
    pub ado_variables: BTreeMap<String, String>,
    pub ado_job_id_overrides: BTreeMap<usize, String>,
    pub gh_name: Option<String>,
    pub gh_schedule_triggers: Vec<GhScheduleTriggers>,
    pub gh_ci_triggers: Option<GhCiTriggers>,
    pub gh_pr_triggers: Option<GhPrTriggers>,
    pub gh_bootstrap_template: String,
}

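/// An artifact published or consumed by a job, along with the flowey runtime
/// variable used to refer to it.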
#[derive(Debug, Clone)]
pub struct ResolvedJobArtifact {
    pub flowey_var: String,
    pub name: String,
}

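/// A pipeline parameter consumed by a job, identified by its index into
/// [`ResolvedPipeline::parameters`] and the flowey variable it is surfaced
/// through.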
#[derive(Debug, Clone)]
pub struct ResolvedJobUseParameter {
    pub flowey_var: String,
    pub pipeline_param_idx: usize,
}

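/// A single job in the resolved pipeline graph: the flowey nodes it runs, the
/// platform/arch it runs on, its backend-specific settings, and the artifacts
/// and parameters it publishes or consumes.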
#[derive(Debug, Clone)] // Clone is because of shoddy viz code
pub struct ResolvedPipelineJob {
    pub root_nodes: BTreeMap<NodeHandle, Vec<Box<[u8]>>>,
    pub patches: ResolvedPatches,
    pub label: String,
    pub platform: FlowPlatform,
    pub arch: FlowArch,
    pub ado_pool: Option<AdoPool>,
    pub ado_variables: BTreeMap<String, String>,
    pub gh_override_if: Option<String>,
    pub gh_global_env: BTreeMap<String, String>,
    pub gh_pool: Option<GhRunner>,
    pub gh_permissions: BTreeMap<NodeHandle, BTreeMap<GhPermission, GhPermissionValue>>,
    pub external_read_vars: BTreeSet<String>,
    pub cond_param_idx: Option<usize>,

    pub parameters_used: Vec<ResolvedJobUseParameter>,
    // correspond to injected download nodes at the start of the job
    pub artifacts_used: Vec<ResolvedJobArtifact>,
    // correspond to injected publish nodes at the end of the job
    pub artifacts_published: Vec<ResolvedJobArtifact>,
}

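/// Lower a [`Pipeline`] into a [`ResolvedPipeline`]: build the job dependency
/// graph from artifact flow and explicit dependencies, attach per-job
/// artifact / parameter usage info, and topologically sort the jobs.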
pub fn resolve_pipeline(pipeline: Pipeline) -> anyhow::Result<ResolvedPipeline> {
    let PipelineFinalized {
        jobs,
        artifacts,
        parameters,
        extra_deps,
        ado_name,
        ado_schedule_triggers,
        ado_ci_triggers,
        ado_pr_triggers,
        ado_bootstrap_template,
        ado_resources_repository,
        ado_post_process_yaml_cb,
        ado_variables,
        ado_job_id_overrides,
        gh_name,
        gh_schedule_triggers,
        gh_ci_triggers,
        gh_pr_triggers,
        gh_bootstrap_template,
    } = PipelineFinalized::from_pipeline(pipeline);

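    // jobs become graph nodes; edges point from a job to the jobs that must
    // run after it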
    let mut graph = petgraph::Graph::new();

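    // index artifacts by job: for each job, the set of artifact names it
    // publishes (.0) and the set it consumes (.1)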
    let mut job_to_artifacts = {
        let mut m = BTreeMap::<usize, (BTreeSet<String>, BTreeSet<String>)>::new();

        for ArtifactMeta {
            name,
            published_by_job,
            used_by_jobs,
        } in &artifacts
        {
            let no_existing = m
                .entry(
                    published_by_job
                        .context(format!("artifact '{name}' is not published by any job"))?,
                )
                .or_default()
                .0
                .insert(name.clone());
            assert!(no_existing);

            for job_idx in used_by_jobs {
                let no_existing = m.entry(*job_idx).or_default().1.insert(name.clone());
                assert!(no_existing);
            }
        }

        m
    };

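    // flatten parameter metadata into a plain list of parameters, and index
    // which parameters (by index) each job consumes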
    let (parameters, mut job_to_params) = {
        let mut params = Vec::new();
        let mut m = BTreeMap::<usize, BTreeSet<usize>>::new();

        for (
            param_idx,
            ParameterMeta {
                parameter,
                used_by_jobs,
            },
        ) in parameters.into_iter().enumerate()
        {
            params.push(parameter);
            for job_idx in used_by_jobs {
                let no_existing = m.entry(job_idx).or_default().insert(param_idx);
                assert!(no_existing);
            }
        }

        (params, m)
    };

    let mut flowey_bootstrap_platforms = BTreeSet::new();

    // first things first: spin up graph nodes for each job
    let mut job_graph_idx = Vec::new();
    for (
        job_idx,
        PipelineJobMetadata {
            root_nodes,
            patches,
            label,
            platform,
            arch,
            cond_param_idx,
            ado_pool,
            ado_variables,
            gh_override_if,
            gh_global_env,
            gh_pool,
            gh_permissions,
        },
    ) in jobs.into_iter().enumerate()
    {
        let (artifacts_published, artifacts_used) =
            job_to_artifacts.remove(&job_idx).unwrap_or_default();
        let parameters_used = job_to_params.remove(&job_idx).unwrap_or_default();

        let artifacts_published: Vec<_> = artifacts_published
            .into_iter()
            .map(|a| ResolvedJobArtifact {
                flowey_var: flowey_core::pipeline::internal::consistent_artifact_runtime_var_name(
                    &a, false,
                ),
                name: a,
            })
            .collect();
        let artifacts_used: Vec<_> = artifacts_used
            .into_iter()
            .map(|a| ResolvedJobArtifact {
                flowey_var: flowey_core::pipeline::internal::consistent_artifact_runtime_var_name(
                    &a, true,
                ),
                name: a,
            })
            .collect();
        let parameters_used: Vec<_> = parameters_used
            .into_iter()
            .map(|param_idx| ResolvedJobUseParameter {
                flowey_var: parameters[param_idx].name().to_string(),
                pipeline_param_idx: param_idx,
            })
            .collect();

        // individual pipeline resolvers still need to ensure that the var is in
        // the var-db at job start time, but this external-var reporting code
        // can be shared across all impls
        let mut external_read_vars = BTreeSet::new();
        external_read_vars.extend(artifacts_used.iter().map(|a| a.flowey_var.clone()));
        external_read_vars.extend(artifacts_published.iter().map(|a| a.flowey_var.clone()));
        external_read_vars.extend(parameters_used.iter().map(|p| p.flowey_var.clone()));

        let idx = graph.add_node(ResolvedPipelineJob {
            root_nodes,
            patches: patches.finalize(),
            label,
            ado_pool,
            ado_variables,
            gh_override_if,
            gh_global_env,
            gh_pool,
            gh_permissions,
            platform,
            arch,
            cond_param_idx,
            external_read_vars,
            parameters_used,
            artifacts_used,
            artifacts_published,
        });

        // ...also using this opportunity to keep track of what flowey bins we need to bootstrap
        flowey_bootstrap_platforms.insert(platform);

        job_graph_idx.push(idx);
    }

    // next, add node edges based on artifact flow
    for ArtifactMeta {
        name: _,
        published_by_job,
        used_by_jobs,
    } in artifacts
    {
        let published_idx = job_graph_idx[published_by_job.expect("checked in loop above")];
        for job in used_by_jobs {
            let used_idx = job_graph_idx[job];
            graph.add_edge(published_idx, used_idx, ());
        }
    }

    // lastly, add node edges based on any additional explicit dependencies
    for (from, to) in extra_deps {
        graph.add_edge(job_graph_idx[from], job_graph_idx[to], ());
    }

    // TODO: better error handling
    let order = petgraph::algo::toposort(&graph, None)
        .map_err(|_| anyhow::anyhow!("detected cycle in pipeline"))?;

    Ok(ResolvedPipeline {
        graph,
        order,
        parameters,
        ado_name,
        ado_variables,
        ado_schedule_triggers,
        ado_ci_triggers,
        ado_pr_triggers,
        ado_bootstrap_template,
        ado_resources_repository,
        ado_post_process_yaml_cb,
        ado_job_id_overrides,
        gh_name,
        gh_schedule_triggers,
        gh_ci_triggers,
        gh_pr_triggers,
        gh_bootstrap_template,
    })
}

impl ResolvedPipeline {
    /// Trim the pipeline graph to only include the specified jobs (taking care
    /// to also preserve any upstream jobs they depend on).
    pub fn trim_pipeline_graph(&mut self, preserve_jobs: Vec<petgraph::prelude::NodeIndex>) {
        // DEVNOTE: this is a horribly suboptimal way to implement this, but it
        // works fine with the graph-sizes we currently have, so we can optimize
        // this later...

        let mut jobs_to_delete: BTreeSet<_> = self.graph.node_indices().collect();
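        // walk the reversed graph from each preserved job, un-marking it and
        // every job it transitively depends on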
        for idx in preserve_jobs {
            let g = petgraph::visit::Reversed(&self.graph);

            let mut dfs = petgraph::visit::Dfs::new(g, idx);
            while let Some(save_idx) = dfs.next(g) {
                jobs_to_delete.remove(&save_idx);
            }
        }

        let mut jobs_to_delete = jobs_to_delete.into_iter().collect::<Vec<_>>();
        jobs_to_delete.sort();

        // in petgraph, removing a node invalidates the index of the last node
        // in the graph (it gets swapped into the freed slot).
        //
        // I'm sure there's a better way to do this filtering, but removing
        // nodes in descending index order works fine, since any index still
        // pending removal is lower than the one being freed.
        for idx in jobs_to_delete.into_iter().rev() {
            self.graph.remove_node(idx).unwrap();
        }

        self.order = petgraph::algo::toposort(&self.graph, None).unwrap();
    }
}