Various snapshot-related code clean up (bp #14487) (#14513)

* Create account paths once

(cherry picked from commit fe0ba4a429)

* Replace incorrect symlink_dir usage with symlink_file

(cherry picked from commit f2a7f561a0)

* Reduce TempDir exposure

(cherry picked from commit 9f70f7dc3e)

* Rename AccountsPackage::root to AccountsPackage::slot

(cherry picked from commit 141e6706e6)

* Rename CompressionType to ArchiveFormat

(cherry picked from commit 7be6770808)

Co-authored-by: Michael Vines <mvines@gmail.com>
This commit is contained in:
mergify[bot]
2021-01-09 18:29:09 +00:00
committed by GitHub
parent ae73cc8d05
commit 484bd48b35
14 changed files with 129 additions and 128 deletions

View File

@@ -1,7 +1,7 @@
use crate::{
accounts_index::AccountIndex,
bank::{Bank, BankSlotDelta, Builtins},
bank_forks::CompressionType,
bank_forks::ArchiveFormat,
hardened_unpack::{unpack_snapshot, UnpackError},
serde_snapshot::{
bank_from_stream, bank_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages,
@@ -27,7 +27,6 @@ use std::{
str::FromStr,
};
use tar::Archive;
use tempfile::TempDir;
use thiserror::Error;
pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
@@ -159,7 +158,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
status_cache_slot_deltas: Vec<BankSlotDelta>,
snapshot_package_output_path: P,
snapshot_storages: SnapshotStorages,
compression: CompressionType,
archive_format: ArchiveFormat,
snapshot_version: SnapshotVersion,
) -> Result<AccountsPackage> {
// Hard link all the snapshots we need for this package
@@ -181,7 +180,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
let snapshot_package_output_file = get_snapshot_archive_path(
&snapshot_package_output_path,
&(bank.slot(), bank.get_accounts_hash()),
&compression,
&archive_format,
);
let package = AccountsPackage::new(
@@ -192,19 +191,19 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
snapshot_storages,
snapshot_package_output_file,
bank.get_accounts_hash(),
compression,
archive_format,
snapshot_version,
);
Ok(package)
}
fn get_compression_ext(compression: &CompressionType) -> &'static str {
match compression {
CompressionType::Bzip2 => ".tar.bz2",
CompressionType::Gzip => ".tar.gz",
CompressionType::Zstd => ".tar.zst",
CompressionType::NoCompression => ".tar",
fn get_archive_ext(archive_format: &ArchiveFormat) -> &'static str {
match archive_format {
ArchiveFormat::TarBzip2 => ".tar.bz2",
ArchiveFormat::TarGzip => ".tar.gz",
ArchiveFormat::TarZstd => ".tar.zst",
ArchiveFormat::Tar => ".tar",
}
}
@@ -230,13 +229,16 @@ pub fn remove_tmp_snapshot_archives(snapshot_path: &Path) {
pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()> {
info!(
"Generating snapshot archive for slot {}",
snapshot_package.root
snapshot_package.slot
);
serialize_status_cache(
snapshot_package.root,
snapshot_package.slot,
&snapshot_package.slot_deltas,
&snapshot_package.snapshot_links,
&snapshot_package
.snapshot_links
.path()
.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
)?;
let mut timer = Measure::start("snapshot_package-package_snapshots");
@@ -274,10 +276,10 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
));
// `storage_path` - The file path where the AppendVec itself is located
// `output_path` - The directory where the AppendVec will be placed in the staging directory.
// `output_path` - The file path where the AppendVec will be placed in the staging directory.
let storage_path =
fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
symlink::symlink_dir(storage_path, &output_path)?;
symlink::symlink_file(storage_path, &output_path)?;
if !output_path.is_file() {
return Err(SnapshotError::StoragePathSymlinkInvalid);
}
@@ -289,7 +291,7 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
f.write_all(snapshot_package.snapshot_version.as_str().as_bytes())?;
}
let file_ext = get_compression_ext(&snapshot_package.compression);
let file_ext = get_archive_ext(&snapshot_package.archive_format);
// Tar the staging directory into the archive at `archive_path`
//
@@ -320,23 +322,23 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
Some(tar_output) => {
let mut archive_file = fs::File::create(&archive_path)?;
match snapshot_package.compression {
CompressionType::Bzip2 => {
match snapshot_package.archive_format {
ArchiveFormat::TarBzip2 => {
let mut encoder =
bzip2::write::BzEncoder::new(archive_file, bzip2::Compression::Best);
io::copy(tar_output, &mut encoder)?;
let _ = encoder.finish()?;
}
CompressionType::Gzip => {
ArchiveFormat::TarGzip => {
let mut encoder =
flate2::write::GzEncoder::new(archive_file, flate2::Compression::default());
io::copy(tar_output, &mut encoder)?;
let _ = encoder.finish()?;
}
CompressionType::NoCompression => {
ArchiveFormat::Tar => {
io::copy(tar_output, &mut archive_file)?;
}
CompressionType::Zstd => {
ArchiveFormat::TarZstd => {
let mut encoder = zstd::stream::Encoder::new(archive_file, 0)?;
io::copy(tar_output, &mut encoder)?;
let _ = encoder.finish()?;
@@ -361,13 +363,13 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
info!(
"Successfully created {:?}. slot: {}, elapsed ms: {}, size={}",
snapshot_package.tar_output_file,
snapshot_package.root,
snapshot_package.slot,
timer.as_ms(),
metadata.len()
);
datapoint_info!(
"snapshot-package",
("slot", snapshot_package.root, i64),
("slot", snapshot_package.slot, i64),
("duration_ms", timer.as_ms(), i64),
("size", metadata.len(), i64)
);
@@ -546,14 +548,10 @@ pub fn add_snapshot<P: AsRef<Path>>(
fn serialize_status_cache(
slot: Slot,
slot_deltas: &[BankSlotDelta],
snapshot_links: &TempDir,
status_cache_path: &Path,
) -> Result<()> {
// the status cache is stored as snapshot_path/status_cache
let snapshot_status_cache_file_path =
snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
let consumed_size = serialize_snapshot_data_file(&snapshot_status_cache_file_path, |stream| {
let consumed_size = serialize_snapshot_data_file(status_cache_path, |stream| {
serialize_into(stream, slot_deltas)?;
Ok(())
})?;
@@ -585,7 +583,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
frozen_account_pubkeys: &[Pubkey],
snapshot_path: &PathBuf,
snapshot_tar: P,
compression: CompressionType,
archive_format: ArchiveFormat,
genesis_config: &GenesisConfig,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
additional_builtins: Option<&Builtins>,
@@ -595,7 +593,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
let unpack_dir = tempfile::Builder::new()
.prefix(TMP_SNAPSHOT_DIR_PREFIX)
.tempdir_in(snapshot_path)?;
untar_snapshot_in(&snapshot_tar, &unpack_dir, compression)?;
untar_snapshot_in(&snapshot_tar, &unpack_dir, archive_format)?;
let mut measure = Measure::start("bank rebuild from snapshot");
let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
@@ -629,27 +627,27 @@ pub fn bank_from_archive<P: AsRef<Path>>(
pub fn get_snapshot_archive_path<P: AsRef<Path>>(
snapshot_output_dir: P,
snapshot_hash: &(Slot, Hash),
compression: &CompressionType,
archive_format: &ArchiveFormat,
) -> PathBuf {
snapshot_output_dir.as_ref().join(format!(
"snapshot-{}-{}{}",
snapshot_hash.0,
snapshot_hash.1,
get_compression_ext(compression),
get_archive_ext(archive_format),
))
}
fn compression_type_from_str(compress: &str) -> Option<CompressionType> {
match compress {
"tar.bz2" => Some(CompressionType::Bzip2),
"tar.gz" => Some(CompressionType::Gzip),
"tar.zst" => Some(CompressionType::Zstd),
"tar" => Some(CompressionType::NoCompression),
fn archive_format_from_str(archive_format: &str) -> Option<ArchiveFormat> {
match archive_format {
"tar.bz2" => Some(ArchiveFormat::TarBzip2),
"tar.gz" => Some(ArchiveFormat::TarGzip),
"tar.zst" => Some(ArchiveFormat::TarZstd),
"tar" => Some(ArchiveFormat::Tar),
_ => None,
}
}
fn snapshot_hash_of(archive_filename: &str) -> Option<(Slot, Hash, CompressionType)> {
fn snapshot_hash_of(archive_filename: &str) -> Option<(Slot, Hash, ArchiveFormat)> {
let snapshot_filename_regex =
Regex::new(r"snapshot-(\d+)-([[:alnum:]]+)\.(tar|tar\.bz2|tar\.zst|tar\.gz)$").unwrap();
@@ -658,12 +656,12 @@ fn snapshot_hash_of(archive_filename: &str) -> Option<(Slot, Hash, CompressionTy
let hash_str = captures.get(2).unwrap().as_str();
let ext = captures.get(3).unwrap().as_str();
if let (Ok(slot), Ok(hash), Some(compression)) = (
if let (Ok(slot), Ok(hash), Some(archive_format)) = (
slot_str.parse::<Slot>(),
hash_str.parse::<Hash>(),
compression_type_from_str(ext),
archive_format_from_str(ext),
) {
return Some((slot, hash, compression));
return Some((slot, hash, archive_format));
}
}
None
@@ -671,7 +669,7 @@ fn snapshot_hash_of(archive_filename: &str) -> Option<(Slot, Hash, CompressionTy
pub fn get_snapshot_archives<P: AsRef<Path>>(
snapshot_output_dir: P,
) -> Vec<(PathBuf, (Slot, Hash, CompressionType))> {
) -> Vec<(PathBuf, (Slot, Hash, ArchiveFormat))> {
match fs::read_dir(&snapshot_output_dir) {
Err(err) => {
info!("Unable to read snapshot directory: {}", err);
@@ -702,7 +700,7 @@ pub fn get_snapshot_archives<P: AsRef<Path>>(
pub fn get_highest_snapshot_archive_path<P: AsRef<Path>>(
snapshot_output_dir: P,
) -> Option<(PathBuf, (Slot, Hash, CompressionType))> {
) -> Option<(PathBuf, (Slot, Hash, ArchiveFormat))> {
let archives = get_snapshot_archives(snapshot_output_dir);
archives.into_iter().next()
}
@@ -720,27 +718,27 @@ pub fn purge_old_snapshot_archives<P: AsRef<Path>>(snapshot_output_dir: P) {
pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
snapshot_tar: P,
unpack_dir: Q,
compression: CompressionType,
archive_format: ArchiveFormat,
) -> Result<()> {
let mut measure = Measure::start("snapshot untar");
let tar_name = File::open(&snapshot_tar)?;
match compression {
CompressionType::Bzip2 => {
match archive_format {
ArchiveFormat::TarBzip2 => {
let tar = BzDecoder::new(BufReader::new(tar_name));
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir)?;
}
CompressionType::Gzip => {
ArchiveFormat::TarGzip => {
let tar = GzDecoder::new(BufReader::new(tar_name));
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir)?;
}
CompressionType::Zstd => {
ArchiveFormat::TarZstd => {
let tar = zstd::stream::read::Decoder::new(BufReader::new(tar_name))?;
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir)?;
}
CompressionType::NoCompression => {
ArchiveFormat::Tar => {
let tar = BufReader::new(tar_name);
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir)?;
@@ -839,7 +837,7 @@ pub fn verify_snapshot_archive<P, Q, R>(
snapshot_archive: P,
snapshots_to_verify: Q,
storages_to_verify: R,
compression: CompressionType,
archive_format: ArchiveFormat,
) where
P: AsRef<Path>,
Q: AsRef<Path>,
@@ -847,7 +845,7 @@ pub fn verify_snapshot_archive<P, Q, R>(
{
let temp_dir = tempfile::TempDir::new().unwrap();
let unpack_dir = temp_dir.path();
untar_snapshot_in(snapshot_archive, &unpack_dir, compression).unwrap();
untar_snapshot_in(snapshot_archive, &unpack_dir, archive_format).unwrap();
// Check snapshots are the same
let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR);
@@ -878,7 +876,7 @@ pub fn snapshot_bank(
snapshot_path: &Path,
snapshot_package_output_path: &Path,
snapshot_version: SnapshotVersion,
compression: &CompressionType,
archive_format: &ArchiveFormat,
) -> Result<()> {
let storages: Vec<_> = root_bank.get_snapshot_storages();
let mut add_snapshot_time = Measure::start("add-snapshot-ms");
@@ -899,7 +897,7 @@ pub fn snapshot_bank(
status_cache_slot_deltas,
snapshot_package_output_path,
storages,
compression.clone(),
archive_format.clone(),
snapshot_version,
)?;
@@ -1024,15 +1022,15 @@ mod tests {
fn test_snapshot_hash_of() {
assert_eq!(
snapshot_hash_of(&format!("snapshot-42-{}.tar.bz2", Hash::default())),
Some((42, Hash::default(), CompressionType::Bzip2))
Some((42, Hash::default(), ArchiveFormat::TarBzip2))
);
assert_eq!(
snapshot_hash_of(&format!("snapshot-43-{}.tar.zst", Hash::default())),
Some((43, Hash::default(), CompressionType::Zstd))
Some((43, Hash::default(), ArchiveFormat::TarZstd))
);
assert_eq!(
snapshot_hash_of(&format!("snapshot-42-{}.tar", Hash::default())),
Some((42, Hash::default(), CompressionType::NoCompression))
Some((42, Hash::default(), ArchiveFormat::Tar))
);
assert!(snapshot_hash_of("invalid").is_none());