Add SnapshotArchiveInfo and refactor functions in snapshot_utils (#18232)

This commit is contained in:
Brooks Prumo
2021-07-01 12:20:56 -05:00
committed by GitHub
parent 5e424826ba
commit 45d54b1fc6
9 changed files with 392 additions and 134 deletions

View File

@ -38,6 +38,20 @@ use {
thiserror::Error,
};
/// Information about a snapshot archive: its path, slot, hash, and archive format
pub struct SnapshotArchiveInfo {
/// Path to the snapshot archive file
pub path: PathBuf,
/// Slot that the snapshot was made
pub slot: Slot,
/// Hash of the accounts at this slot
pub hash: Hash,
/// Archive format for the snapshot file
pub archive_format: ArchiveFormat,
}
pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
pub const MAX_SNAPSHOTS: usize = 8; // Save some snapshots but not too many
@ -47,6 +61,9 @@ const DEFAULT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0;
const TMP_SNAPSHOT_PREFIX: &str = "tmp-snapshot-";
pub const DEFAULT_MAX_SNAPSHOTS_TO_RETAIN: usize = 2;
pub const SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str =
r"^snapshot-(\d+)-([[:alnum:]]+)\.(tar|tar\.bz2|tar\.zst|tar\.gz)$";
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum SnapshotVersion {
V1_2_0,
@ -237,6 +254,7 @@ pub fn remove_tmp_snapshot_archives(snapshot_path: &Path) {
}
}
/// Make a snapshot archive out of the AccountsPackage
pub fn archive_snapshot_package(
snapshot_package: &AccountsPackage,
maximum_snapshots_to_retain: usize,
@ -593,9 +611,9 @@ fn serialize_status_cache(
Ok(())
}
/// Remove the snapshot directory for this slot
pub fn remove_snapshot<P: AsRef<Path>>(slot: Slot, snapshot_path: P) -> Result<()> {
let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot);
// Remove the snapshot directory for this slot
fs::remove_dir_all(slot_snapshot_dir)?;
Ok(())
}
@ -610,8 +628,9 @@ pub struct BankFromArchiveTimings {
// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
pub const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
/// Rebuild a bank from a snapshot archive
#[allow(clippy::too_many_arguments)]
pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
pub fn bank_from_snapshot_archive<P>(
account_paths: &[PathBuf],
frozen_account_pubkeys: &[Pubkey],
snapshot_path: &Path,
@ -625,7 +644,10 @@ pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
limit_load_slot_count_from_snapshot: Option<usize>,
shrink_ratio: AccountShrinkThreshold,
test_hash_calculation: bool,
) -> Result<(Bank, BankFromArchiveTimings)> {
) -> Result<(Bank, BankFromArchiveTimings)>
where
P: AsRef<Path> + std::marker::Sync,
{
let unpack_dir = tempfile::Builder::new()
.prefix(TMP_SNAPSHOT_PREFIX)
.tempdir_in(snapshot_path)?;
@ -684,15 +706,18 @@ pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
Ok((bank, timings))
}
pub fn get_snapshot_archive_path(
/// Build the snapshot archive path from its components: the snapshot archive output directory, the
/// snapshot slot, the accounts hash, and the archive format.
pub fn build_snapshot_archive_path(
snapshot_output_dir: PathBuf,
snapshot_hash: &(Slot, Hash),
slot: Slot,
hash: &Hash,
archive_format: ArchiveFormat,
) -> PathBuf {
snapshot_output_dir.join(format!(
"snapshot-{}-{}.{}",
snapshot_hash.0,
snapshot_hash.1,
slot,
hash,
get_archive_ext(archive_format),
))
}
@ -709,61 +734,84 @@ fn archive_format_from_str(archive_format: &str) -> Option<ArchiveFormat> {
/// Parse a snapshot archive filename into its Slot, Hash, and Archive Format
fn parse_snapshot_archive_filename(archive_filename: &str) -> Option<(Slot, Hash, ArchiveFormat)> {
let snapshot_archive_filename_regex =
Regex::new(r"^snapshot-(\d+)-([[:alnum:]]+)\.(tar|tar\.bz2|tar\.zst|tar\.gz)$");
let regex = Regex::new(SNAPSHOT_ARCHIVE_FILENAME_REGEX);
snapshot_archive_filename_regex
.ok()?
.captures(archive_filename)
.and_then(|captures| {
let slot = captures.get(1).map(|x| x.as_str().parse::<Slot>())?.ok()?;
let hash = captures.get(2).map(|x| x.as_str().parse::<Hash>())?.ok()?;
let archive_format = captures
.get(3)
.map(|x| archive_format_from_str(x.as_str()))??;
regex.ok()?.captures(archive_filename).and_then(|captures| {
let slot = captures.get(1).map(|x| x.as_str().parse::<Slot>())?.ok()?;
let hash = captures.get(2).map(|x| x.as_str().parse::<Hash>())?.ok()?;
let archive_format = captures
.get(3)
.map(|x| archive_format_from_str(x.as_str()))??;
Some((slot, hash, archive_format))
})
Some((slot, hash, archive_format))
})
}
/// Get a list of the snapshot archives in a directory, sorted by Slot in descending order
fn get_snapshot_archives<P: AsRef<Path>>(
snapshot_output_dir: P,
) -> Vec<(PathBuf, (Slot, Hash, ArchiveFormat))> {
/// Get a list of the snapshot archives in a directory
pub fn get_snapshot_archives<P>(snapshot_output_dir: P) -> Vec<SnapshotArchiveInfo>
where
P: AsRef<Path>,
{
match fs::read_dir(&snapshot_output_dir) {
Err(err) => {
info!("Unable to read snapshot directory: {}", err);
vec![]
}
Ok(files) => {
let mut archives: Vec<_> = files
.filter_map(|entry| {
if let Ok(entry) = entry {
let path = entry.path();
if path.is_file() {
if let Some(snapshot_hash) = parse_snapshot_archive_filename(
path.file_name().unwrap().to_str().unwrap(),
) {
return Some((path, snapshot_hash));
}
Ok(files) => files
.filter_map(|entry| {
if let Ok(entry) = entry {
let path = entry.path();
if path.is_file() {
if let Some((slot, hash, archive_format)) = parse_snapshot_archive_filename(
path.file_name().unwrap().to_str().unwrap(),
) {
return Some(SnapshotArchiveInfo {
path,
slot,
hash,
archive_format,
});
}
}
None
})
.collect();
archives.sort_by(|a, b| (b.1).0.cmp(&(a.1).0)); // reverse sort by slot
archives
}
}
None
})
.collect(),
}
}
/// Get the snapshot archive with the highest Slot in a directory
pub fn get_highest_snapshot_archive_path<P: AsRef<Path>>(
snapshot_output_dir: P,
) -> Option<(PathBuf, (Slot, Hash, ArchiveFormat))> {
let archives = get_snapshot_archives(snapshot_output_dir);
archives.into_iter().next()
/// Get a sorted list of the snapshot archives in a directory
fn get_sorted_snapshot_archives<P>(snapshot_output_dir: P) -> Vec<SnapshotArchiveInfo>
where
P: AsRef<Path>,
{
let mut snapshot_archives = get_snapshot_archives(snapshot_output_dir);
sort_snapshot_archives(&mut snapshot_archives);
snapshot_archives
}
/// Sort the list of snapshot archives by slot, in descending order
fn sort_snapshot_archives(snapshot_archives: &mut Vec<SnapshotArchiveInfo>) {
snapshot_archives.sort_unstable_by(|a, b| b.slot.cmp(&a.slot));
}
/// Get the highest slot of the snapshots in a directory
pub fn get_highest_snapshot_archive_slot<P>(snapshot_output_dir: P) -> Option<Slot>
where
P: AsRef<Path>,
{
get_highest_snapshot_archive_info(snapshot_output_dir)
.map(|snapshot_archive_info| snapshot_archive_info.slot)
}
/// Get the path (and metadata) for the snapshot archive with the highest slot in a directory
pub fn get_highest_snapshot_archive_info<P>(snapshot_output_dir: P) -> Option<SnapshotArchiveInfo>
where
P: AsRef<Path>,
{
get_sorted_snapshot_archives(snapshot_output_dir)
.into_iter()
.next()
}
pub fn purge_old_snapshot_archives<P: AsRef<Path>>(
@ -775,12 +823,12 @@ pub fn purge_old_snapshot_archives<P: AsRef<Path>>(
snapshot_output_dir.as_ref(),
maximum_snapshots_to_retain
);
let mut archives = get_snapshot_archives(snapshot_output_dir);
let mut archives = get_sorted_snapshot_archives(snapshot_output_dir);
// Keep the oldest snapshot so we can always play the ledger from it.
archives.pop();
let max_snaps = max(1, maximum_snapshots_to_retain);
for old_archive in archives.into_iter().skip(max_snaps) {
fs::remove_file(old_archive.0)
fs::remove_file(old_archive.path)
.unwrap_or_else(|err| info!("Failed to remove old snapshot: {:}", err));
}
}
@ -993,7 +1041,7 @@ pub fn purge_old_snapshots(snapshot_path: &Path) {
}
}
// Gather the necessary elements for a snapshot of the given `root_bank`
/// Gather the necessary elements for a snapshot of the given `root_bank`
pub fn snapshot_bank(
root_bank: &Bank,
status_cache_slot_deltas: Vec<BankSlotDelta>,
@ -1035,6 +1083,9 @@ pub fn snapshot_bank(
/// Convenience function to create a snapshot archive out of any Bank, regardless of state. The
/// Bank will be frozen during the process.
///
/// Requires:
/// - `bank` is complete
pub fn bank_to_snapshot_archive<P: AsRef<Path>, Q: AsRef<Path>>(
snapshot_path: P,
bank: &Bank,
@ -1055,7 +1106,7 @@ pub fn bank_to_snapshot_archive<P: AsRef<Path>, Q: AsRef<Path>>(
let temp_dir = tempfile::tempdir_in(snapshot_path)?;
let storages: Vec<_> = bank.get_snapshot_storages();
let storages = bank.get_snapshot_storages();
let slot_snapshot_paths = add_snapshot(&temp_dir, bank, &storages, snapshot_version)?;
let package = package_snapshot(
bank,
@ -1104,9 +1155,10 @@ pub fn process_accounts_package_pre(
("calculate_hash", time.as_us(), i64),
);
let tar_output_file = get_snapshot_archive_path(
let tar_output_file = build_snapshot_archive_path(
accounts_package.snapshot_output_dir,
&(accounts_package.slot, hash),
accounts_package.slot,
&hash,
accounts_package.archive_format,
);
@ -1128,6 +1180,10 @@ mod tests {
use super::*;
use assert_matches::assert_matches;
use bincode::{deserialize_from, serialize_into};
use solana_sdk::{
genesis_config::create_genesis_config,
signature::{Keypair, Signer},
};
use std::mem::size_of;
#[test]
@ -1246,47 +1302,104 @@ mod tests {
Some((43, Hash::default(), ArchiveFormat::TarZstd))
);
assert_eq!(
parse_snapshot_archive_filename(&format!("snapshot-42-{}.tar", Hash::default())),
Some((42, Hash::default(), ArchiveFormat::Tar))
parse_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default())),
Some((44, Hash::default(), ArchiveFormat::Tar))
);
assert!(parse_snapshot_archive_filename("invalid").is_none());
assert!(parse_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_none());
assert!(parse_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_none());
assert!(parse_snapshot_archive_filename("snapshot-12345678-HASH1234.bad!ext").is_none());
assert!(parse_snapshot_archive_filename(&format!(
"snapshot-12345678-{}.bad!ext",
Hash::new_unique()
))
.is_none());
assert!(parse_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_none());
assert!(parse_snapshot_archive_filename("snapshot-bad!slot-HASH1234.bad!ext").is_none());
assert!(parse_snapshot_archive_filename("snapshot-12345678-HASH1234.bad!ext").is_none());
assert!(parse_snapshot_archive_filename("snapshot-bad!slot-HASH1234.tar").is_none());
assert!(parse_snapshot_archive_filename(&format!(
"snapshot-bad!slot-{}.bad!ext",
Hash::new_unique()
))
.is_none());
assert!(parse_snapshot_archive_filename(&format!(
"snapshot-12345678-{}.bad!ext",
Hash::new_unique()
))
.is_none());
assert!(parse_snapshot_archive_filename(&format!(
"snapshot-bad!slot-{}.tar",
Hash::new_unique()
))
.is_none());
assert!(parse_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_none());
assert!(parse_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_none());
assert!(parse_snapshot_archive_filename("snapshot-bad!slot-HASH1234.tar").is_none());
assert!(parse_snapshot_archive_filename(&format!(
"snapshot-bad!slot-{}.tar",
Hash::new_unique()
))
.is_none());
}
#[test]
fn test_get_snapshot_archives() {
let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
let min_slot = 123;
let max_slot = 456;
for slot in min_slot..max_slot {
let snapshot_filename = format!("snapshot-{}-{}.tar", slot, Hash::default());
let snapshot_filepath = temp_snapshot_archives_dir.path().join(snapshot_filename);
/// A test helper function that creates snapshot archive files. Creates snapshot files in the
/// range (`min_snapshot_slot`, `max_snapshot_slot`]. Additionally, "bad" files are created
/// for snapshots to ensure the tests properly filter them out.
fn common_create_snapshot_archive_files(
snapshot_dir: &Path,
min_snapshot_slot: Slot,
max_snapshot_slot: Slot,
) {
for snapshot_slot in min_snapshot_slot..max_snapshot_slot {
let snapshot_filename = format!("snapshot-{}-{}.tar", snapshot_slot, Hash::default());
let snapshot_filepath = snapshot_dir.join(snapshot_filename);
File::create(snapshot_filepath).unwrap();
}
// Add in a snapshot with a bad filename and high slot to ensure filename are filtered and
// sorted correctly
let bad_filename = format!("snapshot-{}-{}.bad!ext", max_slot + 1, Hash::default());
let bad_filepath = temp_snapshot_archives_dir.path().join(bad_filename);
let bad_filename = format!("snapshot-{}-bad!hash.tar", max_snapshot_slot + 1);
let bad_filepath = snapshot_dir.join(bad_filename);
File::create(bad_filepath).unwrap();
}
let results = get_snapshot_archives(temp_snapshot_archives_dir);
assert_eq!(results.len(), max_slot - min_slot);
assert_eq!(results[0].1 .0 as usize, max_slot - 1);
#[test]
fn test_get_snapshot_archives() {
solana_logger::setup();
let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
let min_slot = 123;
let max_slot = 456;
common_create_snapshot_archive_files(temp_snapshot_archives_dir.path(), min_slot, max_slot);
let snapshot_archives = get_snapshot_archives(temp_snapshot_archives_dir);
assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
}
#[test]
fn test_get_sorted_snapshot_archives() {
solana_logger::setup();
let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
let min_slot = 12;
let max_slot = 45;
common_create_snapshot_archive_files(temp_snapshot_archives_dir.path(), min_slot, max_slot);
let sorted_snapshot_archives = get_sorted_snapshot_archives(temp_snapshot_archives_dir);
assert_eq!(sorted_snapshot_archives.len() as Slot, max_slot - min_slot);
assert_eq!(sorted_snapshot_archives[0].slot, max_slot - 1);
}
#[test]
fn test_get_highest_snapshot_archive_slot() {
solana_logger::setup();
let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
let min_slot = 123;
let max_slot = 456;
common_create_snapshot_archive_files(temp_snapshot_archives_dir.path(), min_slot, max_slot);
assert_eq!(
get_highest_snapshot_archive_slot(temp_snapshot_archives_dir.path()),
Some(max_slot - 1)
);
}
fn common_test_purge_old_snapshot_archives(
@ -1339,4 +1452,140 @@ mod tests {
let expected_snapshots = vec![&snap1_name, &snap2_name, &snap3_name];
common_test_purge_old_snapshot_archives(&snapshot_names, 2, &expected_snapshots);
}
/// Test roundtrip of bank to snapshot, then back again. This test creates the simplest bank
/// possible, so the contents of the snapshot archive will be quite minimal.
#[test]
fn test_roundtrip_bank_to_snapshot_to_bank_simple() {
solana_logger::setup();
let genesis_config = GenesisConfig::default();
let original_bank = Bank::new(&genesis_config);
while !original_bank.is_complete() {
original_bank.register_tick(&Hash::new_unique());
}
let accounts_dir = tempfile::TempDir::new().unwrap();
let snapshot_dir = tempfile::TempDir::new().unwrap();
let snapshot_package_output_dir = tempfile::TempDir::new().unwrap();
let snapshot_archive_format = ArchiveFormat::Tar;
let snapshot_archive_path = bank_to_snapshot_archive(
snapshot_dir.path(),
&original_bank,
None,
snapshot_package_output_dir.path(),
snapshot_archive_format,
None,
1,
)
.unwrap();
let (roundtrip_bank, _) = bank_from_snapshot_archive(
&[PathBuf::from(accounts_dir.path())],
&[],
snapshot_dir.path(),
&snapshot_archive_path,
snapshot_archive_format,
&genesis_config,
None,
None,
AccountSecondaryIndexes::default(),
false,
None,
AccountShrinkThreshold::default(),
false,
)
.unwrap();
assert_eq!(original_bank, roundtrip_bank);
}
/// Test roundtrip of bank to snapshot, then back again. This test is more involved than the
/// simple version above; creating multiple banks over multiple slots and doing multiple
/// transfers. So this snapshot should contain more data.
#[test]
fn test_roundtrip_bank_to_snapshot_to_bank_complex() {
solana_logger::setup();
let collector = Pubkey::new_unique();
let key1 = Keypair::new();
let key2 = Keypair::new();
let key3 = Keypair::new();
let key4 = Keypair::new();
let key5 = Keypair::new();
let (genesis_config, mint_keypair) = create_genesis_config(1_000_000);
let bank0 = Arc::new(Bank::new(&genesis_config));
bank0.transfer(1, &mint_keypair, &key1.pubkey()).unwrap();
bank0.transfer(2, &mint_keypair, &key2.pubkey()).unwrap();
bank0.transfer(3, &mint_keypair, &key3.pubkey()).unwrap();
while !bank0.is_complete() {
bank0.register_tick(&Hash::new_unique());
}
let slot = 1;
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &collector, slot));
bank1.transfer(3, &mint_keypair, &key3.pubkey()).unwrap();
bank1.transfer(4, &mint_keypair, &key4.pubkey()).unwrap();
bank1.transfer(5, &mint_keypair, &key5.pubkey()).unwrap();
while !bank1.is_complete() {
bank1.register_tick(&Hash::new_unique());
}
let slot = slot + 1;
let bank2 = Arc::new(Bank::new_from_parent(&bank1, &collector, slot));
bank2.transfer(1, &mint_keypair, &key1.pubkey()).unwrap();
while !bank2.is_complete() {
bank2.register_tick(&Hash::new_unique());
}
let slot = slot + 1;
let bank3 = Arc::new(Bank::new_from_parent(&bank2, &collector, slot));
bank3.transfer(1, &mint_keypair, &key1.pubkey()).unwrap();
while !bank3.is_complete() {
bank3.register_tick(&Hash::new_unique());
}
let slot = slot + 1;
let bank4 = Arc::new(Bank::new_from_parent(&bank3, &collector, slot));
bank4.transfer(1, &mint_keypair, &key1.pubkey()).unwrap();
while !bank4.is_complete() {
bank4.register_tick(&Hash::new_unique());
}
let accounts_dir = tempfile::TempDir::new().unwrap();
let snapshot_dir = tempfile::TempDir::new().unwrap();
let snapshot_package_output_dir = tempfile::TempDir::new().unwrap();
let snapshot_archive_format = ArchiveFormat::Tar;
let full_snapshot_archive_path = bank_to_snapshot_archive(
snapshot_dir.path(),
&bank4,
None,
snapshot_package_output_dir.path(),
snapshot_archive_format,
None,
std::usize::MAX,
)
.unwrap();
let (roundtrip_bank, _) = bank_from_snapshot_archive(
&[PathBuf::from(accounts_dir.path())],
&[],
snapshot_dir.path(),
&full_snapshot_archive_path,
snapshot_archive_format,
&genesis_config,
None,
None,
AccountSecondaryIndexes::default(),
false,
None,
AccountShrinkThreshold::default(),
false,
)
.unwrap();
assert_eq!(*bank4, roundtrip_bank);
}
}