flowey_cli/cli/pipeline.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use anyhow::Context;
use flowey_core::node::FlowBackend;
use flowey_core::pipeline::IntoPipeline;
use flowey_core::pipeline::PipelineBackendHint;
use std::path::Path;
use std::path::PathBuf;

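/// Debug visualization modes selectable via `--viz-mode`, each corresponding
/// to one of the renderers in `pipeline_resolver::viz`.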
#[derive(Clone, clap::ValueEnum)]
pub enum VizModeCli {
    Toposort,
    Dot,
    FlowDot,
}

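/// How generated pipeline YAML should be validated, mirroring the `--check`
/// and `--runtime` flags on the ADO / GitHub backends.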
pub(crate) enum CheckMode {
    Runtime(PathBuf),
    Check(PathBuf),
    None,
}

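/// The backends a pipeline can be emitted for (or run on directly), along
/// with their backend-specific CLI options.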
#[derive(clap::Subcommand)]
enum PipelineBackendCli<P: clap::Subcommand> {
    /// A locally executable bash script
    #[clap(subcommand_value_name = "PIPELINE")]
    #[clap(subcommand_help_heading = "Pipeline")]
    Bash {
        /// Output directory to write pipeline scripts to. If the directory
        /// doesn't exist, it will be created.
        #[clap(long, default_value = "./flowey-out")]
        out_dir: PathBuf,

        /// Persistent storage directory shared across multiple runs. If the
        /// directory doesn't exist, it will be created.
        #[clap(long, default_value = "./flowey-persist")]
        persist_dir: PathBuf,

        /// Enable flowey internal debug logs at runtime
        #[clap(help_heading = "Global Options (flowey)", global = true, long)]
        runtime_debug_log: bool,

        /// Attempt to run windows jobs on WSL2. This may or may not work,
        /// depending on whether the flowey nodes at play are resilient to
        /// running in WSL2.
        #[clap(help_heading = "Global Options (flowey)", global = true, long)]
        windows_as_wsl: bool,

        #[clap(subcommand)]
        pipelines: P,
    },
    /// An ADO pipeline YAML file
    #[clap(subcommand_value_name = "PIPELINE")]
    #[clap(subcommand_help_heading = "Pipeline")]
    Ado {
        #[clap(subcommand)]
        pipelines: P,

        /// disable flowey internal debug logs at runtime
        #[clap(help_heading = "Global Options (flowey)", global = true, long)]
        no_runtime_debug_log: bool,

        /// repo-root relative path to generated YAML file
        #[clap(long)]
        out: PathBuf,

        /// check that the provided YAML matches the generated YAML.
        #[clap(long, value_name = "YAML")]
        check: Option<PathBuf>,

        /// generate the pipeline JSON, also runs check
        #[clap(long, value_name = "YAML", conflicts_with = "check")]
        runtime: Option<PathBuf>,
    },
    /// A GitHub pipeline YAML file
    #[clap(subcommand_value_name = "PIPELINE")]
    #[clap(subcommand_help_heading = "Pipeline")]
    Github {
        #[clap(subcommand)]
        pipelines: P,

        /// disable flowey internal debug logs at runtime
        #[clap(help_heading = "Global Options (flowey)", global = true, long)]
        no_runtime_debug_log: bool,

        /// repo-root relative path to generated YAML file
        #[clap(long)]
        out: PathBuf,

        /// check that the provided YAML matches the generated YAML.
        #[clap(long, value_name = "YAML")]
        check: Option<PathBuf>,

        /// generate the pipeline JSON, also runs check
        #[clap(long, value_name = "YAML", conflicts_with = "check")]
        runtime: Option<PathBuf>,
    },
    /// Run the pipeline directly using flowey
    Run {
        #[clap(subcommand)]
        pipelines: P,

        /// Output directory to emit artifacts into. If the directory
        /// doesn't exist, it will be created.
        #[clap(long, default_value = "./flowey-out")]
        out_dir: PathBuf,

        /// Persistent storage directory shared across multiple runs. If the
        /// directory doesn't exist, it will be created.
        #[clap(long, default_value = "./flowey-persist")]
        persist_dir: PathBuf,

        /// Attempt to run windows jobs on WSL2. This may or may not work,
        /// depending on whether the flowey nodes at play are resilient to
        /// running in WSL2.
        #[clap(help_heading = "Global Options (flowey)", global = true, long)]
        windows_as_wsl: bool,
    },
}

/// Generate and run pipelines.
#[derive(clap::Args)]
#[clap(subcommand_help_heading = "Pipeline Kind")]
#[clap(subcommand_value_name = "PIPELINE_KIND")]
pub struct Pipeline<P: clap::Subcommand> {
    /// (debug) Emit a visualization of the output flow, instead of the flow
    /// itself.
    #[clap(help_heading = "Global Options (flowey)", global = true, long)]
    viz_mode: Option<VizModeCli>,

    /// (debug) Filter the pipeline to only include the specified jobs.
    ///
    /// At this time, this will _not_ allow running a job without also running
    /// any jobs it depends on!
    ///
    /// Accepts a comma-separated list of "job ids", a list of which can be
    /// obtained by running `--include-jobs='?'`
    ///
    /// NOTE: because this is intended as a debugging tool, there is no
    /// mechanism to ensure that "job ids" remain stable in the face of pipeline
    /// updates / flowey updates. i.e., an `--include-jobs` invocation that works
    /// today may not work after making changes to the pipeline definition /
    /// updating flowey.
    #[clap(help_heading = "Global Options (flowey)", global = true, long)]
    #[expect(clippy::option_option, reason = "for clap derive")]
    include_jobs: Option<Option<IncludeJobs>>,

    #[clap(subcommand)]
    project_pipeline: PipelineBackendCli<P>,
}

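/// Parsed value of the `--include-jobs` flag: either a query for the list of
/// available job ids, or the list of job ids to keep.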
#[derive(Clone)]
enum IncludeJobs {
    Query,
    List(Vec<usize>),
}

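// Accepts either a lone `?` (query mode) or a comma-separated list of job
// ids, e.g. parsing "0,2,5" yields `IncludeJobs::List(vec![0, 2, 5])`.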
impl std::str::FromStr for IncludeJobs {
    type Err = &'static str;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "?" {
            return Ok(IncludeJobs::Query);
        }

        let mut list = Vec::new();
        for n in s.split(',') {
            if n == "?" {
                return Err("can only pass '?' once");
            }

            list.push(
                n.parse()
                    .map_err(|_| "expected comma separated list of numbers")?,
            );
        }
        Ok(IncludeJobs::List(list))
    }
}

impl<P: clap::Subcommand + IntoPipeline> Pipeline<P> {
    pub fn run(self, flowey_crate: &str, repo_root: &Path) -> anyhow::Result<()> {
        let Self {
            project_pipeline,
            viz_mode,
            include_jobs,
        } = self;

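        // Each backend follows the same overall shape: resolve the pipeline,
        // apply any `--include-jobs` filtering, then either emit the requested
        // visualization or hand the resolved pipeline off to the
        // backend-specific emitter.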
        match project_pipeline {
            PipelineBackendCli::Bash {
                pipelines,
                out_dir,
                persist_dir,
                runtime_debug_log,
                windows_as_wsl,
            } => {
                let mut resolved_pipeline =
                    resolve_pipeline(pipelines, PipelineBackendHint::Local)?;

                if matches!(
                    resolve_include_jobs(&mut resolved_pipeline, include_jobs)?,
                    EarlyExit::Yes
                ) {
                    return Ok(());
                }

                if let Some(viz_mode) = viz_mode {
                    viz_pipeline(
                        viz_mode,
                        resolved_pipeline,
                        FlowBackend::Local,
                        crate::running_in_wsl(),
                    )
                } else {
                    let _ = (out_dir, persist_dir, runtime_debug_log, windows_as_wsl);
                    todo!("bash backend is not actively maintained, and currently broken")
                }
            }
            PipelineBackendCli::Run {
                pipelines,
                out_dir,
                persist_dir,
                windows_as_wsl,
            } => {
                let mut resolved_pipeline =
                    resolve_pipeline(pipelines, PipelineBackendHint::Local)?;

                if matches!(
                    resolve_include_jobs(&mut resolved_pipeline, include_jobs)?,
                    EarlyExit::Yes
                ) {
                    return Ok(());
                }

                if let Some(viz_mode) = viz_mode {
                    viz_pipeline(
                        viz_mode,
                        resolved_pipeline,
                        FlowBackend::Local,
                        crate::running_in_wsl(),
                    )
                } else {
                    crate::pipeline_resolver::direct_run::direct_run(
                        resolved_pipeline,
                        windows_as_wsl,
                        out_dir,
                        persist_dir,
                    )
                }
            }
            PipelineBackendCli::Ado {
                pipelines,
                out,
                no_runtime_debug_log,
                check,
                runtime,
            } => {
                let mut resolved_pipeline = resolve_pipeline(pipelines, PipelineBackendHint::Ado)?;

                if matches!(
                    resolve_include_jobs(&mut resolved_pipeline, include_jobs)?,
                    EarlyExit::Yes
                ) {
                    return Ok(());
                }

                if let Some(viz_mode) = viz_mode {
                    viz_pipeline(viz_mode, resolved_pipeline, FlowBackend::Ado, false)
                } else {
                    let mode = if let Some(runtime_path) = runtime {
                        CheckMode::Runtime(runtime_path)
                    } else if let Some(check_path) = check {
                        CheckMode::Check(check_path)
                    } else {
                        CheckMode::None
                    };

                    crate::pipeline_resolver::ado_yaml::ado_yaml(
                        resolved_pipeline,
                        !no_runtime_debug_log,
                        repo_root,
                        &out,
                        flowey_crate,
                        mode,
                    )
                }
            }
            PipelineBackendCli::Github {
                pipelines,
                out,
                no_runtime_debug_log,
                check,
                runtime,
            } => {
                let mut resolved_pipeline =
                    resolve_pipeline(pipelines, PipelineBackendHint::Github)?;

                if matches!(
                    resolve_include_jobs(&mut resolved_pipeline, include_jobs)?,
                    EarlyExit::Yes
                ) {
                    return Ok(());
                }

                if let Some(viz_mode) = viz_mode {
                    viz_pipeline(viz_mode, resolved_pipeline, FlowBackend::Github, false)
                } else {
                    let mode = if let Some(runtime_path) = runtime {
                        CheckMode::Runtime(runtime_path)
                    } else if let Some(check_path) = check {
                        CheckMode::Check(check_path)
                    } else {
                        CheckMode::None
                    };

                    crate::pipeline_resolver::github_yaml::github_yaml(
                        resolved_pipeline,
                        !no_runtime_debug_log,
                        repo_root,
                        &out,
                        flowey_crate,
                        mode,
                    )
                }
            }
        }
    }
}

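/// Resolve a user-provided [`IntoPipeline`] implementation into flowey's
/// generic resolved pipeline representation.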
fn resolve_pipeline<P: IntoPipeline>(
    pipelines: P,
    backend_hint: PipelineBackendHint,
) -> Result<crate::pipeline_resolver::generic::ResolvedPipeline, anyhow::Error> {
    let pipeline = pipelines
        .into_pipeline(backend_hint)
        .context("error defining pipeline")?;

    let resolved_pipeline = crate::pipeline_resolver::generic::resolve_pipeline(pipeline)
        .context("invalid pipeline")?;

    Ok(resolved_pipeline)
}

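/// Emit the requested debug visualization of a resolved pipeline.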
fn viz_pipeline(
    viz_mode: VizModeCli,
    resolved_pipeline: crate::pipeline_resolver::generic::ResolvedPipeline,
    backend: FlowBackend,
    with_persist_dir: bool,
) -> Result<(), anyhow::Error> {
    match viz_mode {
        VizModeCli::Toposort => crate::pipeline_resolver::viz::viz_pipeline_toposort(
            resolved_pipeline,
            backend,
            with_persist_dir,
        ),
        VizModeCli::Dot => {
            crate::pipeline_resolver::viz::viz_pipeline_dot(resolved_pipeline, backend)
        }
        VizModeCli::FlowDot => crate::pipeline_resolver::viz::viz_pipeline_flow_dot(
            resolved_pipeline,
            backend,
            with_persist_dir,
        ),
    }
}

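/// Whether `resolve_include_jobs` already handled the request (e.g. an
/// `--include-jobs='?'` query) and the caller should return without doing any
/// further work.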
enum EarlyExit {
    Yes,
    No,
}

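/// Apply any `--include-jobs` filtering to the resolved pipeline, trimming the
/// job graph down to the requested jobs (or printing the available job ids
/// when `?` is passed).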
#[expect(clippy::option_option, reason = "for clap derive")]
fn resolve_include_jobs(
    resolved_pipeline: &mut crate::pipeline_resolver::generic::ResolvedPipeline,
    include_jobs: Option<Option<IncludeJobs>>,
) -> anyhow::Result<EarlyExit> {
    let Some(include_jobs) = include_jobs else {
        return Ok(EarlyExit::No);
    };

    match include_jobs.unwrap_or(IncludeJobs::Query) {
        IncludeJobs::Query => {
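            // List each job's user-facing index (the id passed to
            // `--include-jobs`) alongside its label.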
            for (present_idx, &graph_idx) in resolved_pipeline.order.iter().enumerate() {
                println!(
                    "{}: {}",
                    present_idx, resolved_pipeline.graph[graph_idx].label
                );
            }
            Ok(EarlyExit::Yes)
        }
        IncludeJobs::List(list) => {
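            // Map the user-facing job indices back to graph node indices
            // before trimming the pipeline graph.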
            let preserve_jobs = list
                .into_iter()
                .map(|present_idx| resolved_pipeline.order.get(present_idx).cloned())
                .collect::<Option<Vec<_>>>()
                .context("passed invalid job idx. use '?' to list available jobs")?;
            resolved_pipeline.trim_pipeline_graph(preserve_jobs);
            Ok(EarlyExit::No)
        }
    }
}