untar in parallel (#18184)

* untar in parallel

* make enum for 'ignore' return value
Author: Jeff Washington (jwash)
Date: 2021-06-24 17:29:49 -05:00
Committed by: GitHub
Parent: 50deba1f10
Commit: 31ec986ea1

2 changed files with 76 additions and 27 deletions
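
For orientation before the diff: the commit speeds up snapshot extraction by running several workers over the same tar archive, with each worker extracting only the entries assigned to it round-robin. A minimal standalone sketch of that idea (the entry list, the extract helper, and the worker loop below are illustrative stand-ins, not the diff's code):

    use rayon::prelude::*;

    // Stand-in for the real tar extraction of one archive entry.
    fn extract(worker: usize, entry: &str) {
        println!("worker {} extracted {}", worker, entry);
    }

    fn main() {
        let entries = ["accounts/1.0", "accounts/2.0", "snapshots/100", "version"];
        let divisions = 2; // number of parallel workers

        // Every worker walks the full entry list in the same order, but only
        // extracts the entries whose index belongs to it (i % divisions == worker).
        (0..divisions).into_par_iter().for_each(|worker| {
            for (i, entry) in entries.iter().enumerate() {
                if i % divisions == worker {
                    extract(worker, entry);
                }
            }
        });
    }
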

File 1 of 2:

@@ -84,6 +84,12 @@ fn check_unpack_result(unpack_result: bool, path: String) -> Result<()> {
     Ok(())
 }
 
+pub enum UnpackPath<'a> {
+    Valid(&'a Path),
+    Ignore,
+    Invalid,
+}
+
 fn unpack_archive<'a, A: Read, C>(
     archive: &mut Archive<A>,
     apparent_limit_size: u64,
@@ -92,7 +98,7 @@ fn unpack_archive<'a, A: Read, C>(
     mut entry_checker: C,
 ) -> Result<()>
 where
-    C: FnMut(&[&str], tar::EntryType) -> Option<&'a Path>,
+    C: FnMut(&[&str], tar::EntryType) -> UnpackPath<'a>,
 {
     let mut apparent_total_size: u64 = 0;
     let mut actual_total_size: u64 = 0;
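
The closure contract changes from Option<&Path> (extract here, or fail) to the three-way UnpackPath (extract here, silently skip, or fail). A hypothetical checker illustrating the new return type, assuming UnpackPath and the tar crate are in scope; in the diff the checker is an FnMut closure capturing its surroundings, a free function is used here for brevity, and the entry names and root path are made up:

    use std::path::Path;

    fn checker<'a>(root: &'a Path, parts: &[&str], kind: tar::EntryType) -> UnpackPath<'a> {
        if !kind.is_file() {
            return UnpackPath::Invalid;
        }
        match parts {
            ["version"] => UnpackPath::Valid(root),    // extract under `root`
            ["accounts", _file] => UnpackPath::Ignore, // e.g. assigned to another worker
            _ => UnpackPath::Invalid,                  // unexpected entry: unpack_archive errors
        }
    }
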
@@ -130,14 +136,17 @@ where
         let parts: Vec<_> = parts.map(|p| p.unwrap()).collect();
         let unpack_dir = match entry_checker(parts.as_slice(), kind) {
-            None => {
+            UnpackPath::Invalid => {
                 return Err(UnpackError::Archive(format!(
                     "extra entry found: {:?} {:?}",
                     path_str,
                     entry.header().entry_type(),
                 )));
             }
-            Some(unpack_dir) => unpack_dir,
+            UnpackPath::Ignore => {
+                continue;
+            }
+            UnpackPath::Valid(unpack_dir) => unpack_dir,
         };
 
         apparent_total_size = checked_total_size_sum(
@@ -193,13 +202,27 @@ where
 /// Map from AppendVec file name to unpacked file system location
 pub type UnpackedAppendVecMap = HashMap<String, PathBuf>;
 
+// select/choose only 'index' out of each # of 'divisions' of total items.
+pub struct ParallelSelector {
+    pub index: usize,
+    pub divisions: usize,
+}
+
+impl ParallelSelector {
+    pub fn select_index(&self, index: usize) -> bool {
+        index % self.divisions == self.index
+    }
+}
+
 pub fn unpack_snapshot<A: Read>(
     archive: &mut Archive<A>,
     ledger_dir: &Path,
     account_paths: &[PathBuf],
+    parallel_selector: Option<ParallelSelector>,
 ) -> Result<UnpackedAppendVecMap> {
     assert!(!account_paths.is_empty());
     let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
+    let mut i = 0;
 
     unpack_archive(
         archive,
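
select_index is a plain modulo partition: worker `index` of `divisions` takes every divisions-th item, so the workers collectively cover every index exactly once with no overlap. A quick standalone check of the arithmetic, assuming the ParallelSelector above is in scope:

    fn main() {
        let s = ParallelSelector { index: 1, divisions: 4 };
        assert!(!s.select_index(0)); // 0 % 4 == 0: belongs to worker 0
        assert!(s.select_index(1));  // 1 % 4 == 1: ours
        assert!(s.select_index(5));  // 5 % 4 == 1: ours
        assert!(!s.select_index(6)); // 6 % 4 == 2: belongs to worker 2
    }
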
@@ -208,19 +231,31 @@ pub fn unpack_snapshot<A: Read>(
         MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT,
         |parts, kind| {
             if is_valid_snapshot_archive_entry(parts, kind) {
+                i += 1;
+                match &parallel_selector {
+                    Some(parallel_selector) => {
+                        if !parallel_selector.select_index(i - 1) {
+                            return UnpackPath::Ignore;
+                        }
+                    }
+                    None => {}
+                };
                 if let ["accounts", file] = parts {
                     // Randomly distribute the accounts files about the available `account_paths`,
                     let path_index = thread_rng().gen_range(0, account_paths.len());
-                    account_paths.get(path_index).map(|path_buf| {
+                    match account_paths.get(path_index).map(|path_buf| {
                         unpacked_append_vec_map
                             .insert(file.to_string(), path_buf.join("accounts").join(file));
                         path_buf.as_path()
-                    })
+                    }) {
+                        Some(path) => UnpackPath::Valid(path),
+                        None => UnpackPath::Invalid,
+                    }
                 } else {
-                    Some(ledger_dir)
+                    UnpackPath::Valid(ledger_dir)
                 }
             } else {
-                None
+                UnpackPath::Invalid
             }
         },
     )
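
The `match` rewrite above keeps the existing placement logic (pick a random account path, record the destination in the map) and only converts the Option into an UnpackPath. Reduced to a standalone sketch with a hypothetical `place` helper; gen_range(0, n) is the two-argument form the diff itself uses (rand 0.7 era):

    use rand::{thread_rng, Rng};
    use std::collections::HashMap;
    use std::path::PathBuf;

    // Pick a random account path for `file` and remember where it went. The
    // None arm is defensive: unpack_snapshot asserts account_paths is non-empty.
    fn place(
        file: &str,
        account_paths: &[PathBuf],
        map: &mut HashMap<String, PathBuf>,
    ) -> Option<PathBuf> {
        let path_index = thread_rng().gen_range(0, account_paths.len());
        account_paths.get(path_index).map(|path_buf| {
            map.insert(file.to_string(), path_buf.join("accounts").join(file));
            path_buf.clone()
        })
    }
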
@@ -337,9 +372,9 @@ fn unpack_genesis<A: Read>(
         MAX_GENESIS_ARCHIVE_UNPACKED_COUNT,
         |p, k| {
             if is_valid_genesis_archive_entry(p, k) {
-                Some(unpack_dir)
+                UnpackPath::Valid(unpack_dir)
             } else {
-                None
+                UnpackPath::Invalid
             }
         },
     )
@@ -530,7 +565,7 @@ mod tests {
     fn finalize_and_unpack_snapshot(archive: tar::Builder<Vec<u8>>) -> Result<()> {
         with_finalize_and_unpack(archive, |a, b| {
-            unpack_snapshot(a, b, &[PathBuf::new()]).map(|_| ())
+            unpack_snapshot(a, b, &[PathBuf::new()], None).map(|_| ())
         })
     }

File 2 of 2:

@@ -4,7 +4,7 @@ use {
     accounts_index::AccountSecondaryIndexes,
     bank::{Bank, BankSlotDelta, Builtins},
     bank_forks::ArchiveFormat,
-    hardened_unpack::{unpack_snapshot, UnpackError, UnpackedAppendVecMap},
+    hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap},
     serde_snapshot::{
         bank_from_stream, bank_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages,
     },
@@ -17,7 +17,7 @@ use {
     bzip2::bufread::BzDecoder,
     flate2::read::GzDecoder,
     log::*,
-    rayon::ThreadPool,
+    rayon::{prelude::*, ThreadPool},
     regex::Regex,
     solana_measure::measure::Measure,
     solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey},
@@ -600,7 +600,7 @@ pub struct BankFromArchiveTimings {
 }
 
 #[allow(clippy::too_many_arguments)]
-pub fn bank_from_archive<P: AsRef<Path>>(
+pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
     account_paths: &[PathBuf],
     frozen_account_pubkeys: &[Pubkey],
     snapshot_path: &Path,
@@ -619,14 +619,29 @@
         .prefix(TMP_SNAPSHOT_PREFIX)
         .tempdir_in(snapshot_path)?;
 
-    let mut untar = Measure::start("untar");
-    let unpacked_append_vec_map = untar_snapshot_in(
-        &snapshot_tar,
-        unpack_dir.as_ref(),
-        account_paths,
-        archive_format,
-    )?;
+    let mut untar = Measure::start("snapshot untar");
+    // From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
+    let divisions = std::cmp::min(4, std::cmp::max(1, num_cpus::get() / 4));
+    // create 'divisions' # of parallel workers, each responsible for 1/divisions of all the files to extract.
+    let all_unpacked_append_vec_map = (0..divisions)
+        .into_par_iter()
+        .map(|index| {
+            untar_snapshot_in(
+                &snapshot_tar,
+                unpack_dir.as_ref(),
+                account_paths,
+                archive_format,
+                Some(ParallelSelector { index, divisions }),
+            )
+        })
+        .collect::<Vec<_>>();
+    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
+    for h in all_unpacked_append_vec_map {
+        unpacked_append_vec_map.extend(h?);
+    }
     untar.stop();
+    info!("{}", untar);
 
     let mut measure = Measure::start("bank rebuild from snapshot");
     let unpacked_snapshots_dir = unpack_dir.as_ref().join("snapshots");
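
The shape of the fan-out and merge above, reduced to a standalone sketch: each of N rayon workers returns a Result holding its partial map, the results are collected, and `?` during the merge surfaces the first worker error. Here `work` is a hypothetical stand-in for untar_snapshot_in:

    use rayon::prelude::*;
    use std::collections::HashMap;

    // Hypothetical stand-in for untar_snapshot_in: each worker produces its
    // own partial map of unpacked files.
    fn work(index: usize) -> Result<HashMap<String, usize>, String> {
        let mut partial = HashMap::new();
        partial.insert(format!("accounts/{}.0", index), index);
        Ok(partial)
    }

    fn main() -> Result<(), String> {
        let divisions = 4;
        let partials: Vec<_> = (0..divisions).into_par_iter().map(work).collect();

        // Merge the per-worker maps; `?` propagates the first failure, as in the diff.
        let mut merged = HashMap::new();
        for partial in partials {
            merged.extend(partial?);
        }
        println!("merged {} entries", merged.len());
        Ok(())
    }
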
@@ -773,33 +788,31 @@ fn untar_snapshot_in<P: AsRef<Path>>(
     unpack_dir: &Path,
     account_paths: &[PathBuf],
     archive_format: ArchiveFormat,
+    parallel_selector: Option<ParallelSelector>,
 ) -> Result<UnpackedAppendVecMap> {
-    let mut measure = Measure::start("snapshot untar");
     let tar_name = File::open(&snapshot_tar)?;
     let account_paths_map = match archive_format {
         ArchiveFormat::TarBzip2 => {
             let tar = BzDecoder::new(BufReader::new(tar_name));
             let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths)?
+            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
         }
         ArchiveFormat::TarGzip => {
             let tar = GzDecoder::new(BufReader::new(tar_name));
             let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths)?
+            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
         }
         ArchiveFormat::TarZstd => {
             let tar = zstd::stream::read::Decoder::new(BufReader::new(tar_name))?;
             let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths)?
+            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
         }
         ArchiveFormat::Tar => {
             let tar = BufReader::new(tar_name);
             let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths)?
+            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
         }
     };
-    measure.stop();
-    info!("{}", measure);
     Ok(account_paths_map)
 }
@@ -916,6 +929,7 @@ pub fn verify_snapshot_archive<P, Q, R>(
         unpack_dir,
         &[unpack_dir.to_path_buf()],
         archive_format,
+        None,
     )
     .unwrap();
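
Finally, it is worth seeing how the divisions heuristic in bank_from_archive behaves across machine sizes: roughly one worker per four cores, clamped to at least 1 and at most 4. A standalone check of the arithmetic, with divisions_for as a hypothetical extraction of the in-line expression:

    // min(4, max(1, cpus / 4)):
    //   4 cores  -> 1 worker (serial)
    //   8 cores  -> 2 workers
    //   16 cores -> 4 workers
    //   64 cores -> still 4 workers (capped)
    fn divisions_for(cpus: usize) -> usize {
        std::cmp::min(4, std::cmp::max(1, cpus / 4))
    }

    fn main() {
        for cpus in [4, 8, 16, 64] {
            println!("{:2} cpus -> {} untar workers", cpus, divisions_for(cpus));
        }
    }
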