Make purge_old_snapshot_archives aware of maximum_incremental_snapshot_archives_to_retain (#19615)

This commit is contained in:
Brooks Prumo
2021-09-04 07:37:29 -05:00
committed by GitHub
parent 8352bc48db
commit 333e5a9446
2 changed files with 182 additions and 49 deletions

View File

@ -252,7 +252,11 @@ pub fn download_snapshot<'a, 'b>(
maximum_snapshots_to_retain: usize, maximum_snapshots_to_retain: usize,
progress_notify_callback: &'a mut DownloadProgressCallbackOption<'b>, progress_notify_callback: &'a mut DownloadProgressCallbackOption<'b>,
) -> Result<(), String> { ) -> Result<(), String> {
snapshot_utils::purge_old_snapshot_archives(snapshot_archives_dir, maximum_snapshots_to_retain); snapshot_utils::purge_old_snapshot_archives(
snapshot_archives_dir,
maximum_snapshots_to_retain,
snapshot_utils::DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
);
for compression in &[ for compression in &[
ArchiveFormat::TarZstd, ArchiveFormat::TarZstd,

View File

@ -52,7 +52,7 @@ const DEFAULT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0;
pub(crate) const TMP_BANK_SNAPSHOT_PREFIX: &str = "tmp-bank-snapshot-"; pub(crate) const TMP_BANK_SNAPSHOT_PREFIX: &str = "tmp-bank-snapshot-";
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-"; pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: usize = 2; pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: usize = 2;
pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: usize = 100; pub const DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN: usize = 4;
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz)$"; pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz)$";
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz)$"; pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz)$";
@ -372,7 +372,11 @@ pub fn archive_snapshot_package(
fs::rename(&archive_path, &snapshot_package.path()) fs::rename(&archive_path, &snapshot_package.path())
.map_err(|e| SnapshotError::IoWithSource(e, "archive path rename"))?; .map_err(|e| SnapshotError::IoWithSource(e, "archive path rename"))?;
purge_old_snapshot_archives(tar_dir, maximum_snapshot_archives_to_retain); purge_old_snapshot_archives(
tar_dir,
maximum_snapshot_archives_to_retain,
DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
);
timer.stop(); timer.stop();
info!( info!(
@ -1209,21 +1213,24 @@ where
incremental_snapshot_archives.into_iter().rev().next() incremental_snapshot_archives.into_iter().rev().next()
} }
pub fn purge_old_snapshot_archives<P>(snapshot_archives_dir: P, maximum_snapshots_to_retain: usize) pub fn purge_old_snapshot_archives<P>(
where snapshot_archives_dir: P,
maximum_full_snapshot_archives_to_retain: usize,
maximum_incremental_snapshot_archives_to_retain: usize,
) where
P: AsRef<Path>, P: AsRef<Path>,
{ {
info!( info!(
"Purging old snapshot archives in {}, retaining {} full snapshots", "Purging old snapshot archives in {}, retaining {} full snapshots",
snapshot_archives_dir.as_ref().display(), snapshot_archives_dir.as_ref().display(),
maximum_snapshots_to_retain maximum_full_snapshot_archives_to_retain
); );
let mut snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir); let mut snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir);
snapshot_archives.sort_unstable(); snapshot_archives.sort_unstable();
snapshot_archives.reverse(); snapshot_archives.reverse();
// Keep the oldest snapshot so we can always play the ledger from it. // Keep the oldest snapshot so we can always play the ledger from it.
snapshot_archives.pop(); snapshot_archives.pop();
let max_snaps = max(1, maximum_snapshots_to_retain); let max_snaps = max(1, maximum_full_snapshot_archives_to_retain);
for old_archive in snapshot_archives.into_iter().skip(max_snaps) { for old_archive in snapshot_archives.into_iter().skip(max_snaps) {
trace!( trace!(
"Purging old full snapshot archive: {}", "Purging old full snapshot archive: {}",
@ -1233,24 +1240,52 @@ where
.unwrap_or_else(|err| info!("Failed to remove old full snapshot archive: {}", err)); .unwrap_or_else(|err| info!("Failed to remove old full snapshot archive: {}", err));
} }
// Only keep incremental snapshots for the latest full snapshot // Purge incremental snapshots with a different base slot than the highest full snapshot slot.
// bprumo TODO issue #18639: As an option to further reduce the number of incremental // Of the incremental snapshots with the same base slot, purge the oldest ones and retain the
// snapshots, only a subset of the incremental snapshots for the lastest full snapshot could be // latest.
// kept. This could reuse maximum_snapshots_to_retain, or use a new field just for incremental //
// snapshots. // First split the incremental snapshot archives into two vectors:
// In case there are incremental snapshots but no full snapshots, make sure all the incremental // - One vector will be all the incremental snapshot archives with a _different_ base slot than
// snapshots are purged. // the highest full snapshot slot.
let last_full_snapshot_slot = // - The other vector will be all the incremental snapshot archives with the _same_ base slot
get_highest_full_snapshot_archive_slot(&snapshot_archives_dir).unwrap_or(Slot::MAX); // as the highest full snapshot slot.
//
// To find the incremental snapshot archives to retain, first sort the second vector (the
// _same_ base slot), then reverse (so highest slots are first) and skip the first
// `maximum_incremental_snapshot_archives_to_retain`.
//
// Purge all the rest.
let highest_full_snapshot_slot = get_highest_full_snapshot_archive_slot(&snapshot_archives_dir);
let mut incremental_snapshot_archives_with_same_base_slot = vec![];
let mut incremental_snapshot_archives_with_different_base_slot = vec![];
get_incremental_snapshot_archives(&snapshot_archives_dir) get_incremental_snapshot_archives(&snapshot_archives_dir)
.drain(..)
.for_each(|incremental_snapshot_archive| {
if Some(incremental_snapshot_archive.base_slot()) == highest_full_snapshot_slot {
incremental_snapshot_archives_with_same_base_slot
.push(incremental_snapshot_archive);
} else {
incremental_snapshot_archives_with_different_base_slot
.push(incremental_snapshot_archive);
}
});
incremental_snapshot_archives_with_same_base_slot.sort_unstable();
incremental_snapshot_archives_with_different_base_slot
.iter() .iter()
.filter(|archive_info| archive_info.base_slot() < last_full_snapshot_slot) .chain(
.for_each(|old_archive| { incremental_snapshot_archives_with_same_base_slot
.iter()
.rev()
.skip(maximum_incremental_snapshot_archives_to_retain),
)
.for_each(|incremental_snapshot_archive| {
trace!( trace!(
"Purging old incremental snapshot archive: {}", "Purging old incremental snapshot archive: {}",
old_archive.path().display() incremental_snapshot_archive.path().display()
); );
fs::remove_file(old_archive.path()).unwrap_or_else(|err| { fs::remove_file(incremental_snapshot_archive.path()).unwrap_or_else(|err| {
info!("Failed to remove old incremental snapshot archive: {}", err) info!("Failed to remove old incremental snapshot archive: {}", err)
}) })
}); });
@ -2241,7 +2276,8 @@ mod tests {
fn common_test_purge_old_snapshot_archives( fn common_test_purge_old_snapshot_archives(
snapshot_names: &[&String], snapshot_names: &[&String],
maximum_snapshots_to_retain: usize, maximum_full_snapshot_archives_to_retain: usize,
maximum_incremental_snapshot_archives_to_retain: usize,
expected_snapshots: &[&String], expected_snapshots: &[&String],
) { ) {
let temp_snap_dir = tempfile::TempDir::new().unwrap(); let temp_snap_dir = tempfile::TempDir::new().unwrap();
@ -2250,7 +2286,11 @@ mod tests {
let snap_path = temp_snap_dir.path().join(&snap_name); let snap_path = temp_snap_dir.path().join(&snap_name);
let mut _snap_file = File::create(snap_path); let mut _snap_file = File::create(snap_path);
} }
purge_old_snapshot_archives(temp_snap_dir.path(), maximum_snapshots_to_retain); purge_old_snapshot_archives(
temp_snap_dir.path(),
maximum_full_snapshot_archives_to_retain,
maximum_incremental_snapshot_archives_to_retain,
);
let mut retained_snaps = HashSet::new(); let mut retained_snaps = HashSet::new();
for entry in fs::read_dir(temp_snap_dir.path()).unwrap() { for entry in fs::read_dir(temp_snap_dir.path()).unwrap() {
@ -2280,14 +2320,29 @@ mod tests {
let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default()); let snap3_name = format!("snapshot-50-{}.tar.zst", Hash::default());
let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name]; let snapshot_names = vec![&snap1_name, &snap2_name, &snap3_name];
let expected_snapshots = vec![&snap1_name, &snap3_name]; let expected_snapshots = vec![&snap1_name, &snap3_name];
common_test_purge_old_snapshot_archives(&snapshot_names, 1, &expected_snapshots); common_test_purge_old_snapshot_archives(
&snapshot_names,
1,
DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
&expected_snapshots,
);
// retaining 0, the expectation is the same as for 1, as at least 1 newest is expected to be retained // retaining 0, the expectation is the same as for 1, as at least 1 newest is expected to be retained
common_test_purge_old_snapshot_archives(&snapshot_names, 0, &expected_snapshots); common_test_purge_old_snapshot_archives(
&snapshot_names,
0,
DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
&expected_snapshots,
);
// retaining 2, all three should be retained // retaining 2, all three should be retained
let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name]; let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
common_test_purge_old_snapshot_archives(&snapshot_names, 2, &expected_snapshots); common_test_purge_old_snapshot_archives(
&snapshot_names,
2,
DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
&expected_snapshots,
);
} }
/// Mimic a running node's behavior w.r.t. purging old snapshot archives. Take snapshots in a /// Mimic a running node's behavior w.r.t. purging old snapshot archives. Take snapshots in a
@ -2317,7 +2372,11 @@ mod tests {
continue; continue;
} }
purge_old_snapshot_archives(&snapshot_archives_dir, maximum_snapshots_to_retain); purge_old_snapshot_archives(
&snapshot_archives_dir,
maximum_snapshots_to_retain,
usize::MAX,
);
let mut full_snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir); let mut full_snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir);
full_snapshot_archives.sort_unstable(); full_snapshot_archives.sort_unstable();
assert_eq!( assert_eq!(
@ -2340,38 +2399,108 @@ mod tests {
#[test] #[test]
fn test_purge_old_incremental_snapshot_archives() { fn test_purge_old_incremental_snapshot_archives() {
let snapshot_archives_dir = tempfile::TempDir::new().unwrap(); let snapshot_archives_dir = tempfile::TempDir::new().unwrap();
let starting_slot = 100_000;
for snapshot_filename in [ let maximum_incremental_snapshot_archives_to_retain =
format!("snapshot-100-{}.tar", Hash::default()), DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN;
format!("snapshot-200-{}.tar", Hash::default()), let maximum_full_snapshot_archives_to_retain = DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN;
format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
format!("incremental-snapshot-100-140-{}.tar", Hash::default()), let incremental_snapshot_interval = 100;
format!("incremental-snapshot-100-160-{}.tar", Hash::default()), let num_incremental_snapshots_per_full_snapshot =
format!("incremental-snapshot-100-180-{}.tar", Hash::default()), maximum_incremental_snapshot_archives_to_retain * 2;
format!("incremental-snapshot-200-220-{}.tar", Hash::default()), let full_snapshot_interval =
format!("incremental-snapshot-200-240-{}.tar", Hash::default()), incremental_snapshot_interval * num_incremental_snapshots_per_full_snapshot;
format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
format!("incremental-snapshot-200-280-{}.tar", Hash::default()), let mut snapshot_filenames = vec![];
] { (starting_slot..)
.step_by(full_snapshot_interval)
.take(maximum_full_snapshot_archives_to_retain * 2)
.for_each(|full_snapshot_slot| {
let snapshot_filename =
format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default());
let snapshot_path = snapshot_archives_dir.path().join(&snapshot_filename); let snapshot_path = snapshot_archives_dir.path().join(&snapshot_filename);
File::create(snapshot_path).unwrap(); File::create(snapshot_path).unwrap();
} snapshot_filenames.push(snapshot_filename);
purge_old_snapshot_archives(snapshot_archives_dir.path(), std::usize::MAX); (full_snapshot_slot..)
.step_by(incremental_snapshot_interval)
.take(num_incremental_snapshots_per_full_snapshot)
.skip(1)
.for_each(|incremental_snapshot_slot| {
let snapshot_filename = format!(
"incremental-snapshot-{}-{}-{}.tar",
full_snapshot_slot,
incremental_snapshot_slot,
Hash::default()
);
let snapshot_path = snapshot_archives_dir.path().join(&snapshot_filename);
File::create(snapshot_path).unwrap();
snapshot_filenames.push(snapshot_filename);
});
});
let remaining_incremental_snapshot_archives = purge_old_snapshot_archives(
snapshot_archives_dir.path(),
maximum_full_snapshot_archives_to_retain,
maximum_incremental_snapshot_archives_to_retain,
);
// Ensure correct number of full snapshot archives are purged/retained
// NOTE: One extra full snapshot is always kept (the oldest), hence the `+1`
let mut remaining_full_snapshot_archives =
get_full_snapshot_archives(snapshot_archives_dir.path());
assert_eq!(
remaining_full_snapshot_archives.len(),
maximum_full_snapshot_archives_to_retain + 1,
);
remaining_full_snapshot_archives.sort_unstable();
// Ensure correct number of incremental snapshot archives are purged/retained
let mut remaining_incremental_snapshot_archives =
get_incremental_snapshot_archives(snapshot_archives_dir.path()); get_incremental_snapshot_archives(snapshot_archives_dir.path());
assert_eq!(remaining_incremental_snapshot_archives.len(), 4); assert_eq!(
for archive in &remaining_incremental_snapshot_archives { remaining_incremental_snapshot_archives.len(),
assert_eq!(archive.base_slot(), 200); maximum_incremental_snapshot_archives_to_retain
);
remaining_incremental_snapshot_archives.sort_unstable();
// Ensure all remaining incremental snapshots are only for the latest full snapshot
let latest_full_snapshot_archive_slot =
remaining_full_snapshot_archives.last().unwrap().slot();
for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
assert_eq!(
incremental_snapshot_archive.base_slot(),
latest_full_snapshot_archive_slot
);
} }
// Ensure the remaining incremental snapshots are at the right slot
let expected_remaing_incremental_snapshot_archive_slots =
(latest_full_snapshot_archive_slot..)
.step_by(incremental_snapshot_interval)
.take(num_incremental_snapshots_per_full_snapshot)
.skip(
num_incremental_snapshots_per_full_snapshot
- maximum_incremental_snapshot_archives_to_retain,
)
.collect::<Vec<_>>();
let actual_remaining_incremental_snapshot_archive_slots =
remaining_incremental_snapshot_archives
.iter()
.map(|snapshot| snapshot.slot())
.collect::<Vec<_>>();
assert_eq!(
actual_remaining_incremental_snapshot_archive_slots,
expected_remaing_incremental_snapshot_archive_slots
);
} }
#[test] #[test]
fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() { fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() {
let snapshot_archives_dir = tempfile::TempDir::new().unwrap(); let snapshot_archives_dir = tempfile::TempDir::new().unwrap();
for snapshot_filename in [ for snapshot_filenames in [
format!("incremental-snapshot-100-120-{}.tar", Hash::default()), format!("incremental-snapshot-100-120-{}.tar", Hash::default()),
format!("incremental-snapshot-100-140-{}.tar", Hash::default()), format!("incremental-snapshot-100-140-{}.tar", Hash::default()),
format!("incremental-snapshot-100-160-{}.tar", Hash::default()), format!("incremental-snapshot-100-160-{}.tar", Hash::default()),
@ -2381,11 +2510,11 @@ mod tests {
format!("incremental-snapshot-200-260-{}.tar", Hash::default()), format!("incremental-snapshot-200-260-{}.tar", Hash::default()),
format!("incremental-snapshot-200-280-{}.tar", Hash::default()), format!("incremental-snapshot-200-280-{}.tar", Hash::default()),
] { ] {
let snapshot_path = snapshot_archives_dir.path().join(&snapshot_filename); let snapshot_path = snapshot_archives_dir.path().join(&snapshot_filenames);
File::create(snapshot_path).unwrap(); File::create(snapshot_path).unwrap();
} }
purge_old_snapshot_archives(snapshot_archives_dir.path(), std::usize::MAX); purge_old_snapshot_archives(snapshot_archives_dir.path(), usize::MAX, usize::MAX);
let remaining_incremental_snapshot_archives = let remaining_incremental_snapshot_archives =
get_incremental_snapshot_archives(snapshot_archives_dir.path()); get_incremental_snapshot_archives(snapshot_archives_dir.path());