Speed up generate_index (#14792) (#14807)

(cherry picked from commit 424bb797a6)

Co-authored-by: sakridge <sakridge@gmail.com>
This commit is contained in:
mergify[bot]
2021-01-23 18:50:04 +00:00
committed by GitHub
parent fef4100f5f
commit 40f32fd37d
3 changed files with 81 additions and 56 deletions

View File

@ -505,7 +505,10 @@ impl AccountStorageEntry {
} }
pub fn set_file<P: AsRef<Path>>(&mut self, path: P) -> IOResult<()> { pub fn set_file<P: AsRef<Path>>(&mut self, path: P) -> IOResult<()> {
self.accounts.set_file(path) let num_accounts = self.accounts.set_file(path)?;
self.approx_store_count
.store(num_accounts, Ordering::Relaxed);
Ok(())
} }
pub fn get_relative_path(&self) -> Option<PathBuf> { pub fn get_relative_path(&self) -> Option<PathBuf> {
@ -4060,8 +4063,7 @@ impl AccountsDB {
} }
pub fn generate_index(&self) { pub fn generate_index(&self) {
type AccountsMap<'a> = type AccountsMap<'a> = HashMap<Pubkey, BTreeMap<u64, (AppendVecId, StoredAccountMeta<'a>)>>;
DashMap<Pubkey, Mutex<BTreeMap<u64, (AppendVecId, StoredAccountMeta<'a>)>>>;
let mut slots = self.storage.all_slots(); let mut slots = self.storage.all_slots();
#[allow(clippy::stable_sort_primitive)] #[allow(clippy::stable_sort_primitive)]
slots.sort(); slots.sort();
@ -4069,42 +4071,37 @@ impl AccountsDB {
let mut last_log_update = Instant::now(); let mut last_log_update = Instant::now();
for (index, slot) in slots.iter().enumerate() { for (index, slot) in slots.iter().enumerate() {
let now = Instant::now(); let now = Instant::now();
if now.duration_since(last_log_update).as_secs() >= 10 { if now.duration_since(last_log_update).as_secs() >= 2 {
info!("generating index: {}/{} slots...", index, slots.len()); info!("generating index: {}/{} slots...", index, slots.len());
last_log_update = now; last_log_update = now;
} }
let accounts_map: AccountsMap = AccountsMap::new();
let storage_maps: Vec<Arc<AccountStorageEntry>> = self let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage .storage
.get_slot_stores(*slot) .get_slot_stores(*slot)
.map(|res| res.read().unwrap().values().cloned().collect()) .map(|res| res.read().unwrap().values().cloned().collect())
.unwrap_or_default(); .unwrap_or_default();
self.thread_pool.install(|| { let num_accounts = storage_maps
storage_maps.par_iter().for_each(|storage| { .iter()
let accounts = storage.accounts.accounts(0); .map(|storage| storage.approx_stored_count())
accounts.into_iter().for_each(|stored_account| { .sum();
let entry = accounts_map let mut accounts_map: AccountsMap = AccountsMap::with_capacity(num_accounts);
.get(&stored_account.meta.pubkey) storage_maps.iter().for_each(|storage| {
.unwrap_or_else(|| { let accounts = storage.accounts.accounts(0);
accounts_map accounts.into_iter().for_each(|stored_account| {
.entry(stored_account.meta.pubkey) let entry = accounts_map
.or_insert(Mutex::new(BTreeMap::new())) .entry(stored_account.meta.pubkey)
.downgrade() .or_insert_with(BTreeMap::new);
}); assert!(
assert!( // There should only be one update per write version for a specific slot
// There should only be one update per write version for a specific slot // and account
// and account entry
entry .insert(
.lock() stored_account.meta.write_version,
.unwrap() (storage.append_vec_id(), stored_account)
.insert( )
stored_account.meta.write_version, .is_none()
(storage.append_vec_id(), stored_account) );
) })
.is_none()
);
})
});
}); });
// Need to restore indexes even with older write versions which may // Need to restore indexes even with older write versions which may
// be shielding other accounts. When they are then purged, the // be shielding other accounts. When they are then purged, the
@ -4113,16 +4110,14 @@ impl AccountsDB {
if !accounts_map.is_empty() { if !accounts_map.is_empty() {
let mut _reclaims: Vec<(u64, AccountInfo)> = vec![]; let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
for (pubkey, account_infos) in accounts_map.into_iter() { for (pubkey, account_infos) in accounts_map.into_iter() {
for (_, (store_id, stored_account)) in for (_, (store_id, stored_account)) in account_infos.into_iter() {
account_infos.into_inner().unwrap().into_iter()
{
let account_info = AccountInfo { let account_info = AccountInfo {
store_id, store_id,
offset: stored_account.offset, offset: stored_account.offset,
stored_size: stored_account.stored_size, stored_size: stored_account.stored_size,
lamports: stored_account.account_meta.lamports, lamports: stored_account.account_meta.lamports,
}; };
self.accounts_index.upsert( self.accounts_index.insert_new_if_missing(
*slot, *slot,
&pubkey, &pubkey,
&stored_account.account_meta.owner, &stored_account.account_meta.owner,
@ -4168,9 +4163,6 @@ impl AccountsDB {
trace!("id: {} clearing count", id); trace!("id: {} clearing count", id);
store.count_and_status.write().unwrap().0 = 0; store.count_and_status.write().unwrap().0 = 0;
} }
store
.approx_store_count
.store(store.accounts.accounts(0).len(), Ordering::Relaxed);
} }
} }
} }

View File

@ -506,6 +506,21 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
.map(WriteAccountMapEntry::from_account_map_entry) .map(WriteAccountMapEntry::from_account_map_entry)
} }
fn insert_new_entry_if_missing(&self, pubkey: &Pubkey) -> (WriteAccountMapEntry<T>, bool) {
let new_entry = Arc::new(AccountMapEntryInner {
ref_count: AtomicU64::new(0),
slot_list: RwLock::new(SlotList::with_capacity(1)),
});
let mut w_account_maps = self.account_maps.write().unwrap();
let mut is_newly_inserted = false;
let account_entry = w_account_maps.entry(*pubkey).or_insert_with(|| {
is_newly_inserted = true;
new_entry
});
let w_account_entry = WriteAccountMapEntry::from_account_map_entry(account_entry.clone());
(w_account_entry, is_newly_inserted)
}
fn get_account_write_entry_else_create( fn get_account_write_entry_else_create(
&self, &self,
pubkey: &Pubkey, pubkey: &Pubkey,
@ -513,18 +528,9 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
let mut w_account_entry = self.get_account_write_entry(pubkey); let mut w_account_entry = self.get_account_write_entry(pubkey);
let mut is_newly_inserted = false; let mut is_newly_inserted = false;
if w_account_entry.is_none() { if w_account_entry.is_none() {
let new_entry = Arc::new(AccountMapEntryInner { let entry_is_new = self.insert_new_entry_if_missing(pubkey);
ref_count: AtomicU64::new(0), w_account_entry = Some(entry_is_new.0);
slot_list: RwLock::new(SlotList::with_capacity(1)), is_newly_inserted = entry_is_new.1;
});
let mut w_account_maps = self.account_maps.write().unwrap();
let account_entry = w_account_maps.entry(*pubkey).or_insert_with(|| {
is_newly_inserted = true;
new_entry
});
w_account_entry = Some(WriteAccountMapEntry::from_account_map_entry(
account_entry.clone(),
));
} }
(w_account_entry.unwrap(), is_newly_inserted) (w_account_entry.unwrap(), is_newly_inserted)
@ -726,6 +732,10 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
account_data: &[u8], account_data: &[u8],
account_indexes: &HashSet<AccountIndex>, account_indexes: &HashSet<AccountIndex>,
) { ) {
if account_indexes.is_empty() {
return;
}
if account_indexes.contains(&AccountIndex::ProgramId) { if account_indexes.contains(&AccountIndex::ProgramId) {
self.program_id_index.insert(account_owner, pubkey, slot); self.program_id_index.insert(account_owner, pubkey, slot);
} }
@ -763,6 +773,26 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
} }
} }
// Same functionally to upsert, but doesn't take the read lock
// initially on the accounts_map
// Can save time when inserting lots of new keys
pub fn insert_new_if_missing(
&self,
slot: Slot,
pubkey: &Pubkey,
account_owner: &Pubkey,
account_data: &[u8],
account_indexes: &HashSet<AccountIndex>,
account_info: T,
reclaims: &mut SlotList<T>,
) {
{
let (mut w_account_entry, _is_new) = self.insert_new_entry_if_missing(pubkey);
w_account_entry.update(slot, account_info, reclaims);
}
self.update_secondary_indexes(pubkey, slot, account_owner, account_data, account_indexes);
}
// Updates the given pubkey at the given slot with the new account information. // Updates the given pubkey at the given slot with the new account information.
// Returns true if the pubkey was newly inserted into the index, otherwise, if the // Returns true if the pubkey was newly inserted into the index, otherwise, if the
// pubkey updates an existing entry in the index, returns false. // pubkey updates an existing entry in the index, returns false.

View File

@ -271,7 +271,7 @@ impl AppendVec {
} }
#[allow(clippy::mutex_atomic)] #[allow(clippy::mutex_atomic)]
pub fn set_file<P: AsRef<Path>>(&mut self, path: P) -> io::Result<()> { pub fn set_file<P: AsRef<Path>>(&mut self, path: P) -> io::Result<usize> {
// this AppendVec must not hold actual file; // this AppendVec must not hold actual file;
assert_eq!(self.file_size, 0); assert_eq!(self.file_size, 0);
@ -293,17 +293,18 @@ impl AppendVec {
self.path = path.as_ref().to_path_buf(); self.path = path.as_ref().to_path_buf();
self.map = map; self.map = map;
if !self.sanitize_layout_and_length() { let (sanitized, num_accounts) = self.sanitize_layout_and_length();
if !sanitized {
return Err(std::io::Error::new( return Err(std::io::Error::new(
std::io::ErrorKind::Other, std::io::ErrorKind::Other,
"incorrect layout/length/data", "incorrect layout/length/data",
)); ));
} }
Ok(()) Ok(num_accounts)
} }
fn sanitize_layout_and_length(&self) -> bool { fn sanitize_layout_and_length(&self) -> (bool, usize) {
let mut offset = 0; let mut offset = 0;
// This discards allocated accounts immediately after check at each loop iteration. // This discards allocated accounts immediately after check at each loop iteration.
@ -311,15 +312,17 @@ impl AppendVec {
// This code should not reuse AppendVec.accounts() method as the current form or // This code should not reuse AppendVec.accounts() method as the current form or
// extend it to be reused here because it would allow attackers to accumulate // extend it to be reused here because it would allow attackers to accumulate
// some measurable amount of memory needlessly. // some measurable amount of memory needlessly.
let mut num_accounts = 0;
while let Some((account, next_offset)) = self.get_account(offset) { while let Some((account, next_offset)) = self.get_account(offset) {
if !account.sanitize() { if !account.sanitize() {
return false; return (false, num_accounts);
} }
offset = next_offset; offset = next_offset;
num_accounts += 1;
} }
let aligned_current_len = u64_align!(self.current_len.load(Ordering::Relaxed)); let aligned_current_len = u64_align!(self.current_len.load(Ordering::Relaxed));
offset == aligned_current_len (offset == aligned_current_len, num_accounts)
} }
fn get_slice(&self, offset: usize, size: usize) -> Option<(&[u8], usize)> { fn get_slice(&self, offset: usize, size: usize) -> Option<(&[u8], usize)> {