use thread pool for non-index hash calculations (#15149)

This commit is contained in:
Jeff Washington (jwash)
2021-02-05 13:48:55 -06:00
committed by GitHub
parent 6fd5ec0e4c
commit fabecdc86c
7 changed files with 54 additions and 11 deletions

View File

@ -8,8 +8,10 @@ use crate::{
cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}, cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES},
snapshot_packager_service::PendingSnapshotPackage, snapshot_packager_service::PendingSnapshotPackage,
}; };
use solana_runtime::snapshot_package::{ use rayon::ThreadPool;
AccountsPackage, AccountsPackagePre, AccountsPackageReceiver, use solana_runtime::{
accounts_db,
snapshot_package::{AccountsPackage, AccountsPackagePre, AccountsPackageReceiver},
}; };
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -44,6 +46,7 @@ impl AccountsHashVerifier {
.name("solana-accounts-hash".to_string()) .name("solana-accounts-hash".to_string())
.spawn(move || { .spawn(move || {
let mut hashes = vec![]; let mut hashes = vec![];
let mut thread_pool_storage = None;
loop { loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
@ -51,6 +54,13 @@ impl AccountsHashVerifier {
match accounts_package_receiver.recv_timeout(Duration::from_secs(1)) { match accounts_package_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(accounts_package) => { Ok(accounts_package) => {
if accounts_package.hash_for_testing.is_some()
&& thread_pool_storage.is_none()
{
thread_pool_storage =
Some(accounts_db::make_min_priority_thread_pool());
}
Self::process_accounts_package_pre( Self::process_accounts_package_pre(
accounts_package, accounts_package,
&cluster_info, &cluster_info,
@ -61,6 +71,7 @@ impl AccountsHashVerifier {
&exit, &exit,
fault_injection_rate_slots, fault_injection_rate_slots,
snapshot_interval_slots, snapshot_interval_slots,
thread_pool_storage.as_ref(),
); );
} }
Err(RecvTimeoutError::Disconnected) => break, Err(RecvTimeoutError::Disconnected) => break,
@ -74,6 +85,7 @@ impl AccountsHashVerifier {
} }
} }
#[allow(clippy::too_many_arguments)]
fn process_accounts_package_pre( fn process_accounts_package_pre(
accounts_package: AccountsPackagePre, accounts_package: AccountsPackagePre,
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
@ -84,9 +96,12 @@ impl AccountsHashVerifier {
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
fault_injection_rate_slots: u64, fault_injection_rate_slots: u64,
snapshot_interval_slots: u64, snapshot_interval_slots: u64,
thread_pool: Option<&ThreadPool>,
) { ) {
let accounts_package = let accounts_package = solana_runtime::snapshot_utils::process_accounts_package_pre(
solana_runtime::snapshot_utils::process_accounts_package_pre(accounts_package); accounts_package,
thread_pool,
);
Self::process_accounts_package( Self::process_accounts_package(
accounts_package, accounts_package,
cluster_info, cluster_info,

View File

@ -1050,6 +1050,7 @@ fn new_banks_from_ledger(
None, None,
&snapshot_config.snapshot_package_output_path, &snapshot_config.snapshot_package_output_path,
snapshot_config.archive_format, snapshot_config.archive_format,
&bank_forks.root_bank().get_thread_pool(),
) )
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
error!("Unable to create snapshot: {}", err); error!("Unable to create snapshot: {}", err);

View File

@ -45,6 +45,7 @@ mod tests {
}; };
use solana_runtime::{ use solana_runtime::{
accounts_background_service::{ABSRequestSender, SnapshotRequestHandler}, accounts_background_service::{ABSRequestSender, SnapshotRequestHandler},
accounts_db,
bank::{Bank, BankSlotDelta}, bank::{Bank, BankSlotDelta},
bank_forks::{ArchiveFormat, BankForks, SnapshotConfig}, bank_forks::{ArchiveFormat, BankForks, SnapshotConfig},
genesis_utils::{create_genesis_config, GenesisConfigInfo}, genesis_utils::{create_genesis_config, GenesisConfigInfo},
@ -241,7 +242,10 @@ mod tests {
None, None,
) )
.unwrap(); .unwrap();
let snapshot_package = snapshot_utils::process_accounts_package_pre(snapshot_package); let snapshot_package = snapshot_utils::process_accounts_package_pre(
snapshot_package,
Some(&last_bank.get_thread_pool()),
);
snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap(); snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap();
// Restore bank from snapshot // Restore bank from snapshot
@ -419,6 +423,8 @@ mod tests {
&cluster_info, &cluster_info,
); );
let thread_pool = accounts_db::make_min_priority_thread_pool();
let _package_receiver = std::thread::Builder::new() let _package_receiver = std::thread::Builder::new()
.name("package-receiver".to_string()) .name("package-receiver".to_string())
.spawn(move || { .spawn(move || {
@ -431,6 +437,7 @@ mod tests {
let snapshot_package = let snapshot_package =
solana_runtime::snapshot_utils::process_accounts_package_pre( solana_runtime::snapshot_utils::process_accounts_package_pre(
snapshot_package, snapshot_package,
Some(&thread_pool),
); );
*pending_snapshot_package.lock().unwrap() = Some(snapshot_package); *pending_snapshot_package.lock().unwrap() = Some(snapshot_package);
} }

View File

@ -1933,6 +1933,7 @@ fn main() {
Some(snapshot_version), Some(snapshot_version),
output_directory, output_directory,
ArchiveFormat::TarZstd, ArchiveFormat::TarZstd,
&bank.get_thread_pool(),
) )
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
eprintln!("Unable to create snapshot: {}", err); eprintln!("Unable to create snapshot: {}", err);

View File

@ -983,7 +983,7 @@ impl ShrinkStats {
} }
} }
fn make_min_priority_thread_pool() -> ThreadPool { pub fn make_min_priority_thread_pool() -> ThreadPool {
// Use lower thread count to reduce priority. // Use lower thread count to reduce priority.
let num_threads = std::cmp::max(2, num_cpus::get() / 4); let num_threads = std::cmp::max(2, num_cpus::get() / 4);
rayon::ThreadPoolBuilder::new() rayon::ThreadPoolBuilder::new()
@ -3748,6 +3748,7 @@ impl AccountsDB {
Self::calculate_accounts_hash_without_index( Self::calculate_accounts_hash_without_index(
&combined_maps, &combined_maps,
simple_capitalization_enabled, simple_capitalization_enabled,
&self.thread_pool_clean,
) )
} else { } else {
self.calculate_accounts_hash(slot, ancestors, false, simple_capitalization_enabled) self.calculate_accounts_hash(slot, ancestors, false, simple_capitalization_enabled)
@ -3851,10 +3852,13 @@ impl AccountsDB {
pub fn calculate_accounts_hash_without_index( pub fn calculate_accounts_hash_without_index(
storages: &[SnapshotStorage], storages: &[SnapshotStorage],
simple_capitalization_enabled: bool, simple_capitalization_enabled: bool,
thread_pool: &ThreadPool,
) -> (Hash, u64) { ) -> (Hash, u64) {
thread_pool.install(|| {
let result = Self::scan_snapshot_stores(storages, simple_capitalization_enabled); let result = Self::scan_snapshot_stores(storages, simple_capitalization_enabled);
Self::rest_of_hash_calculation(result) Self::rest_of_hash_calculation(result)
})
} }
pub fn verify_bank_hash_and_lamports( pub fn verify_bank_hash_and_lamports(
@ -5180,7 +5184,11 @@ pub mod tests {
solana_logger::setup(); solana_logger::setup();
let (storages, _size, _slot_expected) = sample_storage(); let (storages, _size, _slot_expected) = sample_storage();
let result = AccountsDB::calculate_accounts_hash_without_index(&storages, true); let result = AccountsDB::calculate_accounts_hash_without_index(
&storages,
true,
&make_min_priority_thread_pool(),
);
let expected_hash = Hash::from_str("GKot5hBsd81kMupNCXHaqbhv3huEbxAFMLnpcX2hniwn").unwrap(); let expected_hash = Hash::from_str("GKot5hBsd81kMupNCXHaqbhv3huEbxAFMLnpcX2hniwn").unwrap();
assert_eq!(result, (expected_hash, 0)); assert_eq!(result, (expected_hash, 0));
} }

View File

@ -27,6 +27,7 @@ use crate::{
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use itertools::Itertools; use itertools::Itertools;
use log::*; use log::*;
use rayon::ThreadPool;
use solana_measure::measure::Measure; use solana_measure::measure::Measure;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_info}; use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_info};
use solana_sdk::{ use solana_sdk::{
@ -4307,6 +4308,10 @@ impl Bank {
self.rc.accounts.accounts_db.get_accounts_hash(self.slot) self.rc.accounts.accounts_db.get_accounts_hash(self.slot)
} }
pub fn get_thread_pool(&self) -> &ThreadPool {
&self.rc.accounts.accounts_db.thread_pool_clean
}
pub fn update_accounts_hash_with_index_option( pub fn update_accounts_hash_with_index_option(
&self, &self,
do_not_use_index: bool, do_not_use_index: bool,

View File

@ -15,6 +15,7 @@ use bincode::{config::Options, serialize_into};
use bzip2::bufread::BzDecoder; use bzip2::bufread::BzDecoder;
use flate2::read::GzDecoder; use flate2::read::GzDecoder;
use log::*; use log::*;
use rayon::ThreadPool;
use regex::Regex; use regex::Regex;
use solana_measure::measure::Measure; use solana_measure::measure::Measure;
use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey}; use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey};
@ -926,6 +927,7 @@ pub fn bank_to_snapshot_archive<P: AsRef<Path>, Q: AsRef<Path>>(
snapshot_version: Option<SnapshotVersion>, snapshot_version: Option<SnapshotVersion>,
snapshot_package_output_path: Q, snapshot_package_output_path: Q,
archive_format: ArchiveFormat, archive_format: ArchiveFormat,
thread_pool: &ThreadPool,
) -> Result<PathBuf> { ) -> Result<PathBuf> {
let snapshot_version = snapshot_version.unwrap_or_default(); let snapshot_version = snapshot_version.unwrap_or_default();
@ -952,13 +954,16 @@ pub fn bank_to_snapshot_archive<P: AsRef<Path>, Q: AsRef<Path>>(
None, None,
)?; )?;
let package = process_accounts_package_pre(package); let package = process_accounts_package_pre(package, Some(&thread_pool));
archive_snapshot_package(&package)?; archive_snapshot_package(&package)?;
Ok(package.tar_output_file) Ok(package.tar_output_file)
} }
pub fn process_accounts_package_pre(accounts_package: AccountsPackagePre) -> AccountsPackage { pub fn process_accounts_package_pre(
accounts_package: AccountsPackagePre,
thread_pool: Option<&ThreadPool>,
) -> AccountsPackage {
let mut time = Measure::start("hash"); let mut time = Measure::start("hash");
let hash = accounts_package.hash; // temporarily remaining here let hash = accounts_package.hash; // temporarily remaining here
@ -966,6 +971,7 @@ pub fn process_accounts_package_pre(accounts_package: AccountsPackagePre) -> Acc
let (hash, lamports) = AccountsDB::calculate_accounts_hash_without_index( let (hash, lamports) = AccountsDB::calculate_accounts_hash_without_index(
&accounts_package.storages, &accounts_package.storages,
accounts_package.simple_capitalization_testing, accounts_package.simple_capitalization_testing,
&thread_pool.unwrap(),
); );
assert_eq!(accounts_package.expected_capitalization, lamports); assert_eq!(accounts_package.expected_capitalization, lamports);