From 4b397d15b3e6193ecb4059ea0c53cab6bcf052c8 Mon Sep 17 00:00:00 2001 From: sakridge Date: Mon, 23 Mar 2020 08:50:23 -0700 Subject: [PATCH] Accounts cleanup service and perf improvements (#8799) * Use atomic for ref count instead of taking rwlock * Accounts cleanup service * Review comments --- Cargo.lock | 10 ++ core/src/accounts_cleanup_service.rs | 38 ++++++ core/src/lib.rs | 1 + core/src/tvu.rs | 6 + runtime/Cargo.toml | 1 + runtime/src/accounts_db.rs | 195 ++++++++++++++++++++++----- runtime/src/accounts_index.rs | 58 ++++---- runtime/src/bank.rs | 4 + 8 files changed, 249 insertions(+), 64 deletions(-) create mode 100644 core/src/accounts_cleanup_service.rs diff --git a/Cargo.lock b/Cargo.lock index e3d9db5b5d..1768c5c043 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4511,6 +4511,7 @@ dependencies = [ "solana-vote-program 1.1.0", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "thiserror 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)", + "thread-priority 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -5360,6 +5361,14 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "thread-priority" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "thread-scoped" version = "1.0.2" @@ -6667,6 +6676,7 @@ dependencies = [ "checksum thiserror-impl 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a3ecbaa927a1d5a73d14a20af52463fa433c0727d07ef5e208f0546841d2efd" "checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03" "checksum thread-id 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c7fbf4c9d56b320106cd64fd024dadfa0be7cb4706725fc44a7d7ce952d820c1" +"checksum thread-priority 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "52c084e908948709a7f7f6d44b5368e0134aa322e0e569431a92c989bf855188" "checksum thread-scoped 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bcbb6aa301e5d3b0b5ef639c9a9c7e2f1c944f177b460c04dc24c69b1fa2bd99" "checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5" "checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b" diff --git a/core/src/accounts_cleanup_service.rs b/core/src/accounts_cleanup_service.rs new file mode 100644 index 0000000000..d05595972b --- /dev/null +++ b/core/src/accounts_cleanup_service.rs @@ -0,0 +1,38 @@ +// Service to clean up dead slots in accounts_db +// +// This can be expensive since we have to walk the append vecs being cleaned up. + +use solana_ledger::bank_forks::BankForks; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, +}; +use std::thread::{self, sleep, Builder, JoinHandle}; +use std::time::Duration; + +pub struct AccountsCleanupService { + t_cleanup: JoinHandle<()>, +} + +impl AccountsCleanupService { + pub fn new(bank_forks: Arc>, exit: &Arc) -> Self { + info!("AccountsCleanupService active"); + let exit = exit.clone(); + let t_cleanup = Builder::new() + .name("solana-accounts-cleanup".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + let bank = bank_forks.read().unwrap().working_bank(); + bank.clean_dead_slots(); + sleep(Duration::from_millis(100)); + }) + .unwrap(); + Self { t_cleanup } + } + + pub fn join(self) -> thread::Result<()> { + self.t_cleanup.join() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 429ffe3de2..0648f007de 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -5,6 +5,7 @@ //! command-line tools to spin up validators and a Rust library //! +pub mod accounts_cleanup_service; pub mod accounts_hash_verifier; pub mod banking_stage; pub mod broadcast_stage; diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 62d82ce26f..978fbb0dcc 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -2,6 +2,7 @@ //! validation pipeline in software. use crate::{ + accounts_cleanup_service::AccountsCleanupService, accounts_hash_verifier::AccountsHashVerifier, broadcast_stage::RetransmitSlotsSender, cluster_info::ClusterInfo, @@ -47,6 +48,7 @@ pub struct Tvu { retransmit_stage: RetransmitStage, replay_stage: ReplayStage, ledger_cleanup_service: Option, + accounts_cleanup_service: AccountsCleanupService, storage_stage: StorageStage, accounts_hash_verifier: AccountsHashVerifier, } @@ -205,6 +207,8 @@ impl Tvu { ) }); + let accounts_cleanup_service = AccountsCleanupService::new(bank_forks.clone(), &exit); + let storage_stage = StorageStage::new( storage_state, root_bank_receiver, @@ -222,6 +226,7 @@ impl Tvu { retransmit_stage, replay_stage, ledger_cleanup_service, + accounts_cleanup_service, storage_stage, accounts_hash_verifier, } @@ -235,6 +240,7 @@ impl Tvu { if self.ledger_cleanup_service.is_some() { self.ledger_cleanup_service.unwrap().join()?; } + self.accounts_cleanup_service.join()?; self.replay_stage.join()?; self.accounts_hash_verifier.join()?; Ok(()) diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index e28a3e800c..93eeee5479 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -37,6 +37,7 @@ solana-storage-program = { path = "../programs/storage", version = "1.1.0" } solana-vote-program = { path = "../programs/vote", version = "1.1.0" } tempfile = "3.1.0" thiserror = "1.0" +thread-priority = "0.1.1" [lib] diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 60e15a1394..bdaf437289 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -467,11 +467,37 @@ pub struct AccountsDB { /// Thread pool used for par_iter pub thread_pool: ThreadPool, + pub thread_pool_clean: ThreadPool, + /// Number of append vecs to create to maximize parallelism when scanning /// the accounts min_num_stores: usize, pub bank_hashes: RwLock>, + + pub dead_slots: RwLock>, +} + +fn make_min_priority_thread_pool() -> ThreadPool { + use thread_priority::{ + set_thread_priority, thread_native_id, NormalThreadSchedulePolicy, ThreadPriority, + ThreadSchedulePolicy, + }; + let num_threads = get_thread_count(); + rayon::ThreadPoolBuilder::new() + .start_handler(|_id| { + let thread_id = thread_native_id(); + set_thread_priority( + thread_id, + ThreadPriority::Min, + ThreadSchedulePolicy::Normal(NormalThreadSchedulePolicy::Normal), + ) + .unwrap(); + }) + .thread_name(|i| format!("solana-accounts-cleanup-{}", i)) + .num_threads(num_threads) + .build() + .unwrap() } impl Default for AccountsDB { @@ -490,11 +516,14 @@ impl Default for AccountsDB { file_size: DEFAULT_FILE_SIZE, thread_pool: rayon::ThreadPoolBuilder::new() .num_threads(num_threads) + .thread_name(|i| format!("solana-accounts-db-{}", i)) .build() .unwrap(), + thread_pool_clean: make_min_priority_thread_pool(), min_num_stores: num_threads, bank_hashes: RwLock::new(bank_hashes), frozen_accounts: HashMap::new(), + dead_slots: RwLock::new(HashSet::new()), } } } @@ -653,7 +682,7 @@ impl AccountsDB { // the hot loop will be the order of ~Xms. const INDEX_CLEAN_BULK_COUNT: usize = 4096; - let mut measure = Measure::start("clean_old_root-ms"); + let mut clean_rooted = Measure::start("clean_old_root-ms"); let reclaim_vecs = purges_in_root .par_chunks(INDEX_CLEAN_BULK_COUNT) @@ -666,19 +695,19 @@ impl AccountsDB { reclaims }); let reclaims: Vec<_> = reclaim_vecs.flatten().collect(); - measure.stop(); - inc_new_counter_info!("clean-old-root-par-clean-ms", measure.as_ms() as usize); + clean_rooted.stop(); + inc_new_counter_info!("clean-old-root-par-clean-ms", clean_rooted.as_ms() as usize); - let mut measure = Measure::start("clean_old_root-ms"); + let mut measure = Measure::start("clean_old_root_reclaims"); self.handle_reclaims(&reclaims); measure.stop(); + debug!("{} {}", clean_rooted, measure); inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize); } fn clear_uncleaned_roots(&self) { let mut accounts_index = self.accounts_index.write().unwrap(); accounts_index.uncleaned_roots.clear(); - drop(accounts_index); } // Purge zero lamport accounts and older rooted account states as garbage @@ -687,25 +716,55 @@ impl AccountsDB { // can be purged because there are no live append vecs in the ancestors pub fn clean_accounts(&self) { self.report_store_stats(); - let mut purges = HashMap::new(); - let mut purges_in_root = Vec::new(); + let no_ancestors = HashMap::new(); + let mut accounts_scan = Measure::start("accounts_scan"); let accounts_index = self.accounts_index.read().unwrap(); + let pubkeys: Vec = accounts_index.account_maps.keys().cloned().collect(); + // parallel scan the index. + let (mut purges, purges_in_root) = pubkeys + .par_chunks(4096) + .map(|pubkeys: &[Pubkey]| { + let mut purges_in_root = Vec::new(); + let mut purges = HashMap::new(); + for pubkey in pubkeys { + if let Some((list, index)) = accounts_index.get(pubkey, &no_ancestors) { + let (slot, account_info) = &list[index]; + if account_info.lamports == 0 { + purges.insert(*pubkey, accounts_index.would_purge(pubkey)); + } else if accounts_index.uncleaned_roots.contains(slot) { + purges_in_root.push(*pubkey); + } + } + } + (purges, purges_in_root) + }) + .reduce( + || (HashMap::new(), Vec::new()), + |m1, m2| { + // Collapse down the hashmaps/vecs into one. + let x = m2.0.iter().fold(m1.0, |mut acc, (k, vs)| { + acc.insert(k.clone(), vs.clone()); + acc + }); + let mut y = vec![]; + y.extend(m1.1); + y.extend(m2.1); + (x, y) + }, + ); - accounts_index.scan_accounts(&no_ancestors, |pubkey, (account_info, slot)| { - if account_info.lamports == 0 { - purges.insert(*pubkey, accounts_index.would_purge(pubkey)); - } else if accounts_index.uncleaned_roots.contains(&slot) { - purges_in_root.push(*pubkey); - } - }); drop(accounts_index); + accounts_scan.stop(); + let mut clean_old_rooted = Measure::start("clean_old_roots"); if !purges_in_root.is_empty() { self.clean_old_rooted_accounts(purges_in_root); } self.clear_uncleaned_roots(); + clean_old_rooted.stop(); + let mut store_counts_time = Measure::start("store_counts"); let accounts_index = self.accounts_index.read().unwrap(); // Calculate store counts as if everything was purged @@ -741,9 +800,11 @@ impl AccountsDB { } } } + store_counts_time.stop(); // Only keep purges where the entire history of the account in the root set // can be purged. All AppendVecs for those updates are dead. + let mut purge_filter = Measure::start("purge_filter"); purges.retain(|pubkey, account_infos| { let mut would_unref_count = 0; for (_slot_id, account_info) in account_infos { @@ -756,7 +817,9 @@ impl AccountsDB { would_unref_count == accounts_index.ref_count_from_storage(&pubkey) }); + purge_filter.stop(); + let mut reclaims_time = Measure::start("reclaims"); // Recalculate reclaims with new purge set let mut reclaims = Vec::new(); let mut dead_keys = Vec::new(); @@ -768,8 +831,8 @@ impl AccountsDB { reclaims.extend(new_reclaims); } - drop(accounts_index); drop(storage); + drop(accounts_index); if !dead_keys.is_empty() { let mut accounts_index = self.accounts_index.write().unwrap(); @@ -779,22 +842,47 @@ impl AccountsDB { } self.handle_reclaims(&reclaims); + reclaims_time.stop(); + debug!( + "clean_accounts: {} {} {} {}", + accounts_scan, store_counts_time, purge_filter, reclaims_time + ); } fn handle_reclaims(&self, reclaims: &[(Slot, AccountInfo)]) { let mut dead_accounts = Measure::start("reclaims::remove_dead_accounts"); - let mut dead_slots = self.remove_dead_accounts(reclaims); + let dead_slots = self.remove_dead_accounts(reclaims); dead_accounts.stop(); + let dead_slots_len = { + let mut dead_slots_w = self.dead_slots.write().unwrap(); + dead_slots_w.extend(dead_slots); + dead_slots_w.len() + }; + if dead_slots_len > 5000 { + self.process_dead_slots(); + } + } + + pub fn process_dead_slots(&self) { + let empty = HashSet::new(); + let mut dead_slots_w = self.dead_slots.write().unwrap(); + let dead_slots = std::mem::replace(&mut *dead_slots_w, empty); + drop(dead_slots_w); let mut clean_dead_slots = Measure::start("reclaims::purge_slots"); - self.clean_dead_slots(&mut dead_slots); + self.clean_dead_slots(&dead_slots); clean_dead_slots.stop(); let mut purge_slots = Measure::start("reclaims::purge_slots"); - for slot in dead_slots { - self.purge_slot(slot); - } + self.purge_slots(&dead_slots); purge_slots.stop(); + + debug!( + "process_dead_slots({}): {} {}", + dead_slots.len(), + clean_dead_slots, + purge_slots + ); } pub fn scan_accounts(&self, ancestors: &HashMap, scan_func: F) -> A @@ -884,10 +972,10 @@ impl AccountsDB { pubkey: &Pubkey, ) -> Option<(Account, Slot)> { let (lock, index) = accounts_index.get(pubkey, ancestors)?; - let slot = lock.1[index].0; + let slot = lock[index].0; //TODO: thread this as a ref if let Some(slot_storage) = storage.0.get(&slot) { - let info = &lock.1[index].1; + let info = &lock[index].1; slot_storage .get(&info.store_id) .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) @@ -965,10 +1053,22 @@ impl AccountsDB { } pub fn purge_slot(&self, slot: Slot) { + let mut slots = HashSet::new(); + slots.insert(slot); + self.purge_slots(&slots); + } + + pub fn purge_slots(&self, slots: &HashSet) { //add_root should be called first - let is_root = self.accounts_index.read().unwrap().is_root(slot); - if !is_root { - self.storage.write().unwrap().0.remove(&slot); + let accounts_index = self.accounts_index.read().unwrap(); + let non_roots: Vec<_> = slots + .iter() + .filter(|slot| !accounts_index.is_root(**slot)) + .collect(); + drop(accounts_index); + let mut storage = self.storage.write().unwrap(); + for slot in non_roots { + storage.0.remove(&slot); } } @@ -1197,7 +1297,7 @@ impl AccountsDB { .par_iter() .filter_map(|pubkey| { if let Some((list, index)) = accounts_index.get(pubkey, ancestors) { - let (slot, account_info) = &list.1[index]; + let (slot, account_info) = &list[index]; if account_info.lamports != 0 { storage .0 @@ -1369,20 +1469,39 @@ impl AccountsDB { dead_slots } - fn clean_dead_slots(&self, dead_slots: &mut HashSet) { + pub fn clean_dead_slots(&self, dead_slots: &HashSet) { if !dead_slots.is_empty() { { let mut measure = Measure::start("clean_dead_slots-ms"); - let index = self.accounts_index.read().unwrap(); let storage = self.storage.read().unwrap(); + let mut stores: Vec> = vec![]; for slot in dead_slots.iter() { for store in storage.0.get(slot).unwrap().values() { - for account in store.accounts.accounts(0) { - index.unref_from_storage(&account.meta.pubkey); - } + stores.push(store.clone()); } } drop(storage); + datapoint_info!("clean_dead_slots", ("stores", stores.len(), i64)); + let pubkeys: Vec> = { + self.thread_pool_clean.install(|| { + stores + .into_par_iter() + .map(|store| { + let accounts = store.accounts.accounts(0); + accounts + .into_iter() + .map(|account| account.meta.pubkey) + .collect::>() + }) + .collect() + }) + }; + let index = self.accounts_index.read().unwrap(); + for pubkey_v in pubkeys { + for pubkey in pubkey_v { + index.unref_from_storage(&pubkey); + } + } drop(index); measure.stop(); inc_new_counter_info!("clean_dead_slots-unref-ms", measure.as_ms() as usize); @@ -1594,7 +1713,7 @@ impl AccountsDB { let mut counts = HashMap::new(); for slot_list in accounts_index.account_maps.values() { - for (_slot, account_entry) in slot_list.read().unwrap().1.iter() { + for (_slot, account_entry) in slot_list.1.read().unwrap().iter() { *counts.entry(account_entry.store_id).or_insert(0) += 1; } } @@ -2072,6 +2191,7 @@ pub mod tests { #[test] fn test_lazy_gc_slot() { + solana_logger::setup(); //This test is pedantic //A slot is purged when a non root bank is cleaned up. If a slot is behind root but it is //not root, it means we are retaining dead banks. @@ -2084,7 +2204,7 @@ pub mod tests { let id = { let index = accounts.accounts_index.read().unwrap(); let (list, idx) = index.get(&pubkey, &ancestors).unwrap(); - list.1[idx].1.store_id + list[idx].1.store_id }; accounts.add_root(1); @@ -2095,6 +2215,9 @@ pub mod tests { accounts.store(1, &[(&pubkey, &account)]); //slot is gone + print_accounts("pre-clean", &accounts); + accounts.clean_accounts(); + accounts.process_dead_slots(); assert!(accounts.storage.read().unwrap().0.get(&0).is_none()); //new value is there @@ -2270,7 +2393,7 @@ pub mod tests { info!("{}: accounts.accounts_index roots: {:?}", label, roots,); for (pubkey, list) in &accounts.accounts_index.read().unwrap().account_maps { info!(" key: {}", pubkey); - info!(" slots: {:?}", *list.read().unwrap()); + info!(" slots: {:?}", *list.1.read().unwrap()); } } @@ -2477,9 +2600,9 @@ pub mod tests { .account_maps .get(&pubkey) .unwrap() + .1 .read() .unwrap() - .1 .len(), 2 ); @@ -2529,6 +2652,7 @@ pub mod tests { let hash = accounts.update_accounts_hash(current_slot, &ancestors); accounts.clean_accounts(); + accounts.process_dead_slots(); assert_eq!( accounts.update_accounts_hash(current_slot, &ancestors), @@ -3333,6 +3457,7 @@ pub mod tests { current_slot += 1; assert_eq!(4, accounts.ref_count_for_pubkey(&pubkey1)); accounts.store(current_slot, &[(&pubkey1, &zero_lamport_account)]); + accounts.process_dead_slots(); assert_eq!( 3, /* == 4 - 2 + 1 */ accounts.ref_count_for_pubkey(&pubkey1) diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index d9d69f0325..243f85531d 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1,4 +1,5 @@ use solana_sdk::pubkey::Pubkey; +use std::sync::atomic::{AtomicU64, Ordering}; use std::{ collections::{HashMap, HashSet}, sync::{RwLock, RwLockReadGuard}, @@ -7,11 +8,11 @@ use std::{ pub type Slot = u64; type SlotList = Vec<(Slot, T)>; pub type RefCount = u64; -type AccountMapEntry = (RefCount, SlotList); +type AccountMapEntry = (AtomicU64, RwLock>); #[derive(Debug, Default)] pub struct AccountsIndex { - pub account_maps: HashMap>>, + pub account_maps: HashMap>, pub roots: HashSet, pub uncleaned_roots: HashSet, @@ -24,7 +25,7 @@ impl AccountsIndex { F: FnMut(&Pubkey, (&T, Slot)) -> (), { for (pubkey, list) in self.account_maps.iter() { - let list_r = &list.read().unwrap().1; + let list_r = &list.1.read().unwrap(); if let Some(index) = self.latest_slot(ancestors, &list_r) { func(pubkey, (&list_r[index].1, list_r[index].0)); } @@ -39,14 +40,14 @@ impl AccountsIndex { } pub fn would_purge(&self, pubkey: &Pubkey) -> Vec<(Slot, T)> { - let list = &self.account_maps.get(&pubkey).unwrap().read().unwrap().1; + let list = &self.account_maps.get(&pubkey).unwrap().1.read().unwrap(); self.get_rooted_entries(&list) } // filter any rooted entries and return them along with a bool that indicates // if this account has no more entries. pub fn purge(&self, pubkey: &Pubkey) -> (Vec<(Slot, T)>, bool) { - let list = &mut self.account_maps.get(&pubkey).unwrap().write().unwrap().1; + let list = &mut self.account_maps.get(&pubkey).unwrap().1.write().unwrap(); let reclaims = self.get_rooted_entries(&list); list.retain(|(slot, _)| !self.is_root(*slot)); (reclaims, list.is_empty()) @@ -72,10 +73,10 @@ impl AccountsIndex { &self, pubkey: &Pubkey, ancestors: &HashMap, - ) -> Option<(RwLockReadGuard>, usize)> { + ) -> Option<(RwLockReadGuard>, usize)> { self.account_maps.get(pubkey).and_then(|list| { - let list_r = list.read().unwrap(); - let lock = &list_r.1; + let list_r = list.1.read().unwrap(); + let lock = &list_r; let found_index = self.latest_slot(ancestors, &lock)?; Some((list_r, found_index)) }) @@ -101,7 +102,7 @@ impl AccountsIndex { let _slot_vec = self .account_maps .entry(*pubkey) - .or_insert_with(|| RwLock::new((0 as RefCount, Vec::with_capacity(32)))); + .or_insert_with(|| (AtomicU64::new(0), RwLock::new(Vec::with_capacity(32)))); self.update(slot, pubkey, account_info, reclaims); } @@ -117,15 +118,15 @@ impl AccountsIndex { reclaims: &mut Vec<(Slot, T)>, ) -> Option { if let Some(lock) = self.account_maps.get(pubkey) { - let slot_vec = &mut lock.write().unwrap(); + let mut slot_vec = &mut lock.1.write().unwrap(); // filter out other dirty entries - reclaims.extend(slot_vec.1.iter().filter(|(f, _)| *f == slot).cloned()); - slot_vec.1.retain(|(f, _)| *f != slot); + reclaims.extend(slot_vec.iter().filter(|(f, _)| *f == slot).cloned()); + slot_vec.retain(|(f, _)| *f != slot); - slot_vec.0 += 1 as RefCount; - slot_vec.1.push((slot, account_info)); + lock.0.fetch_add(1, Ordering::Relaxed); + slot_vec.push((slot, account_info)); // now, do lazy clean - self.purge_older_root_entries(&mut slot_vec.1, reclaims); + self.purge_older_root_entries(&mut slot_vec, reclaims); None } else { @@ -136,15 +137,14 @@ impl AccountsIndex { pub fn unref_from_storage(&self, pubkey: &Pubkey) { let locked_slot_vec = self.account_maps.get(pubkey); if let Some(slot_vec) = locked_slot_vec { - let mut slot_vec = slot_vec.write().unwrap(); - slot_vec.0 -= 1 as RefCount; + slot_vec.0.fetch_sub(1, Ordering::Relaxed); } } pub fn ref_count_from_storage(&self, pubkey: &Pubkey) -> RefCount { let locked_slot_vec = self.account_maps.get(pubkey); if let Some(slot_vec) = locked_slot_vec { - slot_vec.read().unwrap().0 + slot_vec.0.load(Ordering::Relaxed) } else { 0 } @@ -170,8 +170,8 @@ impl AccountsIndex { pub fn clean_rooted_entries(&self, pubkey: &Pubkey, reclaims: &mut Vec<(Slot, T)>) { if let Some(lock) = self.account_maps.get(pubkey) { - let mut slot_vec = lock.write().unwrap(); - self.purge_older_root_entries(&mut slot_vec.1, reclaims); + let mut slot_vec = lock.1.write().unwrap(); + self.purge_older_root_entries(&mut slot_vec, reclaims); } } @@ -179,8 +179,8 @@ impl AccountsIndex { let entry = self .account_maps .entry(*pubkey) - .or_insert_with(|| RwLock::new((1 as RefCount, vec![]))); - entry.write().unwrap().1.push((slot, account_info)); + .or_insert_with(|| (AtomicU64::new(1), RwLock::new(vec![]))); + entry.1.write().unwrap().push((slot, account_info)); } pub fn can_purge(max_root: Slot, slot: Slot) -> bool { @@ -262,7 +262,7 @@ mod tests { let ancestors = vec![(0, 0)].into_iter().collect(); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list.1[idx], (0, true)); + assert_eq!(list[idx], (0, true)); let mut num = 0; let mut found_key = false; @@ -295,7 +295,7 @@ mod tests { let ancestors = vec![].into_iter().collect(); index.add_root(0); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list.1[idx], (0, true)); + assert_eq!(list[idx], (0, true)); } #[test] @@ -338,14 +338,14 @@ mod tests { index.insert(0, &key.pubkey(), true, &mut gc); assert!(gc.is_empty()); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list.1[idx], (0, true)); + assert_eq!(list[idx], (0, true)); drop(list); let mut gc = Vec::new(); index.insert(0, &key.pubkey(), false, &mut gc); assert_eq!(gc, vec![(0, true)]); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list.1[idx], (0, false)); + assert_eq!(list[idx], (0, false)); } #[test] @@ -360,10 +360,10 @@ mod tests { index.insert(1, &key.pubkey(), false, &mut gc); assert!(gc.is_empty()); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list.1[idx], (0, true)); + assert_eq!(list[idx], (0, true)); let ancestors = vec![(1, 0)].into_iter().collect(); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list.1[idx], (1, false)); + assert_eq!(list[idx], (1, false)); } #[test] @@ -383,7 +383,7 @@ mod tests { assert_eq!(gc, vec![(0, true), (1, false), (2, true)]); let ancestors = vec![].into_iter().collect(); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list.1[idx], (3, true)); + assert_eq!(list[idx], (3, true)); let mut num = 0; let mut found_key = false; diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index bdfcfe98d0..01e0dd2f2b 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -2127,6 +2127,10 @@ impl Bank { pub fn clean_accounts(&self) { self.rc.accounts.accounts_db.clean_accounts(); } + + pub fn clean_dead_slots(&self) { + self.rc.accounts.accounts_db.process_dead_slots(); + } } impl Drop for Bank {