AcctIdx: disk generate index and filler accounts use more threads (#21566)

This commit is contained in:
Jeff Washington (jwash) 2021-12-09 11:54:14 -06:00 committed by GitHub
parent 8063273d09
commit a2df1eb502
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -6798,51 +6798,49 @@ impl AccountsDb {
.skip(pass * per_pass) .skip(pass * per_pass)
.take(per_pass) .take(per_pass)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
self.thread_pool.install(|| { roots_in_this_pass.into_par_iter().for_each(|slot| {
roots_in_this_pass.into_par_iter().for_each(|slot| { let storage_maps: Vec<Arc<AccountStorageEntry>> = self
let storage_maps: Vec<Arc<AccountStorageEntry>> = self .storage
.storage .get_slot_storage_entries(*slot)
.get_slot_storage_entries(*slot) .unwrap_or_default();
.unwrap_or_default(); if storage_maps.is_empty() {
if storage_maps.is_empty() { return;
return; }
}
let partition = crate::bank::Bank::variable_cycle_partition_from_previous_slot( let partition = crate::bank::Bank::variable_cycle_partition_from_previous_slot(
epoch_schedule, epoch_schedule,
*slot, *slot,
); );
let subrange = crate::bank::Bank::pubkey_range_from_partition(partition); let subrange = crate::bank::Bank::pubkey_range_from_partition(partition);
let idx = overall_index.fetch_add(1, Ordering::Relaxed); let idx = overall_index.fetch_add(1, Ordering::Relaxed);
let filler_entries = (idx + 1) * self.filler_account_count / root_count let filler_entries = (idx + 1) * self.filler_account_count / root_count
- idx * self.filler_account_count / root_count; - idx * self.filler_account_count / root_count;
let accounts = (0..filler_entries) let accounts = (0..filler_entries)
.map(|_| { .map(|_| {
let my_id = added.fetch_add(1, Ordering::Relaxed); let my_id = added.fetch_add(1, Ordering::Relaxed);
let my_id_bytes = u32::to_be_bytes(my_id as u32); let my_id_bytes = u32::to_be_bytes(my_id as u32);
// pubkey begins life as entire filler 'suffix' pubkey // pubkey begins life as entire filler 'suffix' pubkey
let mut key = self.filler_account_suffix.unwrap(); let mut key = self.filler_account_suffix.unwrap();
let rent_prefix_bytes = Self::filler_rent_partition_prefix_bytes(); let rent_prefix_bytes = Self::filler_rent_partition_prefix_bytes();
// first bytes are replaced with rent partition range: filler_rent_partition_prefix_bytes // first bytes are replaced with rent partition range: filler_rent_partition_prefix_bytes
key.as_mut()[0..rent_prefix_bytes] key.as_mut()[0..rent_prefix_bytes]
.copy_from_slice(&subrange.start().as_ref()[0..rent_prefix_bytes]); .copy_from_slice(&subrange.start().as_ref()[0..rent_prefix_bytes]);
// next bytes are replaced with my_id: filler_unique_id_bytes // next bytes are replaced with my_id: filler_unique_id_bytes
key.as_mut()[rent_prefix_bytes key.as_mut()[rent_prefix_bytes
..(rent_prefix_bytes + Self::filler_unique_id_bytes())] ..(rent_prefix_bytes + Self::filler_unique_id_bytes())]
.copy_from_slice(&my_id_bytes); .copy_from_slice(&my_id_bytes);
assert!(subrange.contains(&key)); assert!(subrange.contains(&key));
key key
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let add = accounts let add = accounts
.iter() .iter()
.map(|key| (key, &account)) .map(|key| (key, &account))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let hashes = (0..filler_entries).map(|_| hash).collect::<Vec<_>>(); let hashes = (0..filler_entries).map(|_| hash).collect::<Vec<_>>();
self.store_accounts_frozen(*slot, &add[..], Some(&hashes[..]), None, None); self.store_accounts_frozen(*slot, &add[..], Some(&hashes[..]), None, None);
})
}); });
self.accounts_index.set_startup(false); self.accounts_index.set_startup(false);
} }
@ -6882,7 +6880,14 @@ impl AccountsDb {
let storage_info = StorageSizeAndCountMap::default(); let storage_info = StorageSizeAndCountMap::default();
let total_processed_slots_across_all_threads = AtomicU64::new(0); let total_processed_slots_across_all_threads = AtomicU64::new(0);
let outer_slots_len = slots.len(); let outer_slots_len = slots.len();
let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot let threads = if self.accounts_index.is_disk_index_enabled() {
// these write directly to disk, so the more threads, the better
num_cpus::get()
} else {
// seems to be a good heuristic given varying # of cpus for the in-mem disk index
8
};
let chunk_size = (outer_slots_len / (std::cmp::max(1, threads.saturating_sub(1)))) + 1; // approximately 400k slots in a snapshot
let mut index_time = Measure::start("index"); let mut index_time = Measure::start("index");
let insertion_time_us = AtomicU64::new(0); let insertion_time_us = AtomicU64::new(0);
let rent_exempt = AtomicU64::new(0); let rent_exempt = AtomicU64::new(0);