diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 63d2631214..9e9f160329 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -2097,7 +2097,7 @@ impl AccountsDb { ); } - fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I, is_startup: bool) -> usize + fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I, _is_startup: bool) -> usize where I: Iterator>, { @@ -2139,23 +2139,10 @@ impl AccountsDb { let mut index_read_elapsed = Measure::start("index_read_elapsed"); let mut alive_total = 0; - let accounts_index_map_lock = if is_startup { - // at startup, there is nobody else to contend with the accounts_index read lock, so it is more efficient for us to keep it held - Some(self.accounts_index.get_account_maps_read_lock()) - } else { - None - }; - let accounts_index_map_lock_ref = accounts_index_map_lock.as_ref(); - let mut alive_accounts: Vec<_> = Vec::with_capacity(stored_accounts.len()); let mut unrefed_pubkeys = vec![]; for (pubkey, stored_account) in &stored_accounts { - let lookup = if is_startup { - self.accounts_index - .get_account_read_entry_with_lock(pubkey, accounts_index_map_lock_ref.unwrap()) - } else { - self.accounts_index.get_account_read_entry(pubkey) - }; + let lookup = self.accounts_index.get_account_read_entry(pubkey); if let Some(locked_entry) = lookup { let is_alive = locked_entry.slot_list().iter().any(|(_slot, i)| { i.store_id == stored_account.store_id @@ -2175,7 +2162,6 @@ impl AccountsDb { } } - drop(accounts_index_map_lock); index_read_elapsed.stop(); let aligned_total: u64 = Self::page_align(alive_total as u64); @@ -4458,10 +4444,9 @@ impl AccountsDb { let keys: Vec<_> = self .accounts_index .account_maps - .read() - .unwrap() - .keys() - .cloned() + .iter() + .map(|btree| btree.read().unwrap().keys().cloned().collect::>()) + .flatten() .collect(); collect.stop(); @@ -5973,7 +5958,13 @@ impl AccountsDb { } let mut stored_sizes_and_counts = HashMap::new(); - for account_entry in self.accounts_index.account_maps.read().unwrap().values() { + for account_entry in self + .accounts_index + .account_maps + .iter() + .map(|i| i.read().unwrap().values().cloned().collect::>()) + .flatten() + { for (_slot, account_entry) in account_entry.slot_list.read().unwrap().iter() { let storage_entry_meta = stored_sizes_and_counts .entry(account_entry.store_id) @@ -6022,13 +6013,15 @@ impl AccountsDb { #[allow(clippy::stable_sort_primitive)] roots.sort(); info!("{}: accounts_index roots: {:?}", label, roots,); - for (pubkey, account_entry) in self.accounts_index.account_maps.read().unwrap().iter() { - info!(" key: {} ref_count: {}", pubkey, account_entry.ref_count(),); - info!( - " slots: {:?}", - *account_entry.slot_list.read().unwrap() - ); - } + self.accounts_index.account_maps.iter().for_each(|i| { + for (pubkey, account_entry) in i.read().unwrap().iter() { + info!(" key: {} ref_count: {}", pubkey, account_entry.ref_count(),); + info!( + " slots: {:?}", + *account_entry.slot_list.read().unwrap() + ); + } + }); } fn print_count_and_status(&self, label: &str) { diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 5c44594252..ee33c58c8f 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -7,6 +7,7 @@ use crate::{ use bv::BitVec; use log::*; use ouroboros::self_referencing; +use rayon::prelude::*; use solana_measure::measure::Measure; use solana_sdk::{ clock::{BankId, Slot}, @@ -30,7 +31,7 @@ use std::{ use thiserror::Error; pub const ITER_BATCH_SIZE: usize = 1000; - +const BINS: usize = 16; pub type ScanResult = Result; pub type SlotList = Vec<(Slot, T)>; pub type SlotSlice<'s, T> = &'s [(Slot, T)]; @@ -525,7 +526,7 @@ pub struct AccountsIndexRootsStats { } pub struct AccountsIndexIterator<'a, T> { - account_maps: &'a LockMapType, + account_maps: &'a LockMapTypeSlice, start_bound: Bound, end_bound: Bound, is_finished: bool, @@ -540,7 +541,7 @@ impl<'a, T> AccountsIndexIterator<'a, T> { } } - pub fn new(account_maps: &'a LockMapType, range: Option) -> Self + pub fn new(account_maps: &'a LockMapTypeSlice, range: Option) -> Self where R: RangeBounds, { @@ -568,10 +569,15 @@ impl<'a, T: 'static + Clone> Iterator for AccountsIndexIterator<'a, T> { let chunk: Vec<(Pubkey, AccountMapEntry)> = self .account_maps - .read() - .unwrap() - .range((self.start_bound, self.end_bound)) - .map(|(pubkey, account_map_entry)| (*pubkey, account_map_entry.clone())) + .iter() + .map(|i| { + i.read() + .unwrap() + .range((self.start_bound, self.end_bound)) + .map(|(pubkey, account_map_entry)| (*pubkey, account_map_entry.clone())) + .collect::>() + }) + .flatten() .take(ITER_BATCH_SIZE) .collect(); @@ -589,8 +595,14 @@ pub trait ZeroLamport { fn is_zero_lamport(&self) -> bool; } +fn get_bin_pubkey(pubkey: &Pubkey) -> usize { + let byte_of_pubkey_to_bin = 0; // TODO: this should not be 0. For now it needs to be due to requests for in-order pubkeys + (pubkey.as_ref()[byte_of_pubkey_to_bin] as usize) * BINS / ((u8::MAX as usize) + 1) +} + type MapType = AccountMap>; -type LockMapType = RwLock>; +type LockMapType = Vec>>; +type LockMapTypeSlice = [RwLock>]; type AccountMapsWriteLock<'a, T> = RwLockWriteGuard<'a, MapType>; type AccountMapsReadLock<'a, T> = RwLockReadGuard<'a, MapType>; @@ -634,7 +646,10 @@ pub struct AccountsIndex { impl Default for AccountsIndex { fn default() -> Self { Self { - account_maps: LockMapType::::default(), + account_maps: (0..BINS) + .into_iter() + .map(|_| RwLock::new(AccountMap::>::default())) + .collect::>(), program_id_index: SecondaryIndex::::new( "program_id_index_stats", ), @@ -651,7 +666,9 @@ impl Default for AccountsIndex { } } -impl AccountsIndex { +impl + AccountsIndex +{ fn iter(&self, range: Option) -> AccountsIndexIterator where R: RangeBounds, @@ -976,7 +993,7 @@ impl AccountsIndex { } pub fn get_account_read_entry(&self, pubkey: &Pubkey) -> Option> { - let lock = self.get_account_maps_read_lock(); + let lock = self.get_account_maps_read_lock(pubkey); self.get_account_read_entry_with_lock(pubkey, &lock) } @@ -991,7 +1008,7 @@ impl AccountsIndex { } fn get_account_write_entry(&self, pubkey: &Pubkey) -> Option> { - self.account_maps + self.account_maps[get_bin_pubkey(pubkey)] .read() .unwrap() .get(pubkey) @@ -1012,7 +1029,7 @@ impl AccountsIndex { self.insert_new_entry_if_missing_with_lock(*pubkey, w_account_maps, new_entry) } None => { - let mut w_account_maps = self.get_account_maps_write_lock(); + let mut w_account_maps = self.get_account_maps_write_lock(pubkey); self.insert_new_entry_if_missing_with_lock(*pubkey, &mut w_account_maps, new_entry) } } @@ -1061,7 +1078,7 @@ impl AccountsIndex { ) { if !dead_keys.is_empty() { for key in dead_keys.iter() { - let mut w_index = self.get_account_maps_write_lock(); + let mut w_index = self.get_account_maps_write_lock(key); if let btree_map::Entry::Occupied(index_entry) = w_index.entry(**key) { if index_entry.get().slot_list.read().unwrap().is_empty() { index_entry.remove(); @@ -1250,7 +1267,7 @@ impl AccountsIndex { ancestors: Option<&Ancestors>, max_root: Option, ) -> AccountIndexGetResult<'_, T> { - let read_lock = self.account_maps.read().unwrap(); + let read_lock = self.account_maps[get_bin_pubkey(pubkey)].read().unwrap(); let account = read_lock .get(pubkey) .cloned() @@ -1344,12 +1361,12 @@ impl AccountsIndex { } } - fn get_account_maps_write_lock(&self) -> AccountMapsWriteLock { - self.account_maps.write().unwrap() + fn get_account_maps_write_lock(&self, pubkey: &Pubkey) -> AccountMapsWriteLock { + self.account_maps[get_bin_pubkey(pubkey)].write().unwrap() } - pub(crate) fn get_account_maps_read_lock(&self) -> AccountMapsReadLock { - self.account_maps.read().unwrap() + pub(crate) fn get_account_maps_read_lock(&self, pubkey: &Pubkey) -> AccountMapsReadLock { + self.account_maps[get_bin_pubkey(pubkey)].read().unwrap() } // Same functionally to upsert, but: @@ -1365,37 +1382,47 @@ impl AccountsIndex { item_len: usize, items: impl Iterator, ) -> (Vec, u64) { - // returns (duplicate pubkey mask, insertion time us) - let potentially_new_items = items - .map(|(pubkey, account_info)| { - // this value is equivalent to what update() below would have created if we inserted a new item - ( - *pubkey, - WriteAccountMapEntry::new_entry_after_update(slot, account_info), - ) - }) - .collect::>(); // collect here so we have created all data prior to obtaining lock - - let mut _reclaims = SlotList::new(); - let mut duplicate_keys = Vec::with_capacity(item_len / 100); // just an estimate - let mut w_account_maps = self.get_account_maps_write_lock(); - let mut insert_time = Measure::start("insert_into_primary_index"); - potentially_new_items + let expected_items_per_bin = item_len * 2 / BINS; // big enough so not likely to re-allocate, small enough to not over-allocate + let mut binned = (0..BINS) .into_iter() - .for_each(|(pubkey, new_item)| { - let already_exists = self.insert_new_entry_if_missing_with_lock( - pubkey, - &mut w_account_maps, - new_item, - ); - if let Some((mut w_account_entry, account_info, pubkey)) = already_exists { - w_account_entry.update(slot, account_info, &mut _reclaims); - duplicate_keys.push(pubkey); - } - }); + .map(|pubkey_bin| (pubkey_bin, Vec::with_capacity(expected_items_per_bin))) + .collect::>(); + items.for_each(|(pubkey, account_info)| { + let bin = get_bin_pubkey(pubkey); + // this value is equivalent to what update() below would have created if we inserted a new item + let info = WriteAccountMapEntry::new_entry_after_update(slot, account_info); + binned[bin].1.push((*pubkey, info)); + }); + binned.retain(|x| !x.1.is_empty()); - insert_time.stop(); - (duplicate_keys, insert_time.as_us()) + let insertion_time = AtomicU64::new(0); + + let duplicate_keys = binned + .into_par_iter() + .map(|(pubkey_bin, items)| { + let mut _reclaims = SlotList::new(); + let mut w_account_maps = self.account_maps[pubkey_bin].write().unwrap(); + let mut insert_time = Measure::start("insert_into_primary_index"); // really should be in each loop + let mut duplicate_keys = Vec::with_capacity(items.len()); + items.into_iter().for_each(|(pubkey, new_item)| { + let already_exists = self.insert_new_entry_if_missing_with_lock( + pubkey, + &mut w_account_maps, + new_item, + ); + if let Some((mut w_account_entry, account_info, pubkey)) = already_exists { + w_account_entry.update(slot, account_info, &mut _reclaims); + duplicate_keys.push(pubkey); + } + }); + insert_time.stop(); + insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed); + duplicate_keys + }) + .flatten() + .collect::>(); + + (duplicate_keys, insertion_time.load(Ordering::Relaxed)) } // Updates the given pubkey at the given slot with the new account information. @@ -1509,7 +1536,7 @@ impl AccountsIndex { // locked and inserted the pubkey inbetween when `is_slot_list_empty=true` and the call to // remove() below. if is_slot_list_empty { - let mut w_maps = self.get_account_maps_write_lock(); + let mut w_maps = self.get_account_maps_write_lock(pubkey); if let Some(x) = w_maps.get(pubkey) { if x.slot_list.read().unwrap().is_empty() { w_maps.remove(pubkey); @@ -2605,7 +2632,14 @@ pub mod tests { } fn test_new_entry_code_paths_helper< - T: 'static + Clone + IsCached + ZeroLamport + std::cmp::PartialEq + std::fmt::Debug, + T: 'static + + Sync + + Send + + Clone + + IsCached + + ZeroLamport + + std::cmp::PartialEq + + std::fmt::Debug, >( account_infos: [T; 2], is_cached: bool, @@ -2671,7 +2705,7 @@ pub mod tests { for lock in &[false, true] { let read_lock = if *lock { - Some(index.get_account_maps_read_lock()) + Some(index.get_account_maps_read_lock(&key)) } else { None }; @@ -2721,7 +2755,7 @@ pub mod tests { let account_info = true; let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, account_info); - let mut w_account_maps = index.get_account_maps_write_lock(); + let mut w_account_maps = index.get_account_maps_write_lock(&key.pubkey()); let write = index.insert_new_entry_if_missing_with_lock( key.pubkey(), &mut w_account_maps,