calculate_accounts_hash does not throttle threads on startup (#17338)

This commit is contained in:
Jeff Washington (jwash)
2021-05-20 10:25:54 -05:00
committed by GitHub
parent a9aa533684
commit 9b74988fc6

View File

@ -3973,69 +3973,72 @@ impl AccountsDb {
// We'll also accumulate the lamports within each chunk and fewer chunks results in less contention to accumulate the sum. // We'll also accumulate the lamports within each chunk and fewer chunks results in less contention to accumulate the sum.
let chunks = crate::accounts_hash::MERKLE_FANOUT.pow(4); let chunks = crate::accounts_hash::MERKLE_FANOUT.pow(4);
let total_lamports = Mutex::<u64>::new(0); let total_lamports = Mutex::<u64>::new(0);
let hashes: Vec<Vec<Hash>> = { let get_hashes = || {
self.thread_pool_clean.install(|| { keys.par_chunks(chunks)
keys.par_chunks(chunks) .map(|pubkeys| {
.map(|pubkeys| { let mut sum = 0u128;
let mut sum = 0u128; let result: Vec<Hash> = pubkeys
let result: Vec<Hash> = pubkeys .iter()
.iter() .filter_map(|pubkey| {
.filter_map(|pubkey| { if let AccountIndexGetResult::Found(lock, index) =
if let AccountIndexGetResult::Found(lock, index) = self.accounts_index.get(pubkey, Some(ancestors), Some(slot))
self.accounts_index.get(pubkey, Some(ancestors), Some(slot)) {
{ let (slot, account_info) = &lock.slot_list()[index];
let (slot, account_info) = &lock.slot_list()[index]; if account_info.lamports != 0 {
if account_info.lamports != 0 { // Because we're keeping the `lock' here, there is no need
// Because we're keeping the `lock' here, there is no need // to use retry_to_get_account_accessor()
// to use retry_to_get_account_accessor() // In other words, flusher/shrinker/cleaner is blocked to
// In other words, flusher/shrinker/cleaner is blocked to // cause any Accessor(None) situtation.
// cause any Accessor(None) situtation. // Anyway this race condition concern is currently a moot
// Anyway this race condition concern is currently a moot // point because calculate_accounts_hash() should not
// point because calculate_accounts_hash() should not // currently race with clean/shrink because the full hash
// currently race with clean/shrink because the full hash // is synchronous with clean/shrink in
// is synchronous with clean/shrink in // AccountsBackgroundService
// AccountsBackgroundService self.get_account_accessor(
self.get_account_accessor( *slot,
*slot, pubkey,
pubkey, account_info.store_id,
account_info.store_id, account_info.offset,
account_info.offset, )
) .get_loaded_account()
.get_loaded_account() .and_then(
.and_then( |loaded_account| {
|loaded_account| { let loaded_hash = loaded_account.loaded_hash();
let loaded_hash = loaded_account.loaded_hash(); let balance = account_info.lamports;
let balance = account_info.lamports; if check_hash {
if check_hash { let computed_hash =
let computed_hash = loaded_account.compute_hash(*slot, pubkey);
loaded_account.compute_hash(*slot, pubkey); if computed_hash != loaded_hash {
if computed_hash != loaded_hash { info!("hash mismatch found: computed: {}, loaded: {}, pubkey: {}", computed_hash, loaded_hash, pubkey);
info!("hash mismatch found: computed: {}, loaded: {}, pubkey: {}", computed_hash, loaded_hash, pubkey); mismatch_found
mismatch_found .fetch_add(1, Ordering::Relaxed);
.fetch_add(1, Ordering::Relaxed); return None;
return None;
}
} }
}
sum += balance as u128; sum += balance as u128;
Some(loaded_hash) Some(loaded_hash)
}, },
) )
} else {
None
}
} else { } else {
None None
} }
}) } else {
.collect(); None
let mut total = total_lamports.lock().unwrap(); }
*total = })
AccountsHash::checked_cast_for_capitalization(*total as u128 + sum); .collect();
result let mut total = total_lamports.lock().unwrap();
}) *total =
.collect() AccountsHash::checked_cast_for_capitalization(*total as u128 + sum);
}) result
}).collect()
};
let hashes: Vec<Vec<Hash>> = if check_hash {
get_hashes()
} else {
self.thread_pool_clean.install(get_hashes)
}; };
if mismatch_found.load(Ordering::Relaxed) > 0 { if mismatch_found.load(Ordering::Relaxed) > 0 {
warn!( warn!(