1use flowey::node::prelude::*;
7use std::collections::BTreeMap;
8
/// Where the contents of a registered repo come from.
///
/// `C` is the claim state of the variable carried by
/// [`RepoSource::ExistingClone`]; it defaults to `VarNotClaimed`.
#[derive(Serialize, Deserialize)]
pub enum RepoSource<C = VarNotClaimed> {
    /// An ADO repository resource. The ADO backend emits a `checkout` step
    /// for it; the local backend rejects it.
    AdoResource(AdoResourcesRepositoryId),
    /// A GitHub repository, passed to the GitHub checkout step as
    /// `repository: {owner}/{name}`. Rejected by the ADO and local backends.
    GithubRepo { owner: String, name: String },
    /// A GitHub checkout with no explicit `repository` input.
    /// Rejected by the ADO and local backends.
    GithubSelf,
    /// A clone that already exists on disk. The path is read at runtime and
    /// made absolute before being reported. Accepted by every backend.
    ExistingClone(ReadVar<PathBuf, C>),
    /// Local backend only: clone `url` into `path` with `git clone`.
    LocalOnlyNewClone {
        /// URL handed to `git clone`.
        url: String,
        /// Destination directory of the clone.
        path: PathBuf,
        /// When `true` and `path` already exists and `git status` succeeds
        /// there, the existing checkout is reported instead of cloning.
        ignore_existing_clone: bool,
    },
}
31
32impl<C> Clone for RepoSource<C> {
33 fn clone(&self) -> Self {
34 match self {
35 Self::AdoResource(arg0) => Self::AdoResource(arg0.clone()),
36 Self::GithubRepo { owner, name } => Self::GithubRepo {
37 owner: owner.clone(),
38 name: name.clone(),
39 },
40 Self::GithubSelf => Self::GithubSelf,
41 Self::ExistingClone(arg0) => Self::ExistingClone(arg0.clone()),
42 Self::LocalOnlyNewClone {
43 url,
44 path,
45 ignore_existing_clone,
46 } => Self::LocalOnlyNewClone {
47 url: url.clone(),
48 path: path.clone(),
49 ignore_existing_clone: *ignore_existing_clone,
50 },
51 }
52 }
53}
54
55impl ClaimVar for RepoSource {
57 type Claimed = RepoSource<VarClaimed>;
58
59 fn claim(self, ctx: &mut StepCtx<'_>) -> Self::Claimed {
60 match self {
61 RepoSource::AdoResource(x) => RepoSource::AdoResource(x),
62 RepoSource::GithubRepo { owner, name } => RepoSource::GithubRepo { owner, name },
63 RepoSource::GithubSelf => RepoSource::GithubSelf,
64 RepoSource::ExistingClone(v) => RepoSource::ExistingClone(v.claim(ctx)),
65 RepoSource::LocalOnlyNewClone {
66 url,
67 path,
68 ignore_existing_clone,
69 } => RepoSource::LocalOnlyNewClone {
70 url,
71 path,
72 ignore_existing_clone,
73 },
74 }
75 }
76}
77
flowey_request! {
    pub enum Request {
        /// Ask for a registered repo to be made available on disk.
        CheckoutRepo {
            /// Runtime ID of the repo; compared against the `repo_id` of
            /// `RegisterRepo` requests.
            repo_id: ReadVar<String>,
            /// Receives the on-disk path of the repo.
            repo_path: WriteVar<PathBuf>,
            /// Forwarded to the ADO `persistCredentials` / GitHub
            /// `persist-credentials` checkout option. Setting it requires the
            /// matching registration to have `allow_persist_credentials`.
            /// Ignored by the local backend.
            persist_credentials: bool,
        },
        /// Register a repo under an ID so `CheckoutRepo` requests can refer
        /// to it.
        RegisterRepo {
            /// ID that `CheckoutRepo::repo_id` is matched against.
            repo_id: String,
            /// Where the repo contents come from.
            repo_src: RepoSource,
            /// Whether checkouts of this repo may set `persist_credentials`.
            /// Ignored by the local backend.
            allow_persist_credentials: bool,
            /// Fetch / clone depth. `None` is emitted as depth `0` on ADO and
            /// GitHub, and omits `--depth` on the local backend.
            depth: Option<usize>,
            /// Side effects claimed by the step that processes this
            /// registration (presumably so they run before it — confirm
            /// against flowey's ordering rules).
            pre_run_deps: Vec<ReadVar<SideEffect>>,
        },
        /// Local backend only. When `true`, `RepoSource::LocalOnlyNewClone`
        /// registrations are rejected at runtime. The local backend fails if
        /// this request is missing; the other backends fail if it is present.
        /// Combined via `same_across_all_reqs` (presumably all requests must
        /// agree on the value).
        LocalOnlyRequireExistingClones(bool),
    }
}
134
// `Node` is the type that both `impl FlowNode` and the backend-specific
// `emit_*` helpers in this file are attached to.
new_flow_node!(struct Node);
136
137pub mod process_reqs {
139 use super::*;
140
141 pub struct RequestCheckoutRepo {
142 pub repo_id: ReadVar<String>,
143 pub repo_path: WriteVar<PathBuf>,
144 pub persist_credentials: bool,
145 }
146
147 pub struct RequestRegisterRepo {
148 pub repo_id: String,
149 pub repo_src: RepoSource,
150 pub allow_persist_credentials: bool,
151 pub depth: Option<usize>,
152 pub pre_run_deps: Vec<ReadVar<SideEffect>>,
153 }
154
155 pub struct ResolvedRequestsAdo {
156 pub checkout_repo: Vec<RequestCheckoutRepo>,
157 pub register_repo: Vec<RequestRegisterRepo>,
158 }
159
160 impl ResolvedRequestsAdo {
161 pub fn from_reqs(requests: Vec<Request>) -> anyhow::Result<Self> {
162 let ResolvedRequests::Ado(v) = process_reqs(requests, false)? else {
163 panic!()
164 };
165 Ok(v)
166 }
167 }
168
169 pub struct ResolvedRequestsLocal {
170 pub checkout_repo: Vec<RequestCheckoutRepo>,
171 pub register_repo: Vec<RequestRegisterRepo>,
172 pub require_local_clones: bool,
173 }
174
175 impl ResolvedRequestsLocal {
176 pub fn from_reqs(requests: Vec<Request>) -> anyhow::Result<Self> {
177 let ResolvedRequests::Local(v) = process_reqs(requests, true)? else {
178 panic!()
179 };
180 Ok(v)
181 }
182 }
183
184 enum ResolvedRequests {
185 Ado(ResolvedRequestsAdo),
186 Local(ResolvedRequestsLocal),
187 }
188
189 fn process_reqs(requests: Vec<Request>, is_local: bool) -> anyhow::Result<ResolvedRequests> {
190 let mut checkout_repo = Vec::new();
191 let mut register_repo = Vec::new();
192 let mut require_local_clones = None;
193
194 for req in requests {
195 match req {
196 Request::CheckoutRepo {
197 repo_id,
198 repo_path,
199 persist_credentials,
200 } => checkout_repo.push(RequestCheckoutRepo {
201 repo_id,
202 repo_path,
203 persist_credentials,
204 }),
205 Request::RegisterRepo {
206 repo_id,
207 repo_src,
208 allow_persist_credentials,
209 depth,
210 pre_run_deps,
211 } => register_repo.push(RequestRegisterRepo {
212 repo_id,
213 repo_src,
214 allow_persist_credentials,
215 depth,
216 pre_run_deps,
217 }),
218 Request::LocalOnlyRequireExistingClones(v) => same_across_all_reqs(
219 "LocalOnlyRequireExistingClones",
220 &mut require_local_clones,
221 v,
222 )?,
223 }
224 }
225
226 if !is_local {
227 if require_local_clones.is_some() {
228 anyhow::bail!(
229 "can only set `LocalOnlyRequireExistingClones` when using the Local backend"
230 )
231 }
232 }
233
234 Ok(if is_local {
235 ResolvedRequests::Local(ResolvedRequestsLocal {
236 checkout_repo,
237 register_repo,
238 require_local_clones: require_local_clones.ok_or(anyhow::anyhow!(
239 "Missing required request: LocalOnlyRequireExistingClones",
240 ))?,
241 })
242 } else {
243 ResolvedRequests::Ado(ResolvedRequestsAdo {
244 checkout_repo,
245 register_repo,
246 })
247 })
248 }
249}
250
251impl FlowNode for Node {
252 type Request = Request;
253
254 fn imports(dep: &mut ImportCtx<'_>) {
255 dep.import::<crate::install_git::Node>();
256 }
257
258 fn emit(requests: Vec<Self::Request>, ctx: &mut NodeCtx<'_>) -> anyhow::Result<()> {
259 match ctx.backend() {
260 FlowBackend::Local => Self::emit_local(requests, ctx),
261 FlowBackend::Ado => Self::emit_ado(requests, ctx),
262 FlowBackend::Github => Self::emit_gh(requests, ctx),
263 }
264 }
265}
266
267impl Node {
268 fn emit_ado(requests: Vec<Request>, ctx: &mut NodeCtx<'_>) -> anyhow::Result<()> {
269 let process_reqs::ResolvedRequestsAdo {
270 checkout_repo,
271 register_repo,
272 } = process_reqs::ResolvedRequestsAdo::from_reqs(requests)?;
273
274 if checkout_repo.is_empty() {
275 return Ok(());
276 }
277
278 let mut did_checkouts = Vec::new();
279 let mut registered_repos = BTreeMap::<(String, bool), (usize, RepoSource)>::new();
280 for (
281 idx,
282 process_reqs::RequestRegisterRepo {
283 repo_id,
284 repo_src,
285 allow_persist_credentials,
286 depth,
287 pre_run_deps,
288 },
289 ) in register_repo.into_iter().enumerate()
290 {
291 let existing = registered_repos.insert(
292 (repo_id.clone(), allow_persist_credentials),
293 (idx, repo_src.clone()),
294 );
295 if existing.is_some() {
296 anyhow::bail!("got a duplicate RegisterRepo request for {repo_id}")
297 }
298
299 let (persist_credentials_str, write_persist_credentials_str) = ctx.new_var();
300 let (active, write_active) = ctx.new_var();
301
302 ctx.emit_rust_step(format!("check if {repo_id} needs to be cloned"), |ctx| {
303 pre_run_deps.claim(ctx);
304 let write_active = write_active.claim(ctx);
305 let write_persist_credentials_str = write_persist_credentials_str.claim(ctx);
306 let repo_ids = checkout_repo
307 .iter()
308 .map(|process_reqs::RequestCheckoutRepo { repo_id, persist_credentials, .. }| {
309 ( repo_id.clone().claim(ctx), *persist_credentials)
310 })
311 .collect::<Vec<_>>();
312 let repo_id = repo_id.clone();
313 move |rt| {
314 for (requested_checkout_repo_id, persist_credentials) in repo_ids {
315 if rt.read(requested_checkout_repo_id) == repo_id {
316 if persist_credentials {
317 if allow_persist_credentials != persist_credentials {
318 anyhow::bail!("pipeline implementation bug: attempted to checkout repo with `persist_credentials`, whose registration didn't include `allow_persist_credentials: true`")
319 }
320 }
321
322 rt.write(write_persist_credentials_str, &persist_credentials.to_string());
323 rt.write(write_active, &true);
324 return Ok(());
325 }
326 }
327
328 rt.write(write_active, &false);
329 Ok(())
330 }
331 });
332
333 let (did_checkout, claim_did_checkout) = ctx.new_var();
334 if let RepoSource::AdoResource(checkout_str) = repo_src {
335 ctx.emit_ado_step_with_condition(
336 format!("checkout repo {repo_id}"),
337 active.clone(),
338 |ctx| {
339 claim_did_checkout.claim(ctx);
340 let persist_credentials_str = persist_credentials_str.claim(ctx);
341 move |rt| {
342 let checkout_str = rt.resolve_repository_id(checkout_str);
343 let persist_credentials =
344 rt.get_var(persist_credentials_str).as_raw_var_name();
345 let depth = match depth {
346 Some(x) => x.to_string(),
347 None => "0".into(),
348 };
349
350 format!(
357 r#"
358 - checkout: {checkout_str}
359 path: repo{idx}
360 fetchTags: false
361 fetchDepth: {depth}
362 persistCredentials: $({persist_credentials})
363 submodules: recursive
364 "#
365 )
366 }
367 },
368 );
369 } else {
370 ctx.emit_side_effect_step(
371 [
372 active.into_side_effect(),
373 persist_credentials_str.into_side_effect(),
374 ],
375 [claim_did_checkout],
376 )
377 }
378
379 did_checkouts.push(did_checkout);
380 }
381
382 ctx.emit_rust_step("report cloned repo directories", move |ctx| {
383 did_checkouts.claim(ctx);
384 let mut registered_repos = registered_repos.into_iter().map(|(k, (a, b))| (k, (a, b.claim(ctx)))).collect::<BTreeMap<_, _>>();
385 let checkout_repo = checkout_repo
386 .into_iter()
387 .map(|process_reqs::RequestCheckoutRepo { repo_id, repo_path, persist_credentials }| {
388 (repo_id.claim(ctx), repo_path.claim(ctx), persist_credentials)
389 })
390 .collect::<Vec<_>>();
391
392 move |rt| {
393 let mut checkout_reqs = BTreeMap::<(String, bool), Vec<ClaimedWriteVar<PathBuf>>>::new();
394 for (repo_id, repo_path, persist_credentials) in checkout_repo {
395 checkout_reqs
396 .entry((rt.read(repo_id), persist_credentials))
397 .or_default()
398 .push(repo_path);
399 }
400
401
402 for ((repo_id, persist_credentials), repo_paths) in checkout_reqs {
403 let (idx, repo_src) = registered_repos
404 .remove(&(repo_id.clone(), persist_credentials))
405 .with_context(|| format!("pipeline implementation bug: did not specify a RegisterRepo request for repo {repo_id}"))?;
406
407 let path = match repo_src {
408 RepoSource::AdoResource(_) => {
409 if cfg!(windows) {
411 Path::new(r#"D:\a\_work\1\"#)
412 } else {
413 Path::new("/mnt/vss/_work/1/")
414 }
415 .join(format!("repo{idx}"))
416 },
417 RepoSource::GithubRepo{ .. } | RepoSource::GithubSelf => anyhow::bail!("repo source for ADO backend must be an `AdoResource` or `ExistingClone`"),
418 RepoSource::ExistingClone(path) => {
419 let path = rt.read(path);
420 path.absolute().context(format!("Failed to make {} absolute", path.display()))?
421 },
422 RepoSource::LocalOnlyNewClone { .. } => unreachable!(),
423 };
424
425 log::info!("reporting repo is cloned at {}", path.display());
426 for var in repo_paths {
427 rt.write(var, &path);
428 }
429 }
430
431 Ok(())
432 }
433 });
434
435 Ok(())
436 }
437
438 fn emit_gh(requests: Vec<Request>, ctx: &mut NodeCtx<'_>) -> anyhow::Result<()> {
439 let process_reqs::ResolvedRequestsAdo {
440 checkout_repo,
441 register_repo,
442 } = process_reqs::ResolvedRequestsAdo::from_reqs(requests)?;
443
444 if checkout_repo.is_empty() {
445 return Ok(());
446 }
447
448 let mut did_checkouts = Vec::new();
449 let mut registered_repos = BTreeMap::<(String, bool), (usize, RepoSource)>::new();
450 for (
451 idx,
452 process_reqs::RequestRegisterRepo {
453 repo_id,
454 repo_src,
455 allow_persist_credentials,
456 depth,
457 pre_run_deps,
458 },
459 ) in register_repo.into_iter().enumerate()
460 {
461 let existing = registered_repos.insert(
462 (repo_id.clone(), allow_persist_credentials),
463 (idx, repo_src.clone()),
464 );
465 if existing.is_some() {
466 anyhow::bail!("got a duplicate RegisterRepo request for {repo_id}")
467 }
468
469 let (persist_credentials_str, write_persist_credentials_str) = ctx.new_var();
470 let (active, write_active) = ctx.new_var();
471 ctx.emit_rust_step(format!("check if {repo_id} needs to be cloned"), |ctx| {
472 pre_run_deps.claim(ctx);
473 let write_active = write_active.claim(ctx);
474 let write_persist_credentials_str = write_persist_credentials_str.claim(ctx);
475 let repo_ids = checkout_repo
476 .iter()
477 .map(|process_reqs::RequestCheckoutRepo { repo_id, persist_credentials, .. }| {
478 (repo_id.clone().claim(ctx), *persist_credentials)
479 })
480 .collect::<Vec<_>>();
481 let repo_id = repo_id.clone();
482 move |rt| {
483 for (requested_checkout_repo_id, persist_credentials) in repo_ids {
484 if rt.read(requested_checkout_repo_id) == repo_id {
485 if persist_credentials {
486 if allow_persist_credentials != persist_credentials {
487 anyhow::bail!("pipeline implementation bug: attempted to checkout repo with `persist_credentials`, whose registration didn't include `allow_persist_credentials: true`")
488 }
489 }
490
491 rt.write(write_persist_credentials_str, &persist_credentials.to_string());
492 rt.write(write_active, &true);
493 return Ok(());
494 }
495 }
496
497 rt.write(write_active, &false);
498 Ok(())
499 }
500 });
501
502 if matches!(
503 repo_src,
504 RepoSource::GithubSelf | RepoSource::GithubRepo { .. }
505 ) {
506 let mut step = ctx
507 .emit_gh_step(format!("checkout repo {repo_id}"), "actions/checkout@v4")
508 .condition(active.clone())
509 .with("path", format!("repo{idx}"))
510 .with("fetch-depth", depth.unwrap_or(0).to_string())
511 .with("persist-credentials", persist_credentials_str)
512 .requires_permission(GhPermission::Contents, GhPermissionValue::Read);
513 if let RepoSource::GithubRepo { owner, name } = repo_src {
514 step = step.with("repository", format!("{owner}/{name}"))
515 }
516 did_checkouts.push(step.finish(ctx));
517 } else if !matches!(repo_src, RepoSource::ExistingClone(_)) {
518 anyhow::bail!(
519 "repo source must be a `GithubRepo`, `GithubSelf`, or `ExistingClone` for GitHub backend"
520 );
521 }
522 }
523
524 let parent_path = ctx.get_gh_context_var().global().workspace();
525 ctx.emit_rust_step("report cloned repo directories", move |ctx| {
526 did_checkouts.claim(ctx);
527 let mut registered_repos = registered_repos.into_iter().map(|(k, (a, b))| (k, (a, b.claim(ctx)))).collect::<BTreeMap<_, _>>();
528 let checkout_repo = checkout_repo
529 .into_iter()
530 .map(|process_reqs::RequestCheckoutRepo { repo_id, repo_path, persist_credentials }| {
531 (repo_id.claim(ctx), repo_path.claim(ctx), persist_credentials)
532 })
533 .collect::<Vec<_>>();
534 let parent_path = parent_path.claim(ctx);
535
536 move |rt| {
537 let mut checkout_reqs = BTreeMap::<(String, bool), Vec<ClaimedWriteVar<PathBuf>>>::new();
538 for (repo_id, repo_path, persist_credentials) in checkout_repo {
539 checkout_reqs
540 .entry((rt.read(repo_id), persist_credentials))
541 .or_default()
542 .push(repo_path);
543 }
544
545 let parent_path = rt.read(parent_path);
546 for ((repo_id, persist_credentials), repo_paths) in checkout_reqs {
547 let (idx, repo_src) = registered_repos
548 .remove(&(repo_id.clone(), persist_credentials))
549 .with_context(|| format!("pipeline implementation bug: did not specify a RegisterRepo request for repo {repo_id}"))?;
550
551 let path = match repo_src {
552 RepoSource::AdoResource(_) => unreachable!(),
553 RepoSource::GithubRepo{ .. } => {
554 PathBuf::from(parent_path.clone()).join(format!("repo{idx}"))
555 },
556 RepoSource::GithubSelf => {
557 PathBuf::from(parent_path.clone()).join(format!("repo{idx}"))
558 },
559 RepoSource::ExistingClone(path) => {
560 let path = rt.read(path);
561 path.absolute().context(format!("Failed to make {} absolute", path.display()))?
562 },
563 RepoSource::LocalOnlyNewClone { .. } => unreachable!(),
564 };
565
566 log::info!("reporting repo is cloned at {}", path.display());
567
568 for var in repo_paths {
569 rt.write(var, &path);
570 }
571 }
572
573 Ok(())
574 }
575 });
576
577 Ok(())
578 }
579
580 fn emit_local(requests: Vec<Request>, ctx: &mut NodeCtx<'_>) -> anyhow::Result<()> {
581 let process_reqs::ResolvedRequestsLocal {
582 checkout_repo,
583 register_repo,
584 require_local_clones,
585 } = process_reqs::ResolvedRequestsLocal::from_reqs(requests)?;
586
587 if checkout_repo.is_empty() {
588 return Ok(());
589 }
590
591 let git_ensure_installed = ctx.reqv(crate::install_git::Request::EnsureInstalled);
592
593 ctx.emit_rust_step("report repo directory", move |ctx| {
594 git_ensure_installed.claim(ctx);
595 let register_repo = register_repo
596 .into_iter()
597 .map(|process_reqs::RequestRegisterRepo { repo_id, repo_src, allow_persist_credentials: _, depth, pre_run_deps }|
598 (repo_id, repo_src.claim(ctx), depth, pre_run_deps.claim(ctx)
599 )).collect::<Vec<_>>();
600 let checkout_repo = checkout_repo
601 .into_iter()
602 .map(|process_reqs::RequestCheckoutRepo { repo_id, repo_path, persist_credentials }| {
603 (repo_id.claim(ctx), repo_path.claim(ctx), persist_credentials)
604 })
605 .collect::<Vec<_>>();
606
607 move |rt| {
608 for (checkout_repo_id, repo_path, _persist_credentials) in checkout_repo {
609 let checkout_repo_id = rt.read(checkout_repo_id);
610
611 log::info!("reporting checkout info for {checkout_repo_id}");
612
613 let mut found_path = None;
614 for (repo_id, repo_src, depth, _) in ®ister_repo {
615 if &checkout_repo_id != repo_id {
616 continue;
617 }
618
619 match repo_src {
620 RepoSource::ExistingClone(path) => {
621 let path = rt.read(path.clone());
622 let path = path.absolute().context(format!("Failed to make {} absolute", path.display()))?;
623 found_path = Some(path);
624 break;
625 }
626 RepoSource::LocalOnlyNewClone { .. } if require_local_clones => {
627 anyhow::bail!("`LocalOnlyRequireExistingClones` is active, all repos must be registered using `RepoKind::ExistingClone`");
628 }
629 RepoSource::LocalOnlyNewClone { url, path, ignore_existing_clone } => {
630 let sh = xshell::Shell::new()?;
631 if sh.path_exists(path) {
632 sh.change_dir(path);
633 if xshell::cmd!(sh, "git status").run().is_ok()
634 && *ignore_existing_clone
635 {
636 rt.write(repo_path, path);
637 return Ok(());
638 }
639 }
640 if let Some(depth_arg) = depth {
641 let depth_arg_string = depth_arg.to_string();
642 xshell::cmd!(sh, "git clone --depth {depth_arg_string} {url} {path}").run()?;
643 } else {
644 xshell::cmd!(sh, "git clone {url} {path}").run()?;
645 }
646 found_path = Some(path.clone());
647 break;
648 }
649 RepoSource::AdoResource( .. ) => {
650 anyhow::bail!("ADO resources are not supported on local backend");
651 }
652 RepoSource::GithubRepo{ .. } | RepoSource::GithubSelf => {
653 anyhow::bail!("Github repos for GH Actions are not supported on local backend");
654 }
655 }
656 }
657
658 if let Some(path) = found_path {
659 rt.write(repo_path, &path);
660 } else {
661 anyhow::bail!("missing registration for id {checkout_repo_id}")
662 }
663 }
664
665 Ok(())
666 }
667 });
668
669 Ok(())
670 }
671}