From ab7c96aa8179b9b0bd344634e9d43fe51ab7f830 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com>
Date: Wed, 5 May 2021 12:08:45 -0500
Subject: [PATCH] insert accounts in parallel when building initial index
 (#17040)

* insert accounts in parallel when building initial index

* rename nits from pr review

* rename nits from pr review

* rename nits from pr review

* rename nits from pr review
---
 runtime/src/accounts_db.rs | 142 +++++++++++++++++++++----------------
 1 file changed, 82 insertions(+), 60 deletions(-)
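
The hunk below replaces the sequential per-slot loop with rayon's par_chunks over the
sorted slot list, sharing an AtomicU64 so that a single thread reports aggregate
progress. As a reading aid only, here is a minimal standalone sketch of that pattern;
it assumes a rayon dependency, and the names (build_index_in_parallel, index_slot, and
the local variables) are illustrative, not part of this change:

    // Sketch only: mirrors the chunking + shared-progress-counter pattern used by the
    // patch, with the AccountsDb-specific work stubbed out.
    use rayon::prelude::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    // Placeholder for the real per-slot work (scanning storages, inserting index
    // entries); hypothetical, not from accounts_db.rs.
    fn index_slot(_slot: u64) {}

    fn build_index_in_parallel(slots: &[u64]) {
        let total_processed = AtomicU64::new(0);
        let outer_len = slots.len();
        // Split the slots into a handful of large chunks, roughly one per worker.
        let chunk_size = (outer_len / 7) + 1;
        slots.par_chunks(chunk_size).for_each(|chunk| {
            let mut last_log = Instant::now();
            let mut reported = 0u64;
            let mut was_first = false;
            for (i, slot) in chunk.iter().enumerate() {
                let now = Instant::now();
                if now.duration_since(last_log).as_secs() >= 2 {
                    // Add only the slots this thread processed since its last report.
                    let newly = i as u64 - reported;
                    reported = i as u64;
                    let previous = total_processed.fetch_add(newly, Ordering::Relaxed);
                    // The thread that first bumps the shared counter keeps logging,
                    // so the log shows one monotonic progress stream.
                    was_first = was_first || previous == 0;
                    if was_first {
                        println!("generating index: {}/{} slots...", previous + newly, outer_len);
                    }
                    last_log = now;
                }
                index_slot(*slot);
            }
        });
    }

    fn main() {
        let slots: Vec<u64> = (0..1_000_000).collect();
        build_index_in_parallel(&slots);
    }
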
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 228f03d8c5..407ac792df 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -4908,70 +4908,92 @@ impl AccountsDb {
         let mut slots = self.storage.all_slots();
         #[allow(clippy::stable_sort_primitive)]
         slots.sort();
-
-        let mut last_log_update = Instant::now();
-        for (index, slot) in slots.iter().enumerate() {
-            let now = Instant::now();
-            if now.duration_since(last_log_update).as_secs() >= 2 {
-                info!("generating index: {}/{} slots...", index, slots.len());
-                last_log_update = now;
-            }
-            let storage_maps: Vec<Arc<AccountStorageEntry>> = self
-                .storage
-                .get_slot_storage_entries(*slot)
-                .unwrap_or_default();
-            let num_accounts = storage_maps
-                .iter()
-                .map(|storage| storage.approx_stored_count())
-                .sum();
-            let mut accounts_map: AccountsMap = AccountsMap::with_capacity(num_accounts);
-            storage_maps.iter().for_each(|storage| {
-                let accounts = storage.all_accounts();
-                accounts.into_iter().for_each(|stored_account| {
-                    let entry = accounts_map
-                        .entry(stored_account.meta.pubkey)
-                        .or_insert_with(BTreeMap::new);
-                    assert!(
-                        // There should only be one update per write version for a specific slot
-                        // and account
-                        entry
-                            .insert(
-                                stored_account.meta.write_version,
-                                (storage.append_vec_id(), stored_account)
-                            )
-                            .is_none()
-                    );
-                })
-            });
-            // Need to restore indexes even with older write versions which may
-            // be shielding other accounts. When they are then purged, the
-            // original non-shielded account value will be visible when the account
-            // is restored from the append-vec
-            if !accounts_map.is_empty() {
-                let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
-                let dirty_keys = accounts_map.iter().map(|(pubkey, _info)| *pubkey).collect();
-                self.uncleaned_pubkeys.insert(*slot, dirty_keys);
-                for (pubkey, account_infos) in accounts_map.into_iter() {
-                    for (_, (store_id, stored_account)) in account_infos.into_iter() {
-                        let account_info = AccountInfo {
-                            store_id,
-                            offset: stored_account.offset,
-                            stored_size: stored_account.stored_size,
-                            lamports: stored_account.account_meta.lamports,
-                        };
-                        self.accounts_index.insert_new_if_missing(
-                            *slot,
-                            &pubkey,
-                            &stored_account.account_meta.owner,
-                            &stored_account.data,
-                            &self.account_indexes,
-                            account_info,
-                            &mut _reclaims,
+        let total_processed_slots_across_all_threads = AtomicU64::new(0);
+        let outer_slots_len = slots.len();
+        let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
+        slots.par_chunks(chunk_size).for_each(|slots| {
+            let mut last_log_update = Instant::now();
+            let mut my_last_reported_number_of_processed_slots = 0;
+            let mut was_first = false;
+            for (index, slot) in slots.iter().enumerate() {
+                let now = Instant::now();
+                if now.duration_since(last_log_update).as_secs() >= 2 {
+                    let my_total_newly_processed_slots_since_last_report =
+                        (index as u64) - my_last_reported_number_of_processed_slots;
+                    my_last_reported_number_of_processed_slots = index as u64;
+                    let previous_total_processed_slots_across_all_threads =
+                        total_processed_slots_across_all_threads.fetch_add(
+                            my_total_newly_processed_slots_since_last_report,
+                            Ordering::Relaxed,
                         );
+                    was_first = was_first || 0 == previous_total_processed_slots_across_all_threads;
+                    if was_first {
+                        info!(
+                            "generating index: {}/{} slots...",
+                            previous_total_processed_slots_across_all_threads
+                                + my_total_newly_processed_slots_since_last_report,
+                            outer_slots_len
+                        );
+                    }
+                    last_log_update = now;
+                }
+                let storage_maps: Vec<Arc<AccountStorageEntry>> = self
+                    .storage
+                    .get_slot_storage_entries(*slot)
+                    .unwrap_or_default();
+                let num_accounts = storage_maps
+                    .iter()
+                    .map(|storage| storage.approx_stored_count())
+                    .sum();
+                let mut accounts_map: AccountsMap = AccountsMap::with_capacity(num_accounts);
+                storage_maps.iter().for_each(|storage| {
+                    let accounts = storage.all_accounts();
+                    accounts.into_iter().for_each(|stored_account| {
+                        let entry = accounts_map
+                            .entry(stored_account.meta.pubkey)
+                            .or_insert_with(BTreeMap::new);
+                        assert!(
+                            // There should only be one update per write version for a specific slot
+                            // and account
+                            entry
+                                .insert(
+                                    stored_account.meta.write_version,
+                                    (storage.append_vec_id(), stored_account)
+                                )
+                                .is_none()
+                        );
+                    })
+                });
+                // Need to restore indexes even with older write versions which may
+                // be shielding other accounts. When they are then purged, the
+                // original non-shielded account value will be visible when the account
+                // is restored from the append-vec
+                if !accounts_map.is_empty() {
+                    let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
+                    let dirty_keys = accounts_map.iter().map(|(pubkey, _info)| *pubkey).collect();
+                    self.uncleaned_pubkeys.insert(*slot, dirty_keys);
+                    for (pubkey, account_infos) in accounts_map.into_iter() {
+                        for (_, (store_id, stored_account)) in account_infos.into_iter() {
+                            let account_info = AccountInfo {
+                                store_id,
+                                offset: stored_account.offset,
+                                stored_size: stored_account.stored_size,
+                                lamports: stored_account.account_meta.lamports,
+                            };
+                            self.accounts_index.insert_new_if_missing(
+                                *slot,
+                                &pubkey,
+                                &stored_account.account_meta.owner,
+                                &stored_account.data,
+                                &self.account_indexes,
+                                account_info,
+                                &mut _reclaims,
+                            );
+                        }
                     }
                 }
             }
-        }
+        });
 
         // Need to add these last, otherwise older updates will be cleaned
         for slot in slots {