flowey_lib_common/
cache.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! Cache the contents of a particular directory between runs.
5//!
6//! The contents of the provided `dir` will be saved at the end of a run, using
7//! the user-defined `key` string to tag the contents of the cache.
8//!
9//! Subsequent runs will then use the `key` to restore the contents of the
10//! directory.
11//!
//! # A note on file sizes
13//!
14//! This node is backed by the in-box Cache@2 Task on ADO, and the in-box
15//! actions/cache@v3 Action on Github Actions.
16//!
17//! These actions have limits on the size of data they can cache at any given
18//! time, and potentially have issues with particularly large artifacts (e.g:
19//! gigabytes in size).
20//!
21//! In cases where you're intending to cache large files, it is recommended to
22//! implement caching functionality directly using [`NodeCtx::persistent_dir`],
23//! which is guaranteed to be reliable (when running on a system where such
24//! persistent storage is available).
25//!
26//! # Clearing the cache
27//!
28//! Clearing the cache is done in different ways depending on the backend:
29//!
30//! - Local: just delete the cache folder on your machine
//! - Github: use the cache task's web UI to manage cache entries
32//! - ADO: define a pipeline-level variable called `FloweyCacheGeneration`, and set
//!   it to a new arbitrary value.
34//!     - This is because ADO doesn't have a native way to flush the cache
35//!       outside of updating the cache key in the YAML file itself.
36
37use flowey::node::prelude::*;
38use std::collections::BTreeSet;
39use std::io::Seek;
40use std::io::Write;
41
42/// Status of a cache directory.
43#[derive(Debug, Serialize, Deserialize)]
44pub enum CacheHit {
45    /// Complete miss - cache is empty.
46    Miss,
47    /// Direct hit - cache is perfectly restored.
48    Hit,
49    /// Partial hit - cache was partially restored.
50    PartialHit,
51}
52
flowey_request! {
    pub struct Request {
        /// Friendly label for the directory being cached
        pub label: String,
        /// Absolute path to the directory that will be cached between runs
        pub dir: ReadVar<PathBuf>,
        /// The key created when saving a cache and the key used to search for a
        /// cache.
        pub key: ReadVar<String>,
        /// An optional set of alternative restore keys.
        ///
        /// If no cache hit occurs for key, these restore keys are used
        /// sequentially in the order provided to find and restore a cache.
        ///
        /// A match on a restore key (rather than `key`) is reported as a
        /// partial hit.
        pub restore_keys: Option<ReadVar<Vec<String>>>,
        /// Variable to write the result of trying to restore the cache.
        pub hitvar: WriteVar<CacheHit>,
    }
}
71
72new_flow_node!(struct Node);
73
impl FlowNode for Node {
    type Request = Request;

    fn imports(_ctx: &mut ImportCtx<'_>) {}

    /// Emits backend-specific restore/save steps for each cache [`Request`]:
    ///
    /// - Local: backed by `ctx.persistent_dir()` plus a `cache_keys` index file.
    /// - ADO: backed by the in-box `Cache@2` task.
    /// - Github: backed by the `actions/cache@v4` action.
    fn emit(requests: Vec<Self::Request>, ctx: &mut NodeCtx<'_>) -> anyhow::Result<()> {
        // -- end of req processing -- //

        match ctx.backend() {
            FlowBackend::Local => {
                // Without persistent storage there is nowhere to stash cache
                // contents between runs, so simply report every request as a
                // miss and emit no restore/save steps at all.
                if !ctx.supports_persistent_dir() {
                    ctx.emit_minor_rust_step("Reporting cache misses", |ctx| {
                        let hitvars = requests
                            .into_iter()
                            .map(|v| v.hitvar.claim(ctx))
                            .collect::<Vec<_>>();

                        |rt| {
                            rt.write_all(hitvars, &CacheHit::Miss);
                        }
                    });

                    return Ok(());
                };

                for Request {
                    label,
                    dir,
                    key,
                    restore_keys,
                    hitvar,
                } in requests
                {
                    // work around a bug in how post-job nodes affect stage1 dep
                    // culling...
                    let persistent_dir = ctx.persistent_dir().unwrap();

                    // Needed for saving the cache result. The restore step
                    // writes the hit status into `hitvar2` so the save step
                    // below can read it back via `hitvar_reader`.
                    let (hitvar_reader, hitvar2) = ctx.new_var();

                    // Ties the save step (post-job) to the restore step so the
                    // save step is not culled.
                    let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();

                    ctx.emit_rust_step(format!("Restore cache: {label}"), |ctx| {
                        require_post_job.claim(ctx);
                        let persistent_dir = persistent_dir.clone().claim(ctx);
                        let dir = dir.clone().claim(ctx);
                        let key = key.clone().claim(ctx);
                        let restore_keys = restore_keys.claim(ctx);
                        let hitvar = hitvar.claim(ctx);
                        let hitvar2 = hitvar2.claim(ctx);
                        |rt| {
                            let persistent_dir = rt.read(persistent_dir);
                            let dir = rt.read(dir);
                            let key = rt.read(key);
                            let restore_keys = rt.read(restore_keys);

                            // Report the status both to the caller (`hitvar`)
                            // and to the save step (`hitvar2`).
                            let set_hitvar = move |val| {
                                log::info!("cache status: {val:?}");
                                rt.write(hitvar, &val);
                                rt.write(hitvar2, &val);
                            };

                            // figure out what cache entries are available to us
                            //
                            // (reading this entire file into memory seems fine at
                            // this juncture, given the sort of datasets we're
                            // working with)
                            //
                            // A missing/unreadable `cache_keys` file is treated
                            // as an empty cache, not an error.
                            let available_keys: BTreeSet<String> = if let Ok(s) =
                                fs_err::read_to_string(persistent_dir.join("cache_keys"))
                            {
                                s.split('\n').map(|s| s.trim().to_owned()).collect()
                            } else {
                                BTreeSet::new()
                            };

                            // using the keys the user provided us, check if there's
                            // a match
                            //
                            // `idx == 0` is the primary `key`; any later index
                            // is one of the fallback `restore_keys`, which only
                            // counts as a partial hit.
                            let mut existing_cache_dir = None;
                            for (idx, key) in Some(key)
                                .into_iter()
                                .chain(restore_keys.into_iter().flatten())
                                .enumerate()
                            {
                                if available_keys.contains(&key) {
                                    existing_cache_dir = Some((idx == 0, hash_key_to_dir(&key)));
                                    break;
                                }
                            }

                            let Some((direct_hit, existing_cache_dir)) = existing_cache_dir else {
                                set_hitvar(CacheHit::Miss);
                                return Ok(());
                            };

                            // Restore: copy the stashed directory contents back
                            // into the caller-provided `dir`.
                            crate::_util::copy_dir_all(
                                persistent_dir.join(existing_cache_dir),
                                dir,
                            )
                            .context("while restoring cache")?;

                            set_hitvar(if direct_hit {
                                CacheHit::Hit
                            } else {
                                CacheHit::PartialHit
                            });

                            Ok(())
                        }
                    });

                    // Post-job save step: stash the (possibly updated) `dir`
                    // back into the persistent cache folder under `key`, and
                    // record the key in the `cache_keys` index.
                    ctx.emit_rust_step(format!("Saving cache: {label}"), |ctx| {
                        resolve_post_job.claim(ctx);
                        let hitvar_reader = hitvar_reader.claim(ctx);
                        let persistent_dir = persistent_dir.clone().claim(ctx);
                        let dir = dir.claim(ctx);
                        let key = key.claim(ctx);
                        move |rt| {
                            let persistent_dir = rt.read(persistent_dir);
                            let dir = rt.read(dir);
                            let key = rt.read(key);
                            let hitvar_reader = rt.read(hitvar_reader);

                            // Opened up-front (append + create) so the index
                            // file exists even on the direct-hit early return.
                            let mut cache_keys_file = fs_err::OpenOptions::new()
                                .append(true)
                                .create(true)
                                .read(true)
                                .open(persistent_dir.join("cache_keys"))?;

                            if matches!(hitvar_reader, CacheHit::Hit) {
                                // no need to update the cache
                                log::info!("was direct hit - no updates needed");
                                return Ok(());
                            }

                            // otherwise, need to update the cache
                            crate::_util::copy_dir_all(
                                dir,
                                persistent_dir.join(hash_key_to_dir(&key)),
                            )?;

                            cache_keys_file.seek(std::io::SeekFrom::End(0))?;
                            writeln!(cache_keys_file, "{}", key)?;

                            log::info!("cache saved");

                            Ok(())
                        }
                    });
                }
            }
            FlowBackend::Ado => {
                for Request {
                    label,
                    dir,
                    key,
                    restore_keys,
                    hitvar,
                } in requests
                {
                    // Ties the trailing validation step to the earlier steps so
                    // it is not culled.
                    let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();

                    // Pre-process the flowey vars into string-typed vars that
                    // can be spliced into the Cache@2 task's YAML inputs.
                    let (dir_string, key, restore_keys) = {
                        let (processed_dir, write_processed_dir) = ctx.new_var();
                        let (processed_key, write_processed_key) = ctx.new_var();
                        let (processed_keys, write_processed_keys) = if restore_keys.is_some() {
                            let (a, b) = ctx.new_var();
                            (Some(a), Some(b))
                        } else {
                            (None, None)
                        };

                        ctx.emit_rust_step("Pre-processing cache vars", |ctx| {
                            require_post_job.claim(ctx);
                            let write_processed_dir = write_processed_dir.claim(ctx);
                            let write_processed_key = write_processed_key.claim(ctx);
                            let write_processed_keys = write_processed_keys.claim(ctx);

                            let dir = dir.clone().claim(ctx);
                            let key = key.claim(ctx);
                            let restore_keys = restore_keys.claim(ctx);

                            |rt| {
                                let dir = rt.read(dir);
                                // while we're here, we'll convert dir into a
                                // String, so it can be stuffed into an ADO var
                                rt.write(
                                    write_processed_dir,
                                    &dir.absolute()?.display().to_string(),
                                );

                                // Inject `$(FloweyCacheGeneration)` as part of the
                                // cache key to provide a non-intrusive mechanism to
                                // cycle the ADO cache when it gets into an
                                // inconsistent state.
                                //
                                // Deny cross-os caching by default (matching Github
                                // CI works by default).
                                //
                                // FUTURE: add toggle to request to allow cross-os
                                // caching?
                                let inject_extras = |s| {
                                    format!(r#""$(FloweyCacheGeneration)" | "$(Agent.OS)" | "{s}""#)
                                };

                                let key = rt.read(key);
                                rt.write(write_processed_key, &inject_extras(key));

                                if let Some(write_processed_keys) = write_processed_keys {
                                    let restore_keys = rt.read(restore_keys.unwrap());
                                    // NOTE(review): this joins with a literal
                                    // backslash-n ("\\n"), not a newline —
                                    // confirm this is what the Cache@2 task
                                    // expects for multiple restoreKeys.
                                    rt.write(
                                        write_processed_keys,
                                        &restore_keys
                                            .into_iter()
                                            .map(inject_extras)
                                            .collect::<Vec<_>>()
                                            .join("\\n"),
                                    );
                                }

                                Ok(())
                            }
                        });

                        (processed_dir, processed_key, processed_keys)
                    };

                    // Carries the ADO-side cache-hit variable name from the
                    // YAML step into the rust mapping step below.
                    let (hitvar_str_reader, hitvar_str_writer) = ctx.new_var();

                    ctx.emit_ado_step(format!("Restore cache: {label}"), |ctx| {
                        let dir_string = dir_string.clone().claim(ctx);
                        let key = key.claim(ctx);
                        let restore_keys = restore_keys.claim(ctx);
                        let hitvar_str_writer = hitvar_str_writer.claim(ctx);
                        |rt| {
                            let dir_string = rt.get_var(dir_string).as_raw_var_name();
                            let key = rt.get_var(key).as_raw_var_name();
                            // Omit the `restore_keys:` input entirely when the
                            // request didn't provide any.
                            let restore_keys = if let Some(restore_keys) = restore_keys {
                                format!(
                                    "restore_keys: $({})",
                                    rt.get_var(restore_keys).as_raw_var_name()
                                )
                            } else {
                                String::new()
                            };

                            let hitvar_ado =
                                AdoRuntimeVar::dangerous_from_global("FLOWEY_CACHE_HITVAR", false);
                            // note the _lack_ of $() around the var!
                            let hitvar_input =
                                format!("cacheHitVar: {}", hitvar_ado.as_raw_var_name());
                            rt.set_var(hitvar_str_writer, hitvar_ado);

                            format!(
                                r#"
                                - task: Cache@2
                                  inputs:
                                    key: '$({key})'
                                    path: $({dir_string})
                                    {restore_keys}
                                    {hitvar_input}
                            "#
                            )
                        }
                    });

                    // Translate the Cache@2 task's string-typed cacheHitVar
                    // ("true" / "false" / "inexact") into a `CacheHit`.
                    ctx.emit_rust_step("map ADO hitvar to flowey", |ctx| {
                        let label = label.clone();
                        let dir = dir.clone().claim(ctx);

                        let hitvar = hitvar.claim(ctx);
                        let hitvar_str_reader = hitvar_str_reader.claim(ctx);
                        move |rt| {
                            let dir = rt.read(dir);
                            let hitvar_str = rt.read(hitvar_str_reader);
                            let mut var = match hitvar_str.as_str() {
                                "true" => CacheHit::Hit,
                                "false" => CacheHit::Miss,
                                "inexact" => CacheHit::PartialHit,
                                other => anyhow::bail!("unexpected cacheHitVar value: {other}"),
                            };

                            // WORKAROUND: ADO is really cool software, and
                            // sometimes, when it feels like it, i'll get into
                            // an inconsistent state where it reports a cache
                            // hit, but then the cache is actually empty.
                            //
                            // Downgrade such a "hit" to a miss so downstream
                            // steps repopulate the directory themselves.
                            if matches!(var, CacheHit::Hit | CacheHit::PartialHit) {
                                if dir.read_dir()?.next().is_none() {
                                    log::error!("Detected inconsistent ADO cache entry: {label}");
                                    log::error!("Please define/cycle the `FloweyCacheGeneration` pipeline variable");
                                    var = CacheHit::Miss;
                                }
                            }

                            rt.write(hitvar, &var);
                            Ok(())
                        }
                    });

                    // Post-job sanity check: the dir about to be uploaded by
                    // the Cache@2 task's implicit save must not be empty.
                    ctx.emit_rust_step(format!("validate cache entry: {label}"), |ctx| {
                        resolve_post_job.claim(ctx);
                        let dir = dir.clone().claim(ctx);
                        move |rt| {
                            let mut dir_contents = rt.read(dir).read_dir()?.peekable();

                            if dir_contents.peek().is_none() {
                                log::error!("Detected empty cache folder for entry: {label}");
                                log::error!("This is a bug - please update the pipeline code");
                                anyhow::bail!("cache error")
                            }

                            for entry in dir_contents {
                                let entry = entry?;
                                log::debug!("uploading: {}", entry.path().display());
                            }

                            Ok(())
                        }
                    });
                }
            }
            FlowBackend::Github => {
                for Request {
                    label,
                    dir,
                    key,
                    restore_keys,
                    hitvar,
                } in requests
                {
                    // Ties the trailing validation step to the earlier steps so
                    // it is not culled.
                    let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();

                    // Pre-process the flowey vars into string-typed vars that
                    // can be passed as `actions/cache` inputs.
                    let (dir_string, key, restore_keys) = {
                        let (processed_dir, write_processed_dir) = ctx.new_var();
                        let (processed_key, write_processed_key) = ctx.new_var();
                        let (processed_keys, write_processed_keys) = if restore_keys.is_some() {
                            let (a, b) = ctx.new_var();
                            (Some(a), Some(b))
                        } else {
                            (None, None)
                        };

                        ctx.emit_rust_step("Pre-processing cache vars", |ctx| {
                            require_post_job.claim(ctx);
                            let write_processed_dir = write_processed_dir.claim(ctx);
                            let write_processed_key = write_processed_key.claim(ctx);
                            let write_processed_keys = write_processed_keys.claim(ctx);

                            let dir = dir.clone().claim(ctx);
                            let key = key.claim(ctx);
                            let restore_keys = restore_keys.claim(ctx);

                            |rt| {
                                let dir = rt.read(dir);
                                rt.write(
                                    write_processed_dir,
                                    &dir.absolute()?.display().to_string(),
                                );

                                // Suffix arch + platform onto the key to deny
                                // cross-os / cross-arch cache sharing.
                                let key = rt.read(key);
                                let key = format!("{key}-{}-{}", rt.arch(), rt.platform());
                                rt.write(write_processed_key, &key);

                                if let Some(write_processed_keys) = write_processed_keys {
                                    let restore_keys = rt.read(restore_keys.unwrap());
                                    rt.write(
                                        write_processed_keys,
                                        &format!(
                                            r#""[{}]""#,
                                            restore_keys
                                                .into_iter()
                                                .map(|s| format!(
                                                    "'{s}-{}-{}'",
                                                    rt.arch(),
                                                    rt.platform()
                                                ))
                                                .collect::<Vec<_>>()
                                                .join(", ")
                                        ),
                                    );
                                }

                                Ok(())
                            }
                        });

                        (processed_dir, processed_key, processed_keys)
                    };

                    // Carries the action's `cache-hit` output into the rust
                    // mapping step below.
                    let (hitvar_str_reader, hitvar_str_writer) = ctx.new_var();

                    let mut step = ctx
                        .emit_gh_step(format!("Restore cache: {label}"), "actions/cache@v4")
                        .with("key", key)
                        .with("path", dir_string);
                    if let Some(restore_keys) = restore_keys {
                        step = step.with("restore-keys", restore_keys);
                    }
                    step.output("cache-hit", hitvar_str_writer).finish(ctx);

                    ctx.emit_minor_rust_step("map Github cache-hit to flowey", |ctx| {
                        let hitvar = hitvar.claim(ctx);
                        let hitvar_str_reader = hitvar_str_reader.claim(ctx);
                        // TODO: How do we distinguish between a partial hit and a miss?
                        move |rt| {
                            let hitvar_str = rt.read(hitvar_str_reader);
                            // Github's cache action brilliantly only reports "false" if missing a cache key that exists,
                            // and leaves it blank if its a miss in other cases.
                            let var = match hitvar_str.as_str() {
                                "true" => CacheHit::Hit,
                                _ => CacheHit::Miss,
                            };

                            rt.write(hitvar, &var);
                        }
                    });

                    // Post-job sanity check: the dir about to be uploaded by
                    // the action's implicit save must not be empty.
                    ctx.emit_rust_step(format!("validate cache entry: {label}"), |ctx| {
                        resolve_post_job.claim(ctx);
                        let dir = dir.clone().claim(ctx);
                        move |rt| {
                            let mut dir_contents = rt.read(dir).read_dir()?.peekable();

                            if dir_contents.peek().is_none() {
                                log::error!("Detected empty cache folder for entry: {label}");
                                log::error!("This is a bug - please update the pipeline code");
                                anyhow::bail!("cache error")
                            }

                            for entry in dir_contents {
                                let entry = entry?;
                                log::debug!("uploading: {}", entry.path().display());
                            }

                            Ok(())
                        }
                    });
                }
            }
        }

        Ok(())
    }
}
517
518// _technically_, if we want to be _super_ sure we're not gonna have a hash
519// collision, we should also do a content-hash of the thing we're about to
520// cache... but this should be OK for now, given that we don't expect to have a
521// massive number of cache entries.
522fn hash_key_to_dir(key: &str) -> String {
523    let hasher = &mut rustc_hash::FxHasher::default();
524    std::hash::Hash::hash(&key, hasher);
525    let hash = std::hash::Hasher::finish(hasher);
526    format!("{:08x?}", hash)
527}