parallelize update_index (#20601)
This commit is contained in:
committed by
GitHub
parent
0d934d311e
commit
6ec159a1ae
@ -1432,9 +1432,13 @@ impl ShrinkStats {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn quarter_thread_count() -> usize {
|
||||||
|
std::cmp::max(2, num_cpus::get() / 4)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn make_min_priority_thread_pool() -> ThreadPool {
|
pub fn make_min_priority_thread_pool() -> ThreadPool {
|
||||||
// Use lower thread count to reduce priority.
|
// Use lower thread count to reduce priority.
|
||||||
let num_threads = std::cmp::max(2, num_cpus::get() / 4);
|
let num_threads = quarter_thread_count();
|
||||||
rayon::ThreadPoolBuilder::new()
|
rayon::ThreadPoolBuilder::new()
|
||||||
.thread_name(|i| format!("solana-cleanup-accounts-{}", i))
|
.thread_name(|i| format!("solana-cleanup-accounts-{}", i))
|
||||||
.num_threads(num_threads)
|
.num_threads(num_threads)
|
||||||
@ -5799,15 +5803,22 @@ impl AccountsDb {
|
|||||||
|
|
||||||
// previous_slot_entry_was_cached = true means we just need to assert that after this update is complete
|
// previous_slot_entry_was_cached = true means we just need to assert that after this update is complete
|
||||||
// that there are no items we would have put in reclaims that are not cached
|
// that there are no items we would have put in reclaims that are not cached
|
||||||
fn update_index(
|
fn update_index<T: ReadableAccount + Sync>(
|
||||||
&self,
|
&self,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
infos: Vec<AccountInfo>,
|
infos: Vec<AccountInfo>,
|
||||||
accounts: &[(&Pubkey, &impl ReadableAccount)],
|
accounts: &[(&Pubkey, &T)],
|
||||||
previous_slot_entry_was_cached: bool,
|
previous_slot_entry_was_cached: bool,
|
||||||
) -> SlotList<AccountInfo> {
|
) -> SlotList<AccountInfo> {
|
||||||
let mut reclaims = SlotList::<AccountInfo>::with_capacity(infos.len() * 2);
|
// using a thread pool here results in deadlock panics from bank_hashes.write()
|
||||||
for (info, pubkey_account) in infos.into_iter().zip(accounts.iter()) {
|
// so, instead we limit how many threads will be created to the same size as the bg thread pool
|
||||||
|
let chunk_size = std::cmp::max(1, accounts.len() / quarter_thread_count()); // # pubkeys/thread
|
||||||
|
infos
|
||||||
|
.par_chunks(chunk_size)
|
||||||
|
.zip(accounts.par_chunks(chunk_size))
|
||||||
|
.map(|(infos_chunk, accounts_chunk)| {
|
||||||
|
let mut reclaims = Vec::with_capacity(infos_chunk.len() / 2);
|
||||||
|
for (info, pubkey_account) in infos_chunk.iter().zip(accounts_chunk.iter()) {
|
||||||
let pubkey = pubkey_account.0;
|
let pubkey = pubkey_account.0;
|
||||||
self.accounts_index.upsert(
|
self.accounts_index.upsert(
|
||||||
slot,
|
slot,
|
||||||
@ -5815,12 +5826,15 @@ impl AccountsDb {
|
|||||||
pubkey_account.1.owner(),
|
pubkey_account.1.owner(),
|
||||||
pubkey_account.1.data(),
|
pubkey_account.1.data(),
|
||||||
&self.account_indexes,
|
&self.account_indexes,
|
||||||
info,
|
*info,
|
||||||
&mut reclaims,
|
&mut reclaims,
|
||||||
previous_slot_entry_was_cached,
|
previous_slot_entry_was_cached,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
reclaims
|
reclaims
|
||||||
|
})
|
||||||
|
.flatten()
|
||||||
|
.collect::<Vec<_>>()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn should_not_shrink(aligned_bytes: u64, total_bytes: u64, num_stores: usize) -> bool {
|
fn should_not_shrink(aligned_bytes: u64, total_bytes: u64, num_stores: usize) -> bool {
|
||||||
@ -6167,6 +6181,7 @@ impl AccountsDb {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Store the account update.
|
/// Store the account update.
|
||||||
|
/// only called by tests
|
||||||
pub fn store_uncached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) {
|
pub fn store_uncached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) {
|
||||||
self.store(slot, accounts, false);
|
self.store(slot, accounts, false);
|
||||||
}
|
}
|
||||||
@ -6190,11 +6205,14 @@ impl AccountsDb {
|
|||||||
.store_total_data
|
.store_total_data
|
||||||
.fetch_add(total_data as u64, Ordering::Relaxed);
|
.fetch_add(total_data as u64, Ordering::Relaxed);
|
||||||
|
|
||||||
|
{
|
||||||
|
// we need to drop bank_hashes to prevent deadlocks
|
||||||
let mut bank_hashes = self.bank_hashes.write().unwrap();
|
let mut bank_hashes = self.bank_hashes.write().unwrap();
|
||||||
let slot_info = bank_hashes
|
let slot_info = bank_hashes
|
||||||
.entry(slot)
|
.entry(slot)
|
||||||
.or_insert_with(BankHashInfo::default);
|
.or_insert_with(BankHashInfo::default);
|
||||||
slot_info.stats.merge(&stats);
|
slot_info.stats.merge(&stats);
|
||||||
|
}
|
||||||
|
|
||||||
// we use default hashes for now since the same account may be stored to the cache multiple times
|
// we use default hashes for now since the same account may be stored to the cache multiple times
|
||||||
self.store_accounts_unfrozen(slot, accounts, None, is_cached_store);
|
self.store_accounts_unfrozen(slot, accounts, None, is_cached_store);
|
||||||
@ -6338,10 +6356,10 @@ impl AccountsDb {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn store_accounts_frozen<'a>(
|
fn store_accounts_frozen<'a, T: ReadableAccount + Sync>(
|
||||||
&'a self,
|
&'a self,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
accounts: &[(&Pubkey, &impl ReadableAccount)],
|
accounts: &[(&Pubkey, &T)],
|
||||||
hashes: Option<&[impl Borrow<Hash>]>,
|
hashes: Option<&[impl Borrow<Hash>]>,
|
||||||
storage_finder: Option<StorageFinder<'a>>,
|
storage_finder: Option<StorageFinder<'a>>,
|
||||||
write_version_producer: Option<Box<dyn Iterator<Item = StoredMetaWriteVersion>>>,
|
write_version_producer: Option<Box<dyn Iterator<Item = StoredMetaWriteVersion>>>,
|
||||||
@ -6362,10 +6380,10 @@ impl AccountsDb {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn store_accounts_custom<'a>(
|
fn store_accounts_custom<'a, T: ReadableAccount + Sync>(
|
||||||
&'a self,
|
&'a self,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
accounts: &[(&Pubkey, &impl ReadableAccount)],
|
accounts: &[(&Pubkey, &T)],
|
||||||
hashes: Option<&[impl Borrow<Hash>]>,
|
hashes: Option<&[impl Borrow<Hash>]>,
|
||||||
storage_finder: Option<StorageFinder<'a>>,
|
storage_finder: Option<StorageFinder<'a>>,
|
||||||
write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
|
write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
|
||||||
|
Reference in New Issue
Block a user