diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 3b791cd20f..89b9afafac 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -505,7 +505,10 @@ impl AccountStorageEntry { } pub fn set_file>(&mut self, path: P) -> IOResult<()> { - self.accounts.set_file(path) + let num_accounts = self.accounts.set_file(path)?; + self.approx_store_count + .store(num_accounts, Ordering::Relaxed); + Ok(()) } pub fn get_relative_path(&self) -> Option { @@ -4060,8 +4063,7 @@ impl AccountsDB { } pub fn generate_index(&self) { - type AccountsMap<'a> = - DashMap)>>>; + type AccountsMap<'a> = HashMap)>>; let mut slots = self.storage.all_slots(); #[allow(clippy::stable_sort_primitive)] slots.sort(); @@ -4069,42 +4071,37 @@ impl AccountsDB { let mut last_log_update = Instant::now(); for (index, slot) in slots.iter().enumerate() { let now = Instant::now(); - if now.duration_since(last_log_update).as_secs() >= 10 { + if now.duration_since(last_log_update).as_secs() >= 2 { info!("generating index: {}/{} slots...", index, slots.len()); last_log_update = now; } - let accounts_map: AccountsMap = AccountsMap::new(); let storage_maps: Vec> = self .storage .get_slot_stores(*slot) .map(|res| res.read().unwrap().values().cloned().collect()) .unwrap_or_default(); - self.thread_pool.install(|| { - storage_maps.par_iter().for_each(|storage| { - let accounts = storage.accounts.accounts(0); - accounts.into_iter().for_each(|stored_account| { - let entry = accounts_map - .get(&stored_account.meta.pubkey) - .unwrap_or_else(|| { - accounts_map - .entry(stored_account.meta.pubkey) - .or_insert(Mutex::new(BTreeMap::new())) - .downgrade() - }); - assert!( - // There should only be one update per write version for a specific slot - // and account - entry - .lock() - .unwrap() - .insert( - stored_account.meta.write_version, - (storage.append_vec_id(), stored_account) - ) - .is_none() - ); - }) - }); + let num_accounts = storage_maps + .iter() + .map(|storage| storage.approx_stored_count()) + .sum(); + let mut accounts_map: AccountsMap = AccountsMap::with_capacity(num_accounts); + storage_maps.iter().for_each(|storage| { + let accounts = storage.accounts.accounts(0); + accounts.into_iter().for_each(|stored_account| { + let entry = accounts_map + .entry(stored_account.meta.pubkey) + .or_insert_with(BTreeMap::new); + assert!( + // There should only be one update per write version for a specific slot + // and account + entry + .insert( + stored_account.meta.write_version, + (storage.append_vec_id(), stored_account) + ) + .is_none() + ); + }) }); // Need to restore indexes even with older write versions which may // be shielding other accounts. When they are then purged, the @@ -4113,16 +4110,14 @@ impl AccountsDB { if !accounts_map.is_empty() { let mut _reclaims: Vec<(u64, AccountInfo)> = vec![]; for (pubkey, account_infos) in accounts_map.into_iter() { - for (_, (store_id, stored_account)) in - account_infos.into_inner().unwrap().into_iter() - { + for (_, (store_id, stored_account)) in account_infos.into_iter() { let account_info = AccountInfo { store_id, offset: stored_account.offset, stored_size: stored_account.stored_size, lamports: stored_account.account_meta.lamports, }; - self.accounts_index.upsert( + self.accounts_index.insert_new_if_missing( *slot, &pubkey, &stored_account.account_meta.owner, @@ -4168,9 +4163,6 @@ impl AccountsDB { trace!("id: {} clearing count", id); store.count_and_status.write().unwrap().0 = 0; } - store - .approx_store_count - .store(store.accounts.accounts(0).len(), Ordering::Relaxed); } } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 31d3b0bbe7..70eeee4d2e 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -506,6 +506,21 @@ impl AccountsIndex { .map(WriteAccountMapEntry::from_account_map_entry) } + fn insert_new_entry_if_missing(&self, pubkey: &Pubkey) -> (WriteAccountMapEntry, bool) { + let new_entry = Arc::new(AccountMapEntryInner { + ref_count: AtomicU64::new(0), + slot_list: RwLock::new(SlotList::with_capacity(1)), + }); + let mut w_account_maps = self.account_maps.write().unwrap(); + let mut is_newly_inserted = false; + let account_entry = w_account_maps.entry(*pubkey).or_insert_with(|| { + is_newly_inserted = true; + new_entry + }); + let w_account_entry = WriteAccountMapEntry::from_account_map_entry(account_entry.clone()); + (w_account_entry, is_newly_inserted) + } + fn get_account_write_entry_else_create( &self, pubkey: &Pubkey, @@ -513,18 +528,9 @@ impl AccountsIndex { let mut w_account_entry = self.get_account_write_entry(pubkey); let mut is_newly_inserted = false; if w_account_entry.is_none() { - let new_entry = Arc::new(AccountMapEntryInner { - ref_count: AtomicU64::new(0), - slot_list: RwLock::new(SlotList::with_capacity(1)), - }); - let mut w_account_maps = self.account_maps.write().unwrap(); - let account_entry = w_account_maps.entry(*pubkey).or_insert_with(|| { - is_newly_inserted = true; - new_entry - }); - w_account_entry = Some(WriteAccountMapEntry::from_account_map_entry( - account_entry.clone(), - )); + let entry_is_new = self.insert_new_entry_if_missing(pubkey); + w_account_entry = Some(entry_is_new.0); + is_newly_inserted = entry_is_new.1; } (w_account_entry.unwrap(), is_newly_inserted) @@ -726,6 +732,10 @@ impl AccountsIndex { account_data: &[u8], account_indexes: &HashSet, ) { + if account_indexes.is_empty() { + return; + } + if account_indexes.contains(&AccountIndex::ProgramId) { self.program_id_index.insert(account_owner, pubkey, slot); } @@ -763,6 +773,26 @@ impl AccountsIndex { } } + // Same functionally to upsert, but doesn't take the read lock + // initially on the accounts_map + // Can save time when inserting lots of new keys + pub fn insert_new_if_missing( + &self, + slot: Slot, + pubkey: &Pubkey, + account_owner: &Pubkey, + account_data: &[u8], + account_indexes: &HashSet, + account_info: T, + reclaims: &mut SlotList, + ) { + { + let (mut w_account_entry, _is_new) = self.insert_new_entry_if_missing(pubkey); + w_account_entry.update(slot, account_info, reclaims); + } + self.update_secondary_indexes(pubkey, slot, account_owner, account_data, account_indexes); + } + // Updates the given pubkey at the given slot with the new account information. // Returns true if the pubkey was newly inserted into the index, otherwise, if the // pubkey updates an existing entry in the index, returns false. diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index adb2162ee6..126c728392 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -271,7 +271,7 @@ impl AppendVec { } #[allow(clippy::mutex_atomic)] - pub fn set_file>(&mut self, path: P) -> io::Result<()> { + pub fn set_file>(&mut self, path: P) -> io::Result { // this AppendVec must not hold actual file; assert_eq!(self.file_size, 0); @@ -293,17 +293,18 @@ impl AppendVec { self.path = path.as_ref().to_path_buf(); self.map = map; - if !self.sanitize_layout_and_length() { + let (sanitized, num_accounts) = self.sanitize_layout_and_length(); + if !sanitized { return Err(std::io::Error::new( std::io::ErrorKind::Other, "incorrect layout/length/data", )); } - Ok(()) + Ok(num_accounts) } - fn sanitize_layout_and_length(&self) -> bool { + fn sanitize_layout_and_length(&self) -> (bool, usize) { let mut offset = 0; // This discards allocated accounts immediately after check at each loop iteration. @@ -311,15 +312,17 @@ impl AppendVec { // This code should not reuse AppendVec.accounts() method as the current form or // extend it to be reused here because it would allow attackers to accumulate // some measurable amount of memory needlessly. + let mut num_accounts = 0; while let Some((account, next_offset)) = self.get_account(offset) { if !account.sanitize() { - return false; + return (false, num_accounts); } offset = next_offset; + num_accounts += 1; } let aligned_current_len = u64_align!(self.current_len.load(Ordering::Relaxed)); - offset == aligned_current_len + (offset == aligned_current_len, num_accounts) } fn get_slice(&self, offset: usize, size: usize) -> Option<(&[u8], usize)> {