diff --git a/core/src/tvu.rs b/core/src/tvu.rs index de0a1cf63e..5cb7ee9cb2 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -35,8 +35,7 @@ use solana_rpc::{ }; use solana_runtime::{ accounts_background_service::{ - AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SendDroppedBankCallback, - SnapshotRequestHandler, + AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SnapshotRequestHandler, }, bank_forks::{BankForks, SnapshotConfig}, commitment::BlockCommitmentCache, @@ -235,10 +234,18 @@ impl Tvu { let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); // Before replay starts, set the callbacks in each of the banks in BankForks + // Note after this callback is created, only the AccountsBackgroundService should be calling + // AccountsDb::purge_slot() to clean up dropped banks. + let callback = bank_forks + .read() + .unwrap() + .root_bank() + .rc + .accounts + .accounts_db + .create_drop_bank_callback(pruned_banks_sender); for bank in bank_forks.read().unwrap().banks().values() { - bank.set_callback(Some(Box::new(SendDroppedBankCallback::new( - pruned_banks_sender.clone(), - )))); + bank.set_callback(Some(Box::new(callback.clone()))); } let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender); diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 1e15a31feb..601d4c3884 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -912,8 +912,9 @@ impl Accounts { /// Purge a slot if it is not a root /// Root slots cannot be purged - pub fn purge_slot(&self, slot: Slot) { - self.accounts_db.purge_slot(slot); + /// `is_from_abs` is true if the caller is the AccountsBackgroundService + pub fn purge_slot(&self, slot: Slot, is_from_abs: bool) { + self.accounts_db.purge_slot(slot, is_from_abs); } /// Add a slot to root. Root slots cannot be purged diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index 8a946d5211..277e25f918 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -263,11 +263,12 @@ impl AbsRequestHandler { }) } - pub fn handle_pruned_banks(&self, bank: &Bank) -> usize { + /// `is_from_abs` is true if the caller is the AccountsBackgroundService + pub fn handle_pruned_banks(&self, bank: &Bank, is_from_abs: bool) -> usize { let mut count = 0; for pruned_slot in self.pruned_banks_receiver.try_iter() { count += 1; - bank.rc.accounts.purge_slot(pruned_slot); + bank.rc.accounts.purge_slot(pruned_slot, is_from_abs); } count @@ -393,7 +394,7 @@ impl AccountsBackgroundService { total_remove_slots_time: &mut u64, ) { let mut remove_slots_time = Measure::start("remove_slots_time"); - *removed_slots_count += request_handler.handle_pruned_banks(&bank); + *removed_slots_count += request_handler.handle_pruned_banks(&bank, true); remove_slots_time.stop(); *total_remove_slots_time += remove_slots_time.as_us(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index e09c192ace..2bfc637750 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -19,6 +19,7 @@ //! commit for each slot entry would be indexed. 
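The tvu.rs hunk above replaces per-bank construction of `SendDroppedBankCallback` with a single callback created once by AccountsDb and cloned into every bank in BankForks, so a dropped bank merely enqueues its slot for AccountsBackgroundService instead of purging inline. A minimal sketch of that hand-off pattern, using hypothetical `ToyBank`/`SendDroppedSlot` stand-ins rather than the real `Bank` and `SendDroppedBankCallback` types:

```rust
use crossbeam_channel::{unbounded, Sender};

type Slot = u64;

// Cloneable callback: one copy per bank, all feeding the same channel.
#[derive(Clone)]
struct SendDroppedSlot(Sender<Slot>);

struct ToyBank {
    slot: Slot,
    drop_callback: Option<SendDroppedSlot>,
}

impl Drop for ToyBank {
    fn drop(&mut self) {
        if let Some(cb) = &self.drop_callback {
            // Defer the purge to the background service; ignore send errors
            // if the receiver is already gone during shutdown.
            let _ = cb.0.send(self.slot);
        }
    }
}

fn main() {
    let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
    let callback = SendDroppedSlot(pruned_banks_sender);
    drop(ToyBank { slot: 42, drop_callback: Some(callback.clone()) });

    // The ABS side drains the channel serially, mirroring
    // AbsRequestHandler::handle_pruned_banks() in the diff.
    for slot in pruned_banks_receiver.try_iter() {
        println!("purge_slot({}, /*is_from_abs=*/ true)", slot);
    }
}
```

The design point is that all purging funnels through one consumer thread, which is what makes the `is_from_abs` checks later in this patch enforceable.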
use crate::{ + accounts_background_service::{DroppedSlotsSender, SendDroppedBankCallback}, accounts_cache::{AccountsCache, CachedAccount, SlotCache}, accounts_hash::{AccountsHash, CalculateHashIntermediate, HashStats, PreviousPass}, accounts_index::{ @@ -828,6 +829,8 @@ pub struct AccountsDb { #[cfg(test)] load_limit: AtomicU64, + + is_bank_drop_callback_enabled: AtomicBool, } #[derive(Debug, Default)] @@ -858,7 +861,8 @@ struct AccountsStats { struct PurgeStats { last_report: AtomicU64, safety_checks_elapsed: AtomicU64, - remove_storages_elapsed: AtomicU64, + remove_cache_elapsed: AtomicU64, + remove_storage_entries_elapsed: AtomicU64, drop_storage_entries_elapsed: AtomicU64, num_cached_slots_removed: AtomicUsize, num_stored_slots_removed: AtomicUsize, @@ -866,6 +870,9 @@ struct PurgeStats { total_removed_cached_bytes: AtomicU64, total_removed_stored_bytes: AtomicU64, recycle_stores_write_elapsed: AtomicU64, + scan_storages_elasped: AtomicU64, + purge_accounts_index_elapsed: AtomicU64, + handle_reclaims_elapsed: AtomicU64, } impl PurgeStats { @@ -893,8 +900,14 @@ impl PurgeStats { i64 ), ( - "remove_storages_elapsed", - self.remove_storages_elapsed.swap(0, Ordering::Relaxed) as i64, + "remove_cache_elapsed", + self.remove_cache_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "remove_storage_entries_elapsed", + self.remove_storage_entries_elapsed + .swap(0, Ordering::Relaxed) as i64, i64 ), ( @@ -933,6 +946,21 @@ impl PurgeStats { self.recycle_stores_write_elapsed.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "scan_storages_elasped", + self.scan_storages_elasped.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "purge_accounts_index_elapsed", + self.purge_accounts_index_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "handle_reclaims_elapsed", + self.handle_reclaims_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), ); } } @@ -1232,6 +1260,7 @@ impl Default for AccountsDb { load_delay: u64::default(), #[cfg(test)] load_limit: AtomicU64::default(), + is_bank_drop_callback_enabled: AtomicBool::default(), } } } @@ -1359,7 +1388,7 @@ impl AccountsDb { self.handle_reclaims( &reclaims, None, - false, + Some(&self.clean_accounts_stats.purge_stats), Some(&mut reclaim_result), reset_accounts, ); @@ -1472,7 +1501,7 @@ impl AccountsDb { fn purge_keys_exact<'a, C: 'a>( &'a self, - pubkey_to_slot_set: &'a [(Pubkey, C)], + pubkey_to_slot_set: impl Iterator, ) -> Vec<(u64, AccountInfo)> where C: Contains<'a, Slot>, @@ -1780,14 +1809,20 @@ impl AccountsDb { }) .collect(); - let reclaims = self.purge_keys_exact(&pubkey_to_slot_set); + let reclaims = self.purge_keys_exact(pubkey_to_slot_set.iter()); // Don't reset from clean, since the pubkeys in those stores may need to be unref'ed // and those stores may be used for background hashing. let reset_accounts = false; let mut reclaim_result = ReclaimResult::default(); let reclaim_result = Some(&mut reclaim_result); - self.handle_reclaims(&reclaims, None, false, reclaim_result, reset_accounts); + self.handle_reclaims( + &reclaims, + None, + Some(&self.clean_accounts_stats.purge_stats), + reclaim_result, + reset_accounts, + ); reclaims_time.stop(); @@ -1823,7 +1858,9 @@ impl AccountsDb { /// remove all the storage entries for `S`. /// /// # Arguments - /// * `reclaims` - The accounts to remove from storage entries' "count" + /// * `reclaims` - The accounts to remove from storage entries' "count". Note here + /// that we should not remove cache entries, only entries for accounts actually + /// stored in a storage entry. 
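A note on the `PurgeStats` counters added above: timings accumulate into relaxed atomics on the hot path and are drained at report time with `swap(0, ...)`, so each report reads and resets a counter in one atomic step and increments landing between reports are never lost. A self-contained sketch of the idiom, with a hypothetical `ToyStats` in place of `PurgeStats`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct ToyStats {
    handle_reclaims_elapsed: AtomicU64,
}

impl ToyStats {
    // Hot path: add elapsed microseconds without taking a lock.
    fn record(&self, us: u64) {
        self.handle_reclaims_elapsed.fetch_add(us, Ordering::Relaxed);
    }

    // Reporting path: swap(0) reads and resets in one atomic step,
    // unlike a separate load() followed by store(0).
    fn report(&self) {
        let us = self.handle_reclaims_elapsed.swap(0, Ordering::Relaxed);
        println!("handle_reclaims_elapsed={}us", us);
    }
}

fn main() {
    let stats = ToyStats::default();
    stats.record(250);
    stats.record(750);
    stats.report(); // prints 1000; the counter is back to 0
}
```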
/// /// * `expected_single_dead_slot` - A correctness assertion. If this is equal to `Some(S)`, /// then the function will check that the only slot being cleaned up in `reclaims` /// from store or slot shrinking, as those should only touch the slot they are /// currently storing to or shrinking. /// - /// * `no_dead_slot` - A correctness assertion. If this is equal to - /// `false`, the function will check that no slots are cleaned up/removed via - /// `process_dead_slots`. For instance, on store, no slots should be cleaned up, - /// but during the background clean accounts purges accounts from old rooted slots, - /// so outdated slots may be removed. + /// * `purge_stats` - The stats used to track performance of purging dead slots. This + /// also serves as a correctness assertion. If `purge_stats.is_none()`, this implies + /// there can be no dead slots that happen as a result of this call, and the function + /// will check that no slots are cleaned up/removed via `process_dead_slots`. For instance, + /// on store, no slots should be cleaned up, but during the background clean accounts + /// purges accounts from old rooted slots, so outdated slots may be removed. + /// /// * `reclaim_result` - Information about accounts that were removed from storage, does /// not include accounts that were removed from the cache + /// /// * `reset_accounts` - Reset the append_vec store when the store is dead (count==0) /// From the clean and shrink paths it should be false since there may be an in-progress /// hash operation and the stores may hold accounts that need to be unref'ed. @@ -1845,7 +1885,9 @@ impl AccountsDb { &self, reclaims: SlotSlice, expected_single_dead_slot: Option, - no_dead_slot: bool, + // TODO: coalesce `purge_stats` and `reclaim_result` together into one option, as they + // are both either Some or None + purge_stats: Option<&PurgeStats>, reclaim_result: Option<&mut ReclaimResult>, reset_accounts: bool, ) { @@ -1864,7 +1906,7 @@ impl AccountsDb { reclaimed_offsets, reset_accounts, ); - if no_dead_slot { + if purge_stats.is_none() { assert!(dead_slots.is_empty()); } else if let Some(expected_single_dead_slot) = expected_single_dead_slot { assert!(dead_slots.len() <= 1); @@ -1872,7 +1914,10 @@ impl AccountsDb { assert!(dead_slots.contains(&expected_single_dead_slot)); } } - self.process_dead_slots(&dead_slots, purged_account_slots); + + if let Some(purge_stats) = purge_stats { + self.process_dead_slots(&dead_slots, purged_account_slots, purge_stats); + } } // Must be kept private!, does sensitive cleanup that should only be called from @@ -1881,6 +1926,7 @@ impl AccountsDb { &self, dead_slots: &HashSet, purged_account_slots: Option<&mut AccountSlots>, + purge_stats: &PurgeStats, ) { if dead_slots.is_empty() { return; } @@ -1890,7 +1936,7 @@ impl AccountsDb { clean_dead_slots.stop(); let mut purge_removed_slots = Measure::start("reclaims::purge_removed_slots"); - self.purge_storage_slots(&dead_slots); + self.purge_dead_slots_from_storage(dead_slots.iter(), purge_stats); purge_removed_slots.stop(); // If the slot is dead, remove the need to shrink the storages as @@ -2586,7 +2632,11 @@ impl AccountsDb { // | | // V | // P3 purge_slots_from_cache_and_store()/ | index - // purge_slot_cache_pubkeys() | (removes existing store_id, offset for caches) + // purge_slot_cache()/ | + // purge_slot_cache_pubkeys() | (removes existing store_id, offset for cache) + // purge_slot_storage()/ | + // purge_keys_exact() | (removes accounts index entries) + //
handle_reclaims() | (removes storage entries) // OR | // clean_accounts()/ | // clean_accounts_older_than_root()| (removes existing store_id, offset for stores) @@ -3071,7 +3121,20 @@ impl AccountsDb { .is_none()); } - pub fn purge_slot(&self, slot: Slot) { + pub fn create_drop_bank_callback( + &self, + pruned_banks_sender: DroppedSlotsSender, + ) -> SendDroppedBankCallback { + self.is_bank_drop_callback_enabled + .store(true, Ordering::SeqCst); + SendDroppedBankCallback::new(pruned_banks_sender) + } + + /// `is_from_abs` is true if the caller is the AccountsBackgroundService + pub fn purge_slot(&self, slot: Slot, is_from_abs: bool) { + if self.is_bank_drop_callback_enabled.load(Ordering::SeqCst) && !is_from_abs { + panic!("bad drop callpath detected; Bank::drop() must run serially with other logic in ABS like clean_accounts()") + } let mut slots = HashSet::new(); slots.insert(slot); self.purge_slots(&slots); @@ -3105,56 +3168,90 @@ impl AccountsDb { recycle_stores_write_elapsed.as_us() } + /// Purges every slot in `removed_slots` from both the cache and storage. This includes + /// entries in the accounts index, cache entries, and any backing storage entries. fn purge_slots_from_cache_and_store<'a>( &'a self, - can_exist_in_cache: bool, removed_slots: impl Iterator, purge_stats: &PurgeStats, ) { - let mut remove_storages_elapsed = Measure::start("remove_storages_elapsed"); - let mut all_removed_slot_storages = vec![]; + let mut remove_cache_elapsed_across_slots = 0; let mut num_cached_slots_removed = 0; let mut total_removed_cached_bytes = 0; - let mut total_removed_storage_entries = 0; - let mut total_removed_stored_bytes = 0; for remove_slot in removed_slots { + // This function is only currently safe with respect to `flush_slot_cache()` because + // both functions run serially in AccountsBackgroundService. + let mut remove_cache_elapsed = Measure::start("remove_cache_elapsed"); if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) { // If the slot is still in the cache, remove the backing storages for // the slot and from the Accounts Index - if !can_exist_in_cache { - panic!("The removed slot must alrady have been flushed from the cache"); - } num_cached_slots_removed += 1; total_removed_cached_bytes += slot_cache.total_bytes(); self.purge_slot_cache(*remove_slot, slot_cache); - } else if let Some((_, slot_removed_storages)) = self.storage.0.remove(&remove_slot) { - // Because AccountsBackgroundService synchronously flushes from the accounts cache - // and handles all Bank::drop() (the cleanup function that leads to this - // function call), then we don't need to worry above an overlapping cache flush - // with this function call. This means, if we get into this case, we can be - // confident that the entire state for this slot has been flushed to the storage - // already. + remove_cache_elapsed.stop(); + remove_cache_elapsed_across_slots += remove_cache_elapsed.as_us(); + } else { + self.purge_slot_storage(*remove_slot, purge_stats); + } + // It should not be possible that a slot is neither in the cache nor storage. Even in + // a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars + // on bank creation. + } - // Note this only cleans up the storage entries.
The accounts index cleaning - // (removing from the slot list, decrementing the account ref count), is handled in - // clean_accounts() -> purge_older_root_entries() + purge_stats + .remove_cache_elapsed + .fetch_add(remove_cache_elapsed_across_slots, Ordering::Relaxed); + purge_stats + .num_cached_slots_removed + .fetch_add(num_cached_slots_removed, Ordering::Relaxed); + purge_stats + .total_removed_cached_bytes + .fetch_add(total_removed_cached_bytes, Ordering::Relaxed); + } + + /// Purge the backing storage entries for the given slot, does not purge from + /// the cache! + fn purge_dead_slots_from_storage<'a>( + &'a self, + removed_slots: impl Iterator + Clone, + purge_stats: &PurgeStats, + ) { + // Check all slots `removed_slots` are no longer "relevant" roots. + // Note that the slots here could have been rooted slots, but if they're passed here + // for removal it means: + // 1) All updates in that old root have been outdated by updates in newer roots + // 2) Those slots/roots should have already been purged from the accounts index root + // tracking metadata via `accounts_index.clean_dead_slot()`. + let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed"); + assert!(self + .accounts_index + .get_rooted_from_list(removed_slots.clone()) + .is_empty()); + safety_checks_elapsed.stop(); + purge_stats + .safety_checks_elapsed + .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed); + + let mut total_removed_storage_entries = 0; + let mut total_removed_stored_bytes = 0; + let mut all_removed_slot_storages = vec![]; + + let mut remove_storage_entries_elapsed = Measure::start("remove_storage_entries_elapsed"); + for remove_slot in removed_slots { + // Remove the storage entries and collect some metrics + if let Some((_, slot_storages_to_be_removed)) = self.storage.0.remove(&remove_slot) { { - let r_slot_removed_storages = slot_removed_storages.read().unwrap(); + let r_slot_removed_storages = slot_storages_to_be_removed.read().unwrap(); total_removed_storage_entries += r_slot_removed_storages.len(); total_removed_stored_bytes += r_slot_removed_storages .values() .map(|i| i.accounts.capacity()) .sum::(); } - all_removed_slot_storages.push(slot_removed_storages.clone()); + all_removed_slot_storages.push(slot_storages_to_be_removed.clone()); } - - // It should not be possible that a slot is neither in the cache or storage. Even in - // a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars - // on bank creation. 
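The `impl Iterator<Item = &Slot> + Clone` bound on `purge_dead_slots_from_storage()` above lets the slot list be walked twice (once by the `get_rooted_from_list()` safety check, once for the removal itself) without collecting into a temporary `Vec`. A stripped-down sketch of the same shape; `get_rooted_from_list` here is a toy over a slice, not the real `AccountsIndex` method:

```rust
type Slot = u64;

fn get_rooted_from_list<'a>(roots: &[Slot], slots: impl Iterator<Item = &'a Slot>) -> Vec<Slot> {
    slots.filter(|s| roots.contains(*s)).copied().collect()
}

fn purge_dead_slots<'a>(dead_slots: impl Iterator<Item = &'a Slot> + Clone) {
    let roots = [5u64];
    // Walk a clone for the safety check; the original iterator is untouched.
    assert!(get_rooted_from_list(&roots, dead_slots.clone()).is_empty());
    // Walk the original for the actual removal.
    for slot in dead_slots {
        println!("removing storage entries for slot {}", slot);
    }
}

fn main() {
    let dead_slots = vec![7u64, 8, 9];
    // `.iter()` over a Vec is a cheap, Clone-able iterator of &Slot.
    purge_dead_slots(dead_slots.iter());
}
```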
} - remove_storages_elapsed.stop(); - + remove_storage_entries_elapsed.stop(); let num_stored_slots_removed = all_removed_slot_storages.len(); let recycle_stores_write_elapsed = @@ -3165,19 +3262,12 @@ impl AccountsDb { // of any locks drop(all_removed_slot_storages); drop_storage_entries_elapsed.stop(); - purge_stats - .remove_storages_elapsed - .fetch_add(remove_storages_elapsed.as_us(), Ordering::Relaxed); + .remove_storage_entries_elapsed + .fetch_add(remove_storage_entries_elapsed.as_us(), Ordering::Relaxed); purge_stats .drop_storage_entries_elapsed .fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed); - purge_stats - .num_cached_slots_removed - .fetch_add(num_cached_slots_removed, Ordering::Relaxed); - purge_stats - .total_removed_cached_bytes - .fetch_add(total_removed_cached_bytes, Ordering::Relaxed); purge_stats .num_stored_slots_removed .fetch_add(num_stored_slots_removed, Ordering::Relaxed); @@ -3192,24 +3282,6 @@ impl AccountsDb { .fetch_add(recycle_stores_write_elapsed, Ordering::Relaxed); } - fn purge_storage_slots(&self, removed_slots: &HashSet) { - // Check all slots `removed_slots` are no longer rooted - let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed"); - for slot in removed_slots.iter() { - assert!(!self.accounts_index.is_root(*slot)) - } - safety_checks_elapsed.stop(); - self.clean_accounts_stats - .purge_stats - .safety_checks_elapsed - .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed); - self.purge_slots_from_cache_and_store( - false, - removed_slots.iter(), - &self.clean_accounts_stats.purge_stats, - ); - } - fn purge_slot_cache(&self, purged_slot: Slot, slot_cache: SlotCache) { let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new(); let pubkey_to_slot_set: Vec<(Pubkey, Slot)> = slot_cache @@ -3232,10 +3304,10 @@ impl AccountsDb { // Slot purged from cache should not exist in the backing store assert!(self.storage.get_slot_stores(purged_slot).is_none()); let num_purged_keys = pubkey_to_slot_set.len(); - let reclaims = self.purge_keys_exact(&pubkey_to_slot_set); + let reclaims = self.purge_keys_exact(pubkey_to_slot_set.iter()); assert_eq!(reclaims.len(), num_purged_keys); if is_dead { - self.finalize_dead_slot_removal( + self.remove_dead_slots_metadata( std::iter::once(&purged_slot), purged_slot_pubkeys, None, ); } } + fn purge_slot_storage(&self, remove_slot: Slot, purge_stats: &PurgeStats) { + // Because AccountsBackgroundService synchronously flushes from the accounts cache + // and handles all Bank::drop() (the cleanup function that leads to this + // function call), then we don't need to worry about an overlapping cache flush + // with this function call. This means, if we get into this case, we can be + // confident that the entire state for this slot has been flushed to the storage + // already.
+ let mut scan_storages_elasped = Measure::start("scan_storages_elasped"); + type ScanResult = ScanStorageResult>>>; + let scan_result: ScanResult = self.scan_account_storage( + remove_slot, + |loaded_account: LoadedAccount| Some(*loaded_account.pubkey()), + |accum: &Arc>>, loaded_account: LoadedAccount| { + accum + .lock() + .unwrap() + .insert((*loaded_account.pubkey(), remove_slot)); + }, + ); + scan_storages_elasped.stop(); + purge_stats + .scan_storages_elasped + .fetch_add(scan_storages_elasped.as_us(), Ordering::Relaxed); + + let mut purge_accounts_index_elapsed = Measure::start("purge_accounts_index_elapsed"); + let reclaims; + match scan_result { + ScanStorageResult::Cached(_) => { + panic!("Should not see cached keys in this `else` branch, since we checked this slot did not exist in the cache above"); + } + ScanStorageResult::Stored(stored_keys) => { + // Purge this slot from the accounts index + reclaims = self.purge_keys_exact(stored_keys.lock().unwrap().iter()); + } + } + purge_accounts_index_elapsed.stop(); + purge_stats + .purge_accounts_index_elapsed + .fetch_add(purge_accounts_index_elapsed.as_us(), Ordering::Relaxed); + + // `handle_reclaims()` should remove all the account index entries and + // storage entries + let mut handle_reclaims_elapsed = Measure::start("handle_reclaims_elapsed"); + // Slot should be dead after removing all its account entries + let expected_dead_slot = Some(remove_slot); + self.handle_reclaims( + &reclaims, + expected_dead_slot, + Some(purge_stats), + Some(&mut ReclaimResult::default()), + false, + ); + handle_reclaims_elapsed.stop(); + purge_stats + .handle_reclaims_elapsed + .fetch_add(handle_reclaims_elapsed.as_us(), Ordering::Relaxed); + // After handling the reclaimed entries, this slot's + // storage entries should be purged from self.storage + assert!(self.storage.get_slot_stores(remove_slot).is_none()); + } + #[allow(clippy::needless_collect)] fn purge_slots(&self, slots: &HashSet) { // `add_root()` should be called first @@ -3256,7 +3389,6 @@ impl AccountsDb { .safety_checks_elapsed .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed); self.purge_slots_from_cache_and_store( - true, non_roots.into_iter(), &self.external_purge_slots_stats, ); @@ -3273,11 +3405,6 @@ impl AccountsDb { panic!("Trying to remove accounts for rooted slot {}", remove_slot); } - if let Some(slot_cache) = self.accounts_cache.remove_slot(remove_slot) { - // If the slot is still in the cache, remove it from the cache - self.purge_slot_cache(remove_slot, slot_cache); - } - // TODO: Handle if the slot was flushed to storage while we were removing the cached // slot above, i.e. it's possible the storage contains partial version of the current // slot. One way to handle this is to augment slots to contain a "version", That way, @@ -3287,37 +3414,12 @@ impl AccountsDb { // Reads will then always read the latest version of a slot. Scans will also know // which version their parents because banks will also be augmented with this version, // which handles cases where a deletion of one version happens in the middle of the scan. 
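For reference, `purge_slot_storage()` above leans on `purge_keys_exact()` for the index surgery: each `(pubkey, slot)` pair gathered from the dead slot's storages has exactly that slot's entry removed from the pubkey's slot list, and the removed entries come back as reclaims for `handle_reclaims()` to process. A rough model of that behavior, with a plain `HashMap` standing in for the real `AccountsIndex`:

```rust
use std::collections::HashMap;

type Slot = u64;
type Pubkey = [u8; 32];

fn purge_keys_exact<'a>(
    index: &mut HashMap<Pubkey, Vec<(Slot, u64)>>, // pubkey -> (slot, store id)
    pubkey_to_slot_set: impl Iterator<Item = &'a (Pubkey, Slot)>,
) -> Vec<(Slot, u64)> {
    let mut reclaims = vec![];
    for (pubkey, slot) in pubkey_to_slot_set {
        if let Some(slot_list) = index.get_mut(pubkey) {
            // Remove only the entries for the exact slot being purged.
            slot_list.retain(|(s, id)| {
                let purged = s == slot;
                if purged {
                    reclaims.push((*s, *id));
                }
                !purged
            });
            if slot_list.is_empty() {
                index.remove(pubkey);
            }
        }
    }
    reclaims
}

fn main() {
    let key = [1u8; 32];
    let mut index = HashMap::from([(key, vec![(7u64, 0u64), (8, 1)])]);
    let to_purge = vec![(key, 8u64)];
    let reclaims = purge_keys_exact(&mut index, to_purge.iter());
    assert_eq!(reclaims, vec![(8, 1)]); // slot 8's entry was reclaimed
    assert_eq!(index[&key], vec![(7, 0)]); // slot 7's entry survives
}
```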
- let scan_result: ScanStorageResult> = self.scan_account_storage( - remove_slot, - |loaded_account: LoadedAccount| Some(*loaded_account.pubkey()), - |accum: &DashSet, loaded_account: LoadedAccount| { - accum.insert(*loaded_account.pubkey()); - }, + let remove_unrooted_purge_stats = PurgeStats::default(); + self.purge_slots_from_cache_and_store( + std::iter::once(&remove_slot), + &remove_unrooted_purge_stats, ); - - // Purge this slot from the accounts index - let purge_slot: HashSet = vec![remove_slot].into_iter().collect(); - let mut reclaims = vec![]; - match scan_result { - ScanStorageResult::Cached(cached_keys) => { - for pubkey in cached_keys.iter() { - self.accounts_index - .purge_exact(pubkey, &purge_slot, &mut reclaims); - } - } - ScanStorageResult::Stored(stored_keys) => { - for set_ref in stored_keys.iter() { - self.accounts_index - .purge_exact(set_ref.key(), &purge_slot, &mut reclaims); - } - } - } - - self.handle_reclaims(&reclaims, Some(remove_slot), false, None, false); - - // After handling the reclaimed entries, this slot's - // storage entries should be purged from self.storage - assert!(self.storage.get_slot_stores(remove_slot).is_none()); + remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", Some(0)); } pub fn hash_stored_account(slot: Slot, account: &StoredAccountMeta) -> Hash { @@ -4534,7 +4636,27 @@ impl AccountsDb { dead_slots } - fn finalize_dead_slot_removal<'a>( + fn remove_dead_slots_metadata<'a>( + &'a self, + dead_slots_iter: impl Iterator + Clone, + purged_slot_pubkeys: HashSet<(Slot, Pubkey)>, + // Should only be `Some` for non-cached slots + purged_stored_account_slots: Option<&mut AccountSlots>, + ) { + self.clean_dead_slots_from_accounts_index( + dead_slots_iter.clone(), + purged_slot_pubkeys, + purged_stored_account_slots, + ); + { + let mut bank_hashes = self.bank_hashes.write().unwrap(); + for slot in dead_slots_iter { + bank_hashes.remove(slot); + } + } + } + + fn clean_dead_slots_from_accounts_index<'a>( &'a self, dead_slots_iter: impl Iterator + Clone, purged_slot_pubkeys: HashSet<(Slot, Pubkey)>, @@ -4555,7 +4677,6 @@ impl AccountsDb { let mut rooted_cleaned_count = 0; let mut unrooted_cleaned_count = 0; let dead_slots: Vec<_> = dead_slots_iter - .clone() .map(|slot| { if let Some(latest) = self.accounts_index.clean_dead_slot(*slot) { rooted_cleaned_count += 1; @@ -4566,7 +4687,7 @@ impl AccountsDb { *slot }) .collect(); - info!("finalize_dead_slot_removal: slots {:?}", dead_slots); + info!("remove_dead_slots_metadata: slots {:?}", dead_slots); accounts_index_root_stats.rooted_cleaned_count += rooted_cleaned_count; accounts_index_root_stats.unrooted_cleaned_count += unrooted_cleaned_count; @@ -4574,13 +4695,6 @@ impl AccountsDb { self.clean_accounts_stats .latest_accounts_index_roots_stats .update(&accounts_index_root_stats); - - { - let mut bank_hashes = self.bank_hashes.write().unwrap(); - for slot in dead_slots_iter { - bank_hashes.remove(slot); - } - } } fn clean_stored_dead_slots( @@ -4614,7 +4728,7 @@ impl AccountsDb { }) }) }; - self.finalize_dead_slot_removal( + self.remove_dead_slots_metadata( dead_slots.iter(), purged_slot_pubkeys, purged_account_slots, @@ -4957,9 +5071,11 @@ impl AccountsDb { // a) this slot has at least one account (the one being stored), // b)From 1) we know no other slots are included in the "reclaims" // - // From 1) and 2) we guarantee passing Some(slot), true is safe + // From 1) and 2) we guarantee passing `no_purge_stats` == None, which is + // equivalent to asserting there will be no 
dead slots, is safe. + let no_purge_stats = None; let mut handle_reclaims_time = Measure::start("handle_reclaims"); - self.handle_reclaims(&reclaims, Some(slot), true, None, reset_accounts); + self.handle_reclaims(&reclaims, Some(slot), no_purge_stats, None, reset_accounts); handle_reclaims_time.stop(); self.stats .store_handle_reclaims @@ -6297,15 +6413,18 @@ pub mod tests { ); } - #[test] - fn test_remove_unrooted_slot() { + fn run_test_remove_unrooted_slot(is_cached: bool) { let unrooted_slot = 9; let mut db = AccountsDb::new(Vec::new(), &ClusterType::Development); db.caching_enabled = true; let key = Pubkey::default(); let account0 = AccountSharedData::new(1, 0, &key); let ancestors = vec![(unrooted_slot, 1)].into_iter().collect(); - db.store_cached(unrooted_slot, &[(&key, &account0)]); + if is_cached { + db.store_cached(unrooted_slot, &[(&key, &account0)]); + } else { + db.store_uncached(unrooted_slot, &[(&key, &account0)]); + } db.bank_hashes .write() .unwrap() @@ -6320,12 +6439,9 @@ pub mod tests { db.remove_unrooted_slot(unrooted_slot); assert!(db.load_without_fixed_root(&ancestors, &key).is_none()); assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none()); + assert!(db.accounts_cache.slot_cache(unrooted_slot).is_none()); assert!(db.storage.0.get(&unrooted_slot).is_none()); - assert!(db - .accounts_index - .get_account_read_entry(&key) - .map(|locked_entry| locked_entry.slot_list().is_empty()) - .unwrap_or(true)); + assert!(db.accounts_index.get_account_read_entry(&key).is_none()); assert!(db .accounts_index .get(&key, Some(&ancestors), None) @@ -6337,6 +6453,16 @@ pub mod tests { assert_load_account(&db, unrooted_slot, key, 2); } + #[test] + fn test_remove_unrooted_slot_cached() { + run_test_remove_unrooted_slot(true); + } + + #[test] + fn test_remove_unrooted_slot_storage() { + run_test_remove_unrooted_slot(false); + } + #[test] fn test_remove_unrooted_slot_snapshot() { solana_logger::setup(); @@ -7628,7 +7754,7 @@ pub mod tests { let slots: HashSet = vec![1].into_iter().collect(); let purge_keys = vec![(key1, slots)]; - db.purge_keys_exact(&purge_keys); + db.purge_keys_exact(purge_keys.iter()); let account2 = AccountSharedData::new(3, 0, &key); db.store_uncached(2, &[(&key1, &account2)]); @@ -9493,7 +9619,7 @@ pub mod tests { assert_eq!(account.0.lamports(), slot1_account.lamports()); // Simulate dropping the bank, which finally removes the slot from the cache - db.purge_slot(1); + db.purge_slot(1, false); assert!(db .do_load( &scan_ancestors, @@ -10364,76 +10490,6 @@ pub mod tests { do_test_load_account_and_shrink_race(false); } - fn do_test_load_account_and_purge_race(with_retry: bool) { - let caching_enabled = true; - let mut db = AccountsDb::new_with_config( - Vec::new(), - &ClusterType::Development, - AccountSecondaryIndexes::default(), - caching_enabled, - ); - db.load_delay = RACY_SLEEP_MS; - let db = Arc::new(db); - let pubkey = - Arc::new(Pubkey::from_str("CiDwVBFgWV9E5MvXWoLgnEgn2hK7rJikbvfWavzAQz3").unwrap()); - let exit = Arc::new(AtomicBool::new(false)); - let slot = 1; - - // Store an account - let lamports = 42; - let mut account = AccountSharedData::new(1, 0, AccountSharedData::default().owner()); - account.set_lamports(lamports); - db.store_uncached(slot, &[(&pubkey, &account)]); - - let t_purge_slot = { - let db = db.clone(); - let exit = exit.clone(); - - std::thread::Builder::new() - .name("account-purge".to_string()) - .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - return; - } - // Simulate purge_slots() - db.purge_slot(slot); - 
sleep(Duration::from_millis(RACY_SLEEP_MS)); - }) - .unwrap() - }; - - let ancestors: Ancestors = vec![(slot, 0)].into_iter().collect(); - let t_do_load = - start_load_thread(with_retry, ancestors, db, exit.clone(), pubkey, move |_| { - lamports - }); - - sleep(Duration::from_secs(RACE_TIME)); - exit.store(true, Ordering::Relaxed); - t_purge_slot.join().unwrap(); - // Propagate expected panic! occurred in the do_load thread - t_do_load.join().map_err(std::panic::resume_unwind).unwrap() - } - - #[test] - #[should_panic(expected = "assertion failed: load_hint == LoadHint::Unspecified")] - fn test_load_account_and_purge_race_with_retry() { - // this tests impossible situation in the wild, so panic is expected - // Conversely, we show that we're preventing this race condition from occurring - do_test_load_account_and_purge_race(true); - } - - #[test] - #[ignore] - #[should_panic( - expected = "Bad index entry detected (CiDwVBFgWV9E5MvXWoLgnEgn2hK7rJikbvfWavzAQz3, 1, 0, 0, Unspecified)" - )] - fn test_load_account_and_purge_race_without_retry() { - // this tests impossible situation in the wild, so panic is expected - // Conversely, we show that we're preventing this race condition from occurring - do_test_load_account_and_purge_race(false); - } - #[test] fn test_collect_uncleaned_slots_up_to_slot() { solana_logger::setup(); diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index a307108594..2e7de4f28c 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1388,6 +1388,20 @@ impl AccountsIndex { slot < max_clean_root && slot != newest_root_in_slot_list } + /// Given a list of slots, return a new list of only the slots that are rooted + pub fn get_rooted_from_list<'a>(&self, slots: impl Iterator) -> Vec { + let roots_tracker = self.roots_tracker.read().unwrap(); + slots + .filter_map(|s| { + if roots_tracker.roots.contains(s) { + Some(*s) + } else { + None + } + }) + .collect() + } + pub fn is_root(&self, slot: Slot) -> bool { self.roots_tracker.read().unwrap().roots.contains(&slot) } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 66fc7d821f..49ee0bcda5 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -5172,24 +5172,6 @@ impl Bank { .is_active(&feature_set::consistent_recent_blockhashes_sysvar::id()), } } - - /// Bank cleanup - /// - /// If the bank is unfrozen and then dropped, additional cleanup is needed. In particular, - /// cleaning up the pubkeys that are only in this bank. To do that, call into AccountsDb to - /// scan for dirty pubkeys and add them to the uncleaned pubkeys list so they will be cleaned - /// up in AccountsDb::clean_accounts(). - fn cleanup(&self) { - if self.is_frozen() { - // nothing to do here - return; - } - - self.rc - .accounts - .accounts_db - .scan_slot_and_insert_dirty_pubkeys_into_uncleaned_pubkeys(self.slot); - } } impl Drop for Bank { @@ -5198,8 +5180,6 @@ impl Drop for Bank { return; } - self.cleanup(); - if let Some(drop_callback) = self.drop_callback.read().unwrap().0.as_ref() { drop_callback.callback(self); } else { @@ -5207,7 +5187,7 @@ impl Drop for Bank { // 1. Tests // 2. At startup when replaying blockstore and there's no // AccountsBackgroundService to perform cleanups yet. 
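The fallback purge below is exactly what the new `is_bank_drop_callback_enabled` guard in accounts_db.rs protects: before any callback exists (tests, startup replay) a direct purge from `Bank::drop()` is allowed, but once `create_drop_bank_callback()` has run, a purge with `is_from_abs == false` means `Bank::drop()` is racing ABS and the panic fires. A compact sketch of the guard, using a hypothetical `ToyAccountsDb`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Default)]
struct ToyAccountsDb {
    is_bank_drop_callback_enabled: AtomicBool,
}

impl ToyAccountsDb {
    // Mirrors create_drop_bank_callback(): installing the callback flips the flag.
    fn enable_drop_callback(&self) {
        self.is_bank_drop_callback_enabled.store(true, Ordering::SeqCst);
    }

    fn purge_slot(&self, slot: u64, is_from_abs: bool) {
        if self.is_bank_drop_callback_enabled.load(Ordering::SeqCst) && !is_from_abs {
            panic!("bad drop callpath detected for slot {}", slot);
        }
        // ... actual purge elided ...
    }
}

fn main() {
    let db = ToyAccountsDb::default();
    db.purge_slot(1, false); // fine: no callback installed yet (e.g. in tests)
    db.enable_drop_callback();
    db.purge_slot(2, true); // fine: the ABS call path
    // db.purge_slot(3, false); // would panic: Bank::drop() racing with ABS
}
```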
- self.rc.accounts.purge_slot(self.slot()); + self.rc.accounts.purge_slot(self.slot(), false); } } } @@ -5246,6 +5226,7 @@ fn is_simple_vote_transaction(transaction: &Transaction) -> bool { pub(crate) mod tests { use super::*; use crate::{ + accounts_background_service::{AbsRequestHandler, SendDroppedBankCallback}, accounts_db::SHRINK_RATIO, accounts_index::{AccountIndex, AccountMap, AccountSecondaryIndexes, ITER_BATCH_SIZE}, ancestors::Ancestors, @@ -5257,7 +5238,7 @@ pub(crate) mod tests { native_loader::NativeLoaderError, status_cache::MAX_CACHE_ENTRIES, }; - use crossbeam_channel::bounded; + use crossbeam_channel::{bounded, unbounded}; use solana_sdk::{ account::Account, account_utils::StateMut, @@ -11826,8 +11807,11 @@ pub(crate) mod tests { assert!(!debug.is_empty()); } - fn test_store_scan_consistency(accounts_db_caching_enabled: bool, update_f: F) - where + fn test_store_scan_consistency( + accounts_db_caching_enabled: bool, + update_f: F, + drop_callback: Option>, + ) where F: Fn(Arc, crossbeam_channel::Sender>, Arc>, Pubkey, u64) + std::marker::Send, { @@ -11844,6 +11828,7 @@ pub(crate) mod tests { AccountSecondaryIndexes::default(), accounts_db_caching_enabled, )); + bank0.set_callback(drop_callback); // Set up pubkeys to write to let total_pubkeys = ITER_BATCH_SIZE * 10; @@ -11940,9 +11925,18 @@ pub(crate) mod tests { #[test] fn test_store_scan_consistency_unrooted() { for accounts_db_caching_enabled in &[false, true] { + let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); + let abs_request_handler = AbsRequestHandler { + snapshot_request_handler: None, + pruned_banks_receiver, + }; test_store_scan_consistency( *accounts_db_caching_enabled, - |bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| { + move |bank0, + bank_to_scan_sender, + pubkeys_to_modify, + program_id, + starting_lamports| { let mut current_major_fork_bank = bank0; loop { let mut current_minor_fork_bank = current_major_fork_bank.clone(); @@ -11993,7 +11987,7 @@ pub(crate) mod tests { // Send the last new bank to the scan thread to perform the scan. // Meanwhile this thread will continually set roots on a separate fork - // and squash. + // and squash/clean, purging the account entries from the minor forks /* bank 0 / \ @@ -12014,8 +12008,16 @@ pub(crate) mod tests { // Try to get cache flush/clean to overlap with the scan current_major_fork_bank.force_flush_accounts_cache(); current_major_fork_bank.clean_accounts(false, false); + // Move purge here so that Bank::drop()->purge_slots() doesn't race + // with clean. 
Simulates the call from AccountsBackgroundService + let is_abs_service = true; + abs_request_handler + .handle_pruned_banks(¤t_major_fork_bank, is_abs_service); } }, + Some(Box::new(SendDroppedBankCallback::new( + pruned_banks_sender.clone(), + ))), ) } } @@ -12059,6 +12061,7 @@ pub(crate) mod tests { )); } }, + None, ); } } @@ -12752,7 +12755,6 @@ pub(crate) mod tests { let key3 = Keypair::new(); // touched in both bank1 and bank2 let key4 = Keypair::new(); // in only bank1, and has zero lamports let key5 = Keypair::new(); // in both bank1 and bank2, and has zero lamports - bank0.transfer(2, &mint_keypair, &key2.pubkey()).unwrap(); bank0.freeze(); @@ -12779,12 +12781,7 @@ pub(crate) mod tests { bank2.clean_accounts(false, false); let expected_ref_count_for_cleaned_up_keys = 0; - let expected_ref_count_for_keys_only_in_slot_2 = bank2 - .rc - .accounts - .accounts_db - .accounts_index - .ref_count_from_storage(&key2.pubkey()); + let expected_ref_count_for_keys_in_both_slot1_and_slot2 = 1; assert_eq!( bank2 @@ -12820,7 +12817,7 @@ pub(crate) mod tests { .accounts_db .accounts_index .ref_count_from_storage(&key5.pubkey()), - expected_ref_count_for_keys_only_in_slot_2 + expected_ref_count_for_keys_in_both_slot1_and_slot2, ); assert_eq!(