Accounts cleanup service and perf improvements (#8799)

* Use atomic for ref count instead of taking rwlock

* Accounts cleanup service

* Review comments
This commit is contained in:
sakridge
2020-03-23 08:50:23 -07:00
committed by GitHub
parent 4d2b83d01f
commit 4b397d15b3
8 changed files with 249 additions and 64 deletions

View File

@@ -467,11 +467,37 @@ pub struct AccountsDB {
/// Thread pool used for par_iter
pub thread_pool: ThreadPool,
pub thread_pool_clean: ThreadPool,
/// Number of append vecs to create to maximize parallelism when scanning
/// the accounts
min_num_stores: usize,
pub bank_hashes: RwLock<HashMap<Slot, BankHashInfo>>,
pub dead_slots: RwLock<HashSet<Slot>>,
}
/// Builds a rayon thread pool whose workers run at the lowest OS thread
/// priority, so background accounts cleanup yields to latency-sensitive work.
///
/// Pool size comes from `get_thread_count()`; each worker is renamed
/// `solana-accounts-cleanup-{index}` and demoted in the start handler.
fn make_min_priority_thread_pool() -> ThreadPool {
    use thread_priority::{
        set_thread_priority, thread_native_id, NormalThreadSchedulePolicy, ThreadPriority,
        ThreadSchedulePolicy,
    };
    rayon::ThreadPoolBuilder::new()
        .num_threads(get_thread_count())
        .thread_name(|i| format!("solana-accounts-cleanup-{}", i))
        .start_handler(|_id| {
            // Demote each worker as soon as it spawns, before it takes work.
            let policy = ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Normal);
            set_thread_priority(thread_native_id(), ThreadPriority::Min, policy).unwrap();
        })
        .build()
        .unwrap()
}
impl Default for AccountsDB {
@@ -490,11 +516,14 @@ impl Default for AccountsDB {
file_size: DEFAULT_FILE_SIZE,
thread_pool: rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|i| format!("solana-accounts-db-{}", i))
.build()
.unwrap(),
thread_pool_clean: make_min_priority_thread_pool(),
min_num_stores: num_threads,
bank_hashes: RwLock::new(bank_hashes),
frozen_accounts: HashMap::new(),
dead_slots: RwLock::new(HashSet::new()),
}
}
}
@@ -653,7 +682,7 @@ impl AccountsDB {
// the hot loop will be the order of ~Xms.
const INDEX_CLEAN_BULK_COUNT: usize = 4096;
let mut measure = Measure::start("clean_old_root-ms");
let mut clean_rooted = Measure::start("clean_old_root-ms");
let reclaim_vecs =
purges_in_root
.par_chunks(INDEX_CLEAN_BULK_COUNT)
@@ -666,19 +695,19 @@ impl AccountsDB {
reclaims
});
let reclaims: Vec<_> = reclaim_vecs.flatten().collect();
measure.stop();
inc_new_counter_info!("clean-old-root-par-clean-ms", measure.as_ms() as usize);
clean_rooted.stop();
inc_new_counter_info!("clean-old-root-par-clean-ms", clean_rooted.as_ms() as usize);
let mut measure = Measure::start("clean_old_root-ms");
let mut measure = Measure::start("clean_old_root_reclaims");
self.handle_reclaims(&reclaims);
measure.stop();
debug!("{} {}", clean_rooted, measure);
inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize);
}
/// Empties the set of roots that are pending cleaning, after
/// `clean_accounts` has finished processing them.
fn clear_uncleaned_roots(&self) {
    // The write guard is a temporary that is released at the end of the
    // statement; the original's explicit `drop` of a named guard was
    // redundant since it was already the last statement in the function.
    self.accounts_index.write().unwrap().uncleaned_roots.clear();
}
// Purge zero lamport accounts and older rooted account states as garbage
@@ -687,25 +716,55 @@ impl AccountsDB {
// can be purged because there are no live append vecs in the ancestors
pub fn clean_accounts(&self) {
self.report_store_stats();
let mut purges = HashMap::new();
let mut purges_in_root = Vec::new();
let no_ancestors = HashMap::new();
let mut accounts_scan = Measure::start("accounts_scan");
let accounts_index = self.accounts_index.read().unwrap();
let pubkeys: Vec<Pubkey> = accounts_index.account_maps.keys().cloned().collect();
// parallel scan the index.
let (mut purges, purges_in_root) = pubkeys
.par_chunks(4096)
.map(|pubkeys: &[Pubkey]| {
let mut purges_in_root = Vec::new();
let mut purges = HashMap::new();
for pubkey in pubkeys {
if let Some((list, index)) = accounts_index.get(pubkey, &no_ancestors) {
let (slot, account_info) = &list[index];
if account_info.lamports == 0 {
purges.insert(*pubkey, accounts_index.would_purge(pubkey));
} else if accounts_index.uncleaned_roots.contains(slot) {
purges_in_root.push(*pubkey);
}
}
}
(purges, purges_in_root)
})
.reduce(
|| (HashMap::new(), Vec::new()),
|m1, m2| {
// Collapse down the hashmaps/vecs into one.
let x = m2.0.iter().fold(m1.0, |mut acc, (k, vs)| {
acc.insert(k.clone(), vs.clone());
acc
});
let mut y = vec![];
y.extend(m1.1);
y.extend(m2.1);
(x, y)
},
);
accounts_index.scan_accounts(&no_ancestors, |pubkey, (account_info, slot)| {
if account_info.lamports == 0 {
purges.insert(*pubkey, accounts_index.would_purge(pubkey));
} else if accounts_index.uncleaned_roots.contains(&slot) {
purges_in_root.push(*pubkey);
}
});
drop(accounts_index);
accounts_scan.stop();
let mut clean_old_rooted = Measure::start("clean_old_roots");
if !purges_in_root.is_empty() {
self.clean_old_rooted_accounts(purges_in_root);
}
self.clear_uncleaned_roots();
clean_old_rooted.stop();
let mut store_counts_time = Measure::start("store_counts");
let accounts_index = self.accounts_index.read().unwrap();
// Calculate store counts as if everything was purged
@@ -741,9 +800,11 @@ impl AccountsDB {
}
}
}
store_counts_time.stop();
// Only keep purges where the entire history of the account in the root set
// can be purged. All AppendVecs for those updates are dead.
let mut purge_filter = Measure::start("purge_filter");
purges.retain(|pubkey, account_infos| {
let mut would_unref_count = 0;
for (_slot_id, account_info) in account_infos {
@@ -756,7 +817,9 @@ impl AccountsDB {
would_unref_count == accounts_index.ref_count_from_storage(&pubkey)
});
purge_filter.stop();
let mut reclaims_time = Measure::start("reclaims");
// Recalculate reclaims with new purge set
let mut reclaims = Vec::new();
let mut dead_keys = Vec::new();
@@ -768,8 +831,8 @@ impl AccountsDB {
reclaims.extend(new_reclaims);
}
drop(accounts_index);
drop(storage);
drop(accounts_index);
if !dead_keys.is_empty() {
let mut accounts_index = self.accounts_index.write().unwrap();
@@ -779,22 +842,47 @@ impl AccountsDB {
}
self.handle_reclaims(&reclaims);
reclaims_time.stop();
debug!(
"clean_accounts: {} {} {} {}",
accounts_scan, store_counts_time, purge_filter, reclaims_time
);
}
fn handle_reclaims(&self, reclaims: &[(Slot, AccountInfo)]) {
let mut dead_accounts = Measure::start("reclaims::remove_dead_accounts");
let mut dead_slots = self.remove_dead_accounts(reclaims);
let dead_slots = self.remove_dead_accounts(reclaims);
dead_accounts.stop();
let dead_slots_len = {
let mut dead_slots_w = self.dead_slots.write().unwrap();
dead_slots_w.extend(dead_slots);
dead_slots_w.len()
};
if dead_slots_len > 5000 {
self.process_dead_slots();
}
}
pub fn process_dead_slots(&self) {
let empty = HashSet::new();
let mut dead_slots_w = self.dead_slots.write().unwrap();
let dead_slots = std::mem::replace(&mut *dead_slots_w, empty);
drop(dead_slots_w);
let mut clean_dead_slots = Measure::start("reclaims::purge_slots");
self.clean_dead_slots(&mut dead_slots);
self.clean_dead_slots(&dead_slots);
clean_dead_slots.stop();
let mut purge_slots = Measure::start("reclaims::purge_slots");
for slot in dead_slots {
self.purge_slot(slot);
}
self.purge_slots(&dead_slots);
purge_slots.stop();
debug!(
"process_dead_slots({}): {} {}",
dead_slots.len(),
clean_dead_slots,
purge_slots
);
}
pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Slot, usize>, scan_func: F) -> A
@@ -884,10 +972,10 @@ impl AccountsDB {
pubkey: &Pubkey,
) -> Option<(Account, Slot)> {
let (lock, index) = accounts_index.get(pubkey, ancestors)?;
let slot = lock.1[index].0;
let slot = lock[index].0;
//TODO: thread this as a ref
if let Some(slot_storage) = storage.0.get(&slot) {
let info = &lock.1[index].1;
let info = &lock[index].1;
slot_storage
.get(&info.store_id)
.and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account()))
@@ -965,10 +1053,22 @@ impl AccountsDB {
}
/// Drops the backing storage for a single non-root `slot`.
/// Thin convenience wrapper that forwards a one-element set to
/// `purge_slots`, which performs the actual root check and removal.
pub fn purge_slot(&self, slot: Slot) {
    let single: HashSet<Slot> = std::iter::once(slot).collect();
    self.purge_slots(&single);
}
pub fn purge_slots(&self, slots: &HashSet<Slot>) {
//add_root should be called first
let is_root = self.accounts_index.read().unwrap().is_root(slot);
if !is_root {
self.storage.write().unwrap().0.remove(&slot);
let accounts_index = self.accounts_index.read().unwrap();
let non_roots: Vec<_> = slots
.iter()
.filter(|slot| !accounts_index.is_root(**slot))
.collect();
drop(accounts_index);
let mut storage = self.storage.write().unwrap();
for slot in non_roots {
storage.0.remove(&slot);
}
}
@@ -1197,7 +1297,7 @@ impl AccountsDB {
.par_iter()
.filter_map(|pubkey| {
if let Some((list, index)) = accounts_index.get(pubkey, ancestors) {
let (slot, account_info) = &list.1[index];
let (slot, account_info) = &list[index];
if account_info.lamports != 0 {
storage
.0
@@ -1369,20 +1469,39 @@ impl AccountsDB {
dead_slots
}
fn clean_dead_slots(&self, dead_slots: &mut HashSet<Slot>) {
pub fn clean_dead_slots(&self, dead_slots: &HashSet<Slot>) {
if !dead_slots.is_empty() {
{
let mut measure = Measure::start("clean_dead_slots-ms");
let index = self.accounts_index.read().unwrap();
let storage = self.storage.read().unwrap();
let mut stores: Vec<Arc<AccountStorageEntry>> = vec![];
for slot in dead_slots.iter() {
for store in storage.0.get(slot).unwrap().values() {
for account in store.accounts.accounts(0) {
index.unref_from_storage(&account.meta.pubkey);
}
stores.push(store.clone());
}
}
drop(storage);
datapoint_info!("clean_dead_slots", ("stores", stores.len(), i64));
let pubkeys: Vec<Vec<Pubkey>> = {
self.thread_pool_clean.install(|| {
stores
.into_par_iter()
.map(|store| {
let accounts = store.accounts.accounts(0);
accounts
.into_iter()
.map(|account| account.meta.pubkey)
.collect::<Vec<Pubkey>>()
})
.collect()
})
};
let index = self.accounts_index.read().unwrap();
for pubkey_v in pubkeys {
for pubkey in pubkey_v {
index.unref_from_storage(&pubkey);
}
}
drop(index);
measure.stop();
inc_new_counter_info!("clean_dead_slots-unref-ms", measure.as_ms() as usize);
@@ -1594,7 +1713,7 @@ impl AccountsDB {
let mut counts = HashMap::new();
for slot_list in accounts_index.account_maps.values() {
for (_slot, account_entry) in slot_list.read().unwrap().1.iter() {
for (_slot, account_entry) in slot_list.1.read().unwrap().iter() {
*counts.entry(account_entry.store_id).or_insert(0) += 1;
}
}
@@ -2072,6 +2191,7 @@ pub mod tests {
#[test]
fn test_lazy_gc_slot() {
solana_logger::setup();
//This test is pedantic
//A slot is purged when a non root bank is cleaned up. If a slot is behind root but it is
//not root, it means we are retaining dead banks.
@@ -2084,7 +2204,7 @@ pub mod tests {
let id = {
let index = accounts.accounts_index.read().unwrap();
let (list, idx) = index.get(&pubkey, &ancestors).unwrap();
list.1[idx].1.store_id
list[idx].1.store_id
};
accounts.add_root(1);
@@ -2095,6 +2215,9 @@ pub mod tests {
accounts.store(1, &[(&pubkey, &account)]);
//slot is gone
print_accounts("pre-clean", &accounts);
accounts.clean_accounts();
accounts.process_dead_slots();
assert!(accounts.storage.read().unwrap().0.get(&0).is_none());
//new value is there
@@ -2270,7 +2393,7 @@ pub mod tests {
info!("{}: accounts.accounts_index roots: {:?}", label, roots,);
for (pubkey, list) in &accounts.accounts_index.read().unwrap().account_maps {
info!(" key: {}", pubkey);
info!(" slots: {:?}", *list.read().unwrap());
info!(" slots: {:?}", *list.1.read().unwrap());
}
}
@@ -2477,9 +2600,9 @@ pub mod tests {
.account_maps
.get(&pubkey)
.unwrap()
.1
.read()
.unwrap()
.1
.len(),
2
);
@@ -2529,6 +2652,7 @@ pub mod tests {
let hash = accounts.update_accounts_hash(current_slot, &ancestors);
accounts.clean_accounts();
accounts.process_dead_slots();
assert_eq!(
accounts.update_accounts_hash(current_slot, &ancestors),
@@ -3333,6 +3457,7 @@ pub mod tests {
current_slot += 1;
assert_eq!(4, accounts.ref_count_for_pubkey(&pubkey1));
accounts.store(current_slot, &[(&pubkey1, &zero_lamport_account)]);
accounts.process_dead_slots();
assert_eq!(
3, /* == 4 - 2 + 1 */
accounts.ref_count_for_pubkey(&pubkey1)