flowey_lib_common/cache.rs
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Cache the contents of a particular directory between runs.
//!
//! The contents of the provided `dir` will be saved at the end of a run, using
//! the user-defined `key` string to tag the contents of the cache.
//!
//! Subsequent runs will then use the `key` to restore the contents of the
//! directory.
//!
//! # A note on file sizes
//!
//! This node is backed by the in-box Cache@2 Task on ADO, and the in-box
//! actions/cache@v4 Action on GitHub Actions.
//!
//! These actions have limits on the size of data they can cache at any given
//! time, and potentially have issues with particularly large artifacts (e.g:
//! gigabytes in size).
//!
//! In cases where you intend to cache large files, it is recommended to
//! implement caching functionality directly using [`NodeCtx::persistent_dir`],
//! which is guaranteed to be reliable (when running on a system where such
//! persistent storage is available).
//!
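//! A minimal sketch of that approach (hypothetical node body; assumes `ctx`
//! is a `NodeCtx`, and that the emitted step itself decides how to populate
//! and reuse the directory):
//!
//! ```ignore
//! if let Some(persistent_dir) = ctx.persistent_dir() {
//!     ctx.emit_rust_step("use persistent storage directly", |ctx| {
//!         let persistent_dir = persistent_dir.claim(ctx);
//!         |rt| {
//!             // contents of this dir survive between runs, with no size
//!             // limits beyond the disk itself
//!             let dir = rt.read(persistent_dir);
//!             // ... populate / reuse `dir` as needed ...
//!             Ok(())
//!         }
//!     });
//! }
//! ```
//!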
//! # Clearing the cache
//!
//! Clearing the cache is done in different ways depending on the backend:
//!
//! - Local: just delete the cache folder on your machine
//! - GitHub: use the cache action's web UI to manage cache entries
//! - ADO: define a pipeline-level variable called `FloweyCacheGeneration`, and
//!   set it to a new arbitrary value.
//!     - This is because ADO doesn't have a native way to flush the cache
//!       outside of updating the cache key in the YAML file itself.
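//!
//! # Example
//!
//! A minimal sketch of requesting this node from another node (hypothetical
//! caller; `my_dir` and `my_key` are assumed to be pre-existing flowey vars):
//!
//! ```ignore
//! let (hitvar, write_hitvar) = ctx.new_var();
//! ctx.req(flowey_lib_common::cache::Request {
//!     label: "my-cache".into(),
//!     dir: my_dir,          // ReadVar<PathBuf>
//!     key: my_key,          // ReadVar<String>
//!     restore_keys: None,
//!     hitvar: write_hitvar, // WriteVar<CacheHit>
//! });
//! ```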

use flowey::node::prelude::*;
use std::collections::BTreeSet;
use std::io::Seek;
use std::io::Write;

/// Status of a cache directory.
#[derive(Debug, Serialize, Deserialize)]
pub enum CacheHit {
    /// Complete miss - cache is empty.
    Miss,
    /// Direct hit - cache is perfectly restored.
    Hit,
    /// Partial hit - cache was partially restored.
    PartialHit,
}

flowey_request! {
    pub struct Request {
        /// Friendly label for the directory being cached
        pub label: String,
        /// Absolute path to the directory that will be cached between runs
        pub dir: ReadVar<PathBuf>,
        /// The key created when saving a cache and the key used to search for a
        /// cache.
        pub key: ReadVar<String>,
        /// An optional set of alternative restore keys.
        ///
        /// If no cache hit occurs for `key`, these restore keys are used
        /// sequentially in the order provided to find and restore a cache.
        pub restore_keys: Option<ReadVar<Vec<String>>>,
        /// Variable to write the result of trying to restore the cache.
        pub hitvar: WriteVar<CacheHit>,
    }
}

new_flow_node!(struct Node);

impl FlowNode for Node {
    type Request = Request;

    fn imports(_ctx: &mut ImportCtx<'_>) {}

    fn emit(requests: Vec<Self::Request>, ctx: &mut NodeCtx<'_>) -> anyhow::Result<()> {
        // -- end of req processing -- //

        match ctx.backend() {
            FlowBackend::Local => {
                if !ctx.supports_persistent_dir() {
                    ctx.emit_minor_rust_step("Reporting cache misses", |ctx| {
                        let hitvars = requests
                            .into_iter()
                            .map(|v| v.hitvar.claim(ctx))
                            .collect::<Vec<_>>();

                        |rt| {
                            rt.write_all(hitvars, &CacheHit::Miss);
                        }
                    });

                    return Ok(());
                }

                for Request {
                    label,
                    dir,
                    key,
                    restore_keys,
                    hitvar,
                } in requests
                {
                    // work around a bug in how post-job nodes affect stage1
                    // dep culling...
                    let persistent_dir = ctx.persistent_dir().unwrap();

                    // Needed for saving the cache result.
                    let (hitvar_reader, hitvar2) = ctx.new_var();

                    let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();

                    ctx.emit_rust_step(format!("Restore cache: {label}"), |ctx| {
                        require_post_job.claim(ctx);
                        let persistent_dir = persistent_dir.clone().claim(ctx);
                        let dir = dir.clone().claim(ctx);
                        let key = key.clone().claim(ctx);
                        let restore_keys = restore_keys.claim(ctx);
                        let hitvar = hitvar.claim(ctx);
                        let hitvar2 = hitvar2.claim(ctx);
                        |rt| {
                            let persistent_dir = rt.read(persistent_dir);
                            let dir = rt.read(dir);
                            let key = rt.read(key);
                            let restore_keys = rt.read(restore_keys);

                            let set_hitvar = move |val| {
                                log::info!("cache status: {val:?}");
                                rt.write(hitvar, &val);
                                rt.write(hitvar2, &val);
                            };

                            // figure out what cache entries are available to us
                            //
                            // (reading this entire file into memory seems fine at
                            // this juncture, given the sort of datasets we're
                            // working with)
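                            //
                            // format: one cache key per line; the save step
                            // below appends a line for each new entry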
                            let available_keys: BTreeSet<String> = if let Ok(s) =
                                fs_err::read_to_string(persistent_dir.join("cache_keys"))
                            {
                                s.split('\n').map(|s| s.trim().to_owned()).collect()
                            } else {
                                BTreeSet::new()
                            };

                            // using the keys the user provided, check if there's
                            // a match. the primary key is checked first, so a
                            // match on it counts as a direct hit, while a match
                            // on any restore key is only a partial hit.
                            let mut existing_cache_dir = None;
                            for (idx, key) in Some(key)
                                .into_iter()
                                .chain(restore_keys.into_iter().flatten())
                                .enumerate()
                            {
                                if available_keys.contains(&key) {
                                    existing_cache_dir = Some((idx == 0, hash_key_to_dir(&key)));
                                    break;
                                }
                            }

                            let Some((direct_hit, existing_cache_dir)) = existing_cache_dir else {
                                set_hitvar(CacheHit::Miss);
                                return Ok(());
                            };

                            crate::_util::copy_dir_all(
                                persistent_dir.join(existing_cache_dir),
                                dir,
                            )
                            .context("while restoring cache")?;

                            set_hitvar(if direct_hit {
                                CacheHit::Hit
                            } else {
                                CacheHit::PartialHit
                            });

                            Ok(())
                        }
                    });

                    ctx.emit_rust_step(format!("Saving cache: {label}"), |ctx| {
                        resolve_post_job.claim(ctx);
                        let hitvar_reader = hitvar_reader.claim(ctx);
                        let persistent_dir = persistent_dir.clone().claim(ctx);
                        let dir = dir.claim(ctx);
                        let key = key.claim(ctx);
                        move |rt| {
                            let persistent_dir = rt.read(persistent_dir);
                            let dir = rt.read(dir);
                            let key = rt.read(key);
                            let hitvar_reader = rt.read(hitvar_reader);

                            let mut cache_keys_file = fs_err::OpenOptions::new()
                                .append(true)
                                .create(true)
                                .read(true)
                                .open(persistent_dir.join("cache_keys"))?;

                            if matches!(hitvar_reader, CacheHit::Hit) {
                                // no need to update the cache
                                log::info!("was direct hit - no updates needed");
                                return Ok(());
                            }

                            // otherwise, need to update the cache
                            crate::_util::copy_dir_all(
                                dir,
                                persistent_dir.join(hash_key_to_dir(&key)),
                            )?;

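                            // register the key in the `cache_keys` file so
                            // future runs can discover this entry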
                            cache_keys_file.seek(std::io::SeekFrom::End(0))?;
                            writeln!(cache_keys_file, "{}", key)?;

                            log::info!("cache saved");

                            Ok(())
                        }
                    });
                }
            }
            FlowBackend::Ado => {
                for Request {
                    label,
                    dir,
                    key,
                    restore_keys,
                    hitvar,
                } in requests
                {
                    let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();

                    let (dir_string, key, restore_keys) = {
                        let (processed_dir, write_processed_dir) = ctx.new_var();
                        let (processed_key, write_processed_key) = ctx.new_var();
                        let (processed_keys, write_processed_keys) = if restore_keys.is_some() {
                            let (a, b) = ctx.new_var();
                            (Some(a), Some(b))
                        } else {
                            (None, None)
                        };

                        ctx.emit_rust_step("Pre-processing cache vars", |ctx| {
                            require_post_job.claim(ctx);
                            let write_processed_dir = write_processed_dir.claim(ctx);
                            let write_processed_key = write_processed_key.claim(ctx);
                            let write_processed_keys = write_processed_keys.claim(ctx);

                            let dir = dir.clone().claim(ctx);
                            let key = key.claim(ctx);
                            let restore_keys = restore_keys.claim(ctx);

                            |rt| {
                                let dir = rt.read(dir);
                                // while we're here, convert dir into a String,
                                // so it can be stuffed into an ADO var
                                rt.write(
                                    write_processed_dir,
                                    &dir.absolute()?.display().to_string(),
                                );

                                // Inject `$(FloweyCacheGeneration)` as part of the
                                // cache key to provide a non-intrusive mechanism to
                                // cycle the ADO cache when it gets into an
                                // inconsistent state.
                                //
                                // Deny cross-os caching by default (matching how
                                // GitHub CI behaves by default).
                                //
                                // FUTURE: add toggle to request to allow cross-os
                                // caching?
                                let inject_extras = |s| {
                                    format!(r#""$(FloweyCacheGeneration)" | "$(Agent.OS)" | "{s}""#)
                                };
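                                // e.g: a key of `my-key` becomes:
                                //   "$(FloweyCacheGeneration)" | "$(Agent.OS)" | "my-key"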

                                let key = rt.read(key);
                                rt.write(write_processed_key, &inject_extras(key));

                                if let Some(write_processed_keys) = write_processed_keys {
                                    let restore_keys = rt.read(restore_keys.unwrap());
                                    rt.write(
                                        write_processed_keys,
                                        &restore_keys
                                            .into_iter()
                                            .map(inject_extras)
                                            .collect::<Vec<_>>()
                                            .join("\n"),
                                    );
                                }

                                Ok(())
                            }
                        });

                        (processed_dir, processed_key, processed_keys)
                    };

                    let (hitvar_str_reader, hitvar_str_writer) = ctx.new_var();

                    ctx.emit_ado_step(format!("Restore cache: {label}"), |ctx| {
                        let dir_string = dir_string.clone().claim(ctx);
                        let key = key.claim(ctx);
                        let restore_keys = restore_keys.claim(ctx);
                        let hitvar_str_writer = hitvar_str_writer.claim(ctx);
                        |rt| {
                            let dir_string = rt.get_var(dir_string).as_raw_var_name();
                            let key = rt.get_var(key).as_raw_var_name();
                            let restore_keys = if let Some(restore_keys) = restore_keys {
                                format!(
                                    "restoreKeys: $({})",
                                    rt.get_var(restore_keys).as_raw_var_name()
                                )
                            } else {
                                String::new()
                            };

                            let hitvar_ado =
                                AdoRuntimeVar::dangerous_from_global("FLOWEY_CACHE_HITVAR", false);
                            // note the _lack_ of $() around the var!
                            let hitvar_input =
                                format!("cacheHitVar: {}", hitvar_ado.as_raw_var_name());
                            rt.set_var(hitvar_str_writer, hitvar_ado);

                            format!(
                                r#"
                                - task: Cache@2
                                  inputs:
                                    key: '$({key})'
                                    path: $({dir_string})
                                    {restore_keys}
                                    {hitvar_input}
                                "#
                            )
                        }
                    });

                    ctx.emit_rust_step("map ADO hitvar to flowey", |ctx| {
                        let label = label.clone();
                        let dir = dir.clone().claim(ctx);

                        let hitvar = hitvar.claim(ctx);
                        let hitvar_str_reader = hitvar_str_reader.claim(ctx);
                        move |rt| {
                            let dir = rt.read(dir);
                            let hitvar_str = rt.read(hitvar_str_reader);
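                            // Cache@2 reports status via the configured
                            // `cacheHitVar`: "true" on an exact key hit,
                            // "inexact" on a restore-key hit, and "false"
                            // on a miss.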
                            let mut var = match hitvar_str.as_str() {
                                "true" => CacheHit::Hit,
                                "false" => CacheHit::Miss,
                                "inexact" => CacheHit::PartialHit,
                                other => anyhow::bail!("unexpected cacheHitVar value: {other}"),
                            };

                            // WORKAROUND: ADO is really cool software, and
                            // sometimes, when it feels like it, it'll get into
                            // an inconsistent state where it reports a cache
                            // hit, but then the cache is actually empty.
                            if matches!(var, CacheHit::Hit | CacheHit::PartialHit) {
                                if dir.read_dir()?.next().is_none() {
                                    log::error!("Detected inconsistent ADO cache entry: {label}");
                                    log::error!("Please define/cycle the `FloweyCacheGeneration` pipeline variable");
                                    var = CacheHit::Miss;
                                }
                            }

                            rt.write(hitvar, &var);
                            Ok(())
                        }
                    });

                    ctx.emit_rust_step(format!("validate cache entry: {label}"), |ctx| {
                        resolve_post_job.claim(ctx);
                        let dir = dir.clone().claim(ctx);
                        move |rt| {
                            let mut dir_contents = rt.read(dir).read_dir()?.peekable();

                            if dir_contents.peek().is_none() {
                                log::error!("Detected empty cache folder for entry: {label}");
                                log::error!("This is a bug - please update the pipeline code");
                                anyhow::bail!("cache error")
                            }

                            for entry in dir_contents {
                                let entry = entry?;
                                log::debug!("uploading: {}", entry.path().display());
                            }

                            Ok(())
                        }
                    });
                }
            }
            FlowBackend::Github => {
                for Request {
                    label,
                    dir,
                    key,
                    restore_keys,
                    hitvar,
                } in requests
                {
                    let (resolve_post_job, require_post_job) = ctx.new_post_job_side_effect();

                    let (dir_string, key, restore_keys) = {
                        let (processed_dir, write_processed_dir) = ctx.new_var();
                        let (processed_key, write_processed_key) = ctx.new_var();
                        let (processed_keys, write_processed_keys) = if restore_keys.is_some() {
                            let (a, b) = ctx.new_var();
                            (Some(a), Some(b))
                        } else {
                            (None, None)
                        };

                        ctx.emit_rust_step("Pre-processing cache vars", |ctx| {
                            require_post_job.claim(ctx);
                            let write_processed_dir = write_processed_dir.claim(ctx);
                            let write_processed_key = write_processed_key.claim(ctx);
                            let write_processed_keys = write_processed_keys.claim(ctx);

                            let dir = dir.clone().claim(ctx);
                            let key = key.claim(ctx);
                            let restore_keys = restore_keys.claim(ctx);

                            |rt| {
                                let dir = rt.read(dir);
                                rt.write(
                                    write_processed_dir,
                                    &dir.absolute()?.display().to_string(),
                                );

                                let key = rt.read(key);
                                let key = format!("{key}-{}-{}", rt.arch(), rt.platform());
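                                // scope the key to arch + platform, denying
                                // cross-os/arch cache sharing by default
                                // (yields `{key}-{arch}-{platform}`)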
                                rt.write(write_processed_key, &key);

                                if let Some(write_processed_keys) = write_processed_keys {
                                    let restore_keys = rt.read(restore_keys.unwrap());
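                                    // serialize the restore keys as a YAML-style
                                    // list, with each key scoped the same way as
                                    // the primary key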
                                    rt.write(
                                        write_processed_keys,
                                        &format!(
                                            r#""[{}]""#,
                                            restore_keys
                                                .into_iter()
                                                .map(|s| format!(
                                                    "'{s}-{}-{}'",
                                                    rt.arch(),
                                                    rt.platform()
                                                ))
                                                .collect::<Vec<_>>()
                                                .join(", ")
                                        ),
                                    );
                                }

                                Ok(())
                            }
                        });

                        (processed_dir, processed_key, processed_keys)
                    };

                    let (hitvar_str_reader, hitvar_str_writer) = ctx.new_var();

                    let mut step = ctx
                        .emit_gh_step(format!("Restore cache: {label}"), "actions/cache@v4")
                        .with("key", key)
                        .with("path", dir_string);
                    if let Some(restore_keys) = restore_keys {
                        step = step.with("restore-keys", restore_keys);
                    }
                    step.output("cache-hit", hitvar_str_writer).finish(ctx);

                    ctx.emit_minor_rust_step("map Github cache-hit to flowey", |ctx| {
                        let hitvar = hitvar.claim(ctx);
                        let hitvar_str_reader = hitvar_str_reader.claim(ctx);
                        // TODO: How do we distinguish between a partial hit and a miss?
                        move |rt| {
                            let hitvar_str = rt.read(hitvar_str_reader);
                            // Github's cache action brilliantly reports "false"
                            // only when it misses on a cache key that exists,
                            // and leaves the output blank for other misses.
                            let var = match hitvar_str.as_str() {
                                "true" => CacheHit::Hit,
                                _ => CacheHit::Miss,
                            };

                            rt.write(hitvar, &var);
                        }
                    });

                    ctx.emit_rust_step(format!("validate cache entry: {label}"), |ctx| {
                        resolve_post_job.claim(ctx);
                        let dir = dir.clone().claim(ctx);
                        move |rt| {
                            let mut dir_contents = rt.read(dir).read_dir()?.peekable();

                            if dir_contents.peek().is_none() {
                                log::error!("Detected empty cache folder for entry: {label}");
                                log::error!("This is a bug - please update the pipeline code");
                                anyhow::bail!("cache error")
                            }

                            for entry in dir_contents {
                                let entry = entry?;
                                log::debug!("uploading: {}", entry.path().display());
                            }

                            Ok(())
                        }
                    });
                }
            }
        }

        Ok(())
    }
}

// _technically_, if we want to be _super_ sure we're not gonna have a hash
// collision, we should also do a content-hash of the thing we're about to
// cache... but this should be OK for now, given that we don't expect to have a
// massive number of cache entries.
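/// Map a cache key to the name of its subdirectory under the persistent
/// cache dir (the key's `FxHash`, formatted as hex).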
fn hash_key_to_dir(key: &str) -> String {
    let hasher = &mut rustc_hash::FxHasher::default();
    std::hash::Hash::hash(&key, hasher);
    let hash = std::hash::Hasher::finish(hasher);
    format!("{:08x?}", hash)
}
527}