diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs
index 18386e2a89..dd5245fba9 100644
--- a/runtime/src/accounts_background_service.rs
+++ b/runtime/src/accounts_background_service.rs
@@ -275,6 +275,10 @@ impl AccountsBackgroundService {
                         let snapshot_block_height =
                             request_handler.handle_snapshot_requests(accounts_db_caching_enabled);
                         if accounts_db_caching_enabled {
+                            // Note that the flush will do an internal clean of the
+                            // cache up to bank.slot(), so should be safe as long
+                            // as any later snapshots that are taken are of
+                            // slots >= bank.slot()
                             bank.flush_accounts_cache_if_needed();
                         }
 
@@ -300,6 +304,10 @@ impl AccountsBackgroundService {
                             > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
                         {
                             if accounts_db_caching_enabled {
+                                // Note that the flush will do an internal clean of the
+                                // cache up to bank.slot(), so should be safe as long
+                                // as any later snapshots that are taken are of
+                                // slots >= bank.slot()
                                 bank.force_flush_accounts_cache();
                             }
                             bank.clean_accounts(true);
diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index daac6d8cb1..61890bc531 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -1,7 +1,7 @@
 use dashmap::DashMap;
 use solana_sdk::{account::Account, clock::Slot, hash::Hash, pubkey::Pubkey};
 use std::{
-    collections::HashSet,
+    collections::BTreeSet,
     ops::Deref,
     sync::{
         atomic::{AtomicBool, AtomicU64, Ordering},
@@ -90,7 +90,7 @@ pub struct AccountsCache {
     cache: DashMap<Slot, SlotCache>,
     // Queue of potentially unflushed roots. Random eviction + cache too large
     // could have triggered a flush of this slot already
-    maybe_unflushed_roots: RwLock<HashSet<Slot>>,
+    maybe_unflushed_roots: RwLock<BTreeSet<Slot>>,
     max_flushed_root: AtomicU64,
 }
 
@@ -147,14 +147,25 @@ impl AccountsCache {
     }
 
     pub fn add_root(&self, root: Slot) {
-        self.maybe_unflushed_roots.write().unwrap().insert(root);
+        let max_flushed_root = self.fetch_max_flush_root();
+        if root > max_flushed_root || (root == max_flushed_root && root == 0) {
+            self.maybe_unflushed_roots.write().unwrap().insert(root);
+        }
     }
 
-    pub fn clear_roots(&self) -> HashSet<Slot> {
-        std::mem::replace(
-            &mut self.maybe_unflushed_roots.write().unwrap(),
-            HashSet::new(),
-        )
+    pub fn clear_roots(&self, max_root: Option<Slot>) -> BTreeSet<Slot> {
+        let mut w_maybe_unflushed_roots = self.maybe_unflushed_roots.write().unwrap();
+        if let Some(max_root) = max_root {
+            // `greater_than_max_root` contains all slots >= `max_root + 1`, or alternatively,
+            // all slots > `max_root`. Meanwhile, `w_maybe_unflushed_roots` is left with all slots
+            // <= `max_root`.
+            let greater_than_max_root = w_maybe_unflushed_roots.split_off(&(max_root + 1));
+            // After the replace, `w_maybe_unflushed_roots` contains slots > `max_root`, and
+            // we return all slots <= `max_root`
+            std::mem::replace(&mut w_maybe_unflushed_roots, greater_than_max_root)
+        } else {
+            std::mem::replace(&mut *w_maybe_unflushed_roots, BTreeSet::new())
+        }
    }
 
     // Removes slots less than or equal to `max_root`. Only safe to pass in a rooted slot,
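
A note on the data-structure switch above: moving `maybe_unflushed_roots` from `HashSet` to `BTreeSet` is what makes the bounded `clear_roots(max_root)` cheap, since `BTreeSet::split_off` partitions the ordered set in place. A minimal standalone sketch of that partitioning (the slot numbers are invented for illustration):

    use std::collections::BTreeSet;

    fn main() {
        let mut roots: BTreeSet<u64> = [3, 5, 8, 13].iter().copied().collect();
        let max_root = 8;
        // Everything >= max_root + 1 moves into the split; `roots` keeps
        // everything <= max_root, mirroring `w_maybe_unflushed_roots`.
        let greater_than_max_root = roots.split_off(&(max_root + 1));
        // Swap the retained halves, returning the flushable (<= max_root) set.
        let cleared = std::mem::replace(&mut roots, greater_than_max_root);
        assert_eq!(cleared.iter().copied().collect::<Vec<_>>(), vec![3, 5, 8]);
        assert_eq!(roots.iter().copied().collect::<Vec<_>>(), vec![13]);
    }
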
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 296aa96534..d765823d65 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -49,7 +49,7 @@ use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
 use std::{
     borrow::Cow,
     boxed::Box,
-    collections::{BTreeMap, HashMap, HashSet},
+    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
     convert::{TryFrom, TryInto},
     io::{Error as IOError, Result as IOResult},
     ops::RangeBounds,
@@ -931,23 +931,30 @@ impl AccountsDB {
         reclaims
     }
 
+    fn max_clean_root(&self, proposed_clean_root: Option<Slot>) -> Option<Slot> {
+        match (
+            self.accounts_index.min_ongoing_scan_root(),
+            proposed_clean_root,
+        ) {
+            (None, None) => None,
+            (Some(min_scan_root), None) => Some(min_scan_root),
+            (None, Some(proposed_clean_root)) => Some(proposed_clean_root),
+            (Some(min_scan_root), Some(proposed_clean_root)) => {
+                Some(std::cmp::min(min_scan_root, proposed_clean_root))
+            }
+        }
+    }
+
     // Purge zero lamport accounts and older rooted account states as garbage
     // collection
     // Only remove those accounts where the entire rooted history of the account
     // can be purged because there are no live append vecs in the ancestors
     pub fn clean_accounts(&self, max_clean_root: Option<Slot>) {
+        let max_clean_root = self.max_clean_root(max_clean_root);
+
         // hold a lock to prevent slot shrinking from running because it might modify some rooted
         // slot storages which can not happen as long as we're cleaning accounts because we're also
         // modifying the rooted slot storages!
-        let max_clean_root = match (self.accounts_index.min_ongoing_scan_root(), max_clean_root) {
-            (None, None) => None,
-            (Some(min_scan_root), None) => Some(min_scan_root),
-            (None, Some(max_clean_root)) => Some(max_clean_root),
-            (Some(min_scan_root), Some(max_clean_root)) => {
-                Some(std::cmp::min(min_scan_root, max_clean_root))
-            }
-        };
-
         let mut candidates_v1 = self.shrink_candidate_slots_v1.lock().unwrap();
         self.report_store_stats();
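
The extracted `max_clean_root()` helper clamps the caller's proposed clean root by the lowest slot any in-flight scan still depends on, so clean can never remove account history a scan may still be reading. A standalone sketch of the same combination logic (function and variable names here are invented):

    fn effective_clean_root(
        min_ongoing_scan_root: Option<u64>,
        proposed: Option<u64>,
    ) -> Option<u64> {
        match (min_ongoing_scan_root, proposed) {
            (None, None) => None,
            (Some(scan), None) => Some(scan),
            (None, Some(proposed)) => Some(proposed),
            (Some(scan), Some(proposed)) => Some(std::cmp::min(scan, proposed)),
        }
    }

    fn main() {
        // A scan pinned at slot 10 caps a proposed clean root of 15 down to 10...
        assert_eq!(effective_clean_root(Some(10), Some(15)), Some(10));
        // ...while with no ongoing scan the proposal passes through unchanged.
        assert_eq!(effective_clean_root(None, Some(15)), Some(15));
    }
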
@@ -2408,21 +2415,37 @@ impl AccountsDB {
         );
     }
 
-    fn purge_slot_cache_keys(&self, dead_slot: Slot, slot_cache: SlotCache) {
-        // Slot purged from cache should not exist in the backing store
-        assert!(self.storage.get_slot_stores(dead_slot).is_none());
+    fn purge_slot_cache(&self, purged_slot: Slot, slot_cache: SlotCache) {
         let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
         let pubkey_to_slot_set: Vec<(Pubkey, Slot)> = slot_cache
             .iter()
             .map(|account| {
-                purged_slot_pubkeys.insert((dead_slot, *account.key()));
-                (*account.key(), dead_slot)
+                purged_slot_pubkeys.insert((purged_slot, *account.key()));
+                (*account.key(), purged_slot)
             })
             .collect();
+        self.purge_slot_cache_pubkeys(purged_slot, purged_slot_pubkeys, pubkey_to_slot_set, true);
+    }
+
+    fn purge_slot_cache_pubkeys(
+        &self,
+        purged_slot: Slot,
+        purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
+        pubkey_to_slot_set: Vec<(Pubkey, Slot)>,
+        is_dead: bool,
+    ) {
+        // Slot purged from cache should not exist in the backing store
+        assert!(self.storage.get_slot_stores(purged_slot).is_none());
         let num_purged_keys = pubkey_to_slot_set.len();
         let reclaims = self.purge_keys_exact(&pubkey_to_slot_set);
         assert_eq!(reclaims.len(), num_purged_keys);
-        self.finalize_dead_slot_removal(std::iter::once(&dead_slot), purged_slot_pubkeys, None);
+        if is_dead {
+            self.finalize_dead_slot_removal(
+                std::iter::once(&purged_slot),
+                purged_slot_pubkeys,
+                None,
+            );
+        }
     }
 
     fn purge_slots(&self, slots: &HashSet<Slot>) {
@@ -2440,8 +2463,10 @@ impl AccountsDB {
         for remove_slot in non_roots {
             if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) {
                 // If the slot is still in the cache, remove the backing storages for
-                // the slot and from the Accounts Index
-                self.purge_slot_cache_keys(*remove_slot, slot_cache);
+                // the slot. The accounts index cleaning (removing from the slot list,
+                // decrementing the account ref count) is handled in
+                // clean_accounts() -> purge_older_root_entries()
+                self.purge_slot_cache(*remove_slot, slot_cache);
             } else if let Some((_, slot_removed_storages)) = self.storage.0.remove(&remove_slot) {
                 // Because AccountsBackgroundService synchronously flushes from the accounts cache
                 // and handles all Bank::drop() (the cleanup function that leads to this
@@ -2518,7 +2543,7 @@ impl AccountsDB {
 
         if let Some(slot_cache) = self.accounts_cache.remove_slot(remove_slot) {
             // If the slot is still in the cache, remove it from the cache
-            self.purge_slot_cache_keys(remove_slot, slot_cache);
+            self.purge_slot_cache(remove_slot, slot_cache);
         }
 
         // TODO: Handle if the slot was flushed to storage while we were removing the cached
@@ -2839,32 +2864,29 @@ impl AccountsDB {
         self.accounts_cache.report_size();
     }
 
-    // Force flush the cached roots, flush any unrooted frozen slots as well if there are
-    // > MAX_CACHE_SLOTS of them.
-    pub fn force_flush_accounts_cache(&self) {
-        self.flush_accounts_cache(true);
-    }
+    // `force_flush` flushes all the cached roots `<= max_clean_root`. It also then
+    // flushes:
+    // 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache,
+    // 2) If there are still > MAX_CACHE_SLOTS remaining slots in the cache, the excess
+    //    unrooted slots
+    pub fn flush_accounts_cache(&self, force_flush: bool, max_clean_root: Option<Slot>) {
+        #[cfg(not(test))]
+        assert!(max_clean_root.is_some());
 
-    pub fn flush_accounts_cache_if_needed(&self) {
-        self.flush_accounts_cache(false);
-    }
-
-    fn flush_accounts_cache(&self, force_flush: bool) {
         if !force_flush && self.accounts_cache.num_slots() <= MAX_CACHE_SLOTS {
             return;
         }
 
-        // Flush all roots
+        // Flush only the roots <= max_clean_root, so that snapshotting has all
+        // the relevant roots in storage.
         let mut flush_roots_elapsed = Measure::start("flush_roots_elapsed");
-        let cached_roots = self.accounts_cache.clear_roots();
-        for root in &cached_roots {
-            self.flush_slot_cache(*root);
-            self.accounts_cache.set_max_flush_root(*root);
-        }
-
-        // Only add to the uncleaned roots set *after* we've flushed the previous roots,
-        // so that clean will actually be able to clean the slots.
-        self.accounts_index.add_uncleaned_roots(cached_roots);
+        let mut account_bytes_saved = 0;
+        let mut num_accounts_saved = 0;
+        let (total_new_cleaned_roots, num_cleaned_roots_flushed) = self
+            .flush_rooted_accounts_cache(
+                max_clean_root,
+                Some((&mut account_bytes_saved, &mut num_accounts_saved)),
+            );
         flush_roots_elapsed.stop();
 
         // Note we don't purge unrooted slots here because there may be ongoing scans/references
@@ -2872,27 +2894,49 @@ impl AccountsDB {
         // banks
 
         // If there are > MAX_CACHE_SLOTS, then flush the excess ones to storage
+        let (total_new_excess_roots, num_excess_roots_flushed) =
+            if self.accounts_cache.num_slots() > MAX_CACHE_SLOTS {
+                // Start by flushing the roots
+                //
+                // Cannot do any cleaning on roots past `max_clean_root` because future
+                // snapshots may need updates from those later slots, hence we pass `None`
+                // for `should_clean`.
+                self.flush_rooted_accounts_cache(None, None)
+            } else {
+                (0, 0)
+            };
         let old_slots = self.accounts_cache.find_older_frozen_slots(MAX_CACHE_SLOTS);
-        let total_excess_slot_count = old_slots.len();
+        let excess_slot_count = old_slots.len();
         let mut unflushable_unrooted_slot_count = 0;
         let max_flushed_root = self.accounts_cache.fetch_max_flush_root();
         for old_slot in old_slots {
             // Don't flush slots that are known to be unrooted
             if old_slot > max_flushed_root {
-                self.flush_slot_cache(old_slot);
+                self.flush_slot_cache(old_slot, None::<&mut fn(&_, &_) -> bool>);
             } else {
                 unflushable_unrooted_slot_count += 1;
             }
         }
         datapoint_info!(
-            "accounts_db-cache-limit-slots",
-            ("total_excess_slot_count", total_excess_slot_count, i64),
+            "accounts_db-flush_accounts_cache",
+            ("total_new_cleaned_roots", total_new_cleaned_roots, i64),
+            ("num_cleaned_roots_flushed", num_cleaned_roots_flushed, i64),
+            ("total_new_excess_roots", total_new_excess_roots, i64),
+            ("num_excess_roots_flushed", num_excess_roots_flushed, i64),
+            ("excess_slot_count", excess_slot_count, i64),
             (
                 "unflushable_unrooted_slot_count",
                 unflushable_unrooted_slot_count,
                 i64
             ),
+            (
+                "flush_roots_elapsed",
+                flush_roots_elapsed.as_us() as i64,
+                i64
+            ),
+            ("account_bytes_saved", account_bytes_saved, i64),
+            ("num_accounts_saved", num_accounts_saved, i64),
         );
 
         // Flush a random slot out after every force flush to catch any inconsistencies
@@ -2910,58 +2954,158 @@ impl AccountsDB {
                 "Flushing random slot: {}, num_remaining: {}",
                 *rand_slot, num_slots_remaining
             );
-            self.flush_slot_cache(*rand_slot);
+            self.flush_slot_cache(*rand_slot, None::<&mut fn(&_, &_) -> bool>);
             }
         }
-
-        inc_new_counter_info!("flush_roots_elapsed", flush_roots_elapsed.as_us() as usize);
     }
 
-    fn flush_slot_cache(&self, slot: Slot) {
+    fn flush_rooted_accounts_cache(
+        &self,
+        mut max_flush_root: Option<Slot>,
+        should_clean: Option<(&mut usize, &mut usize)>,
+    ) -> (usize, usize) {
+        if should_clean.is_some() {
+            max_flush_root = self.max_clean_root(max_flush_root);
+        }
+
+        // If there is a long running scan going on, this could prevent any cleaning
+        // past `max_flush_root`.
+        let cached_roots: BTreeSet<Slot> = self.accounts_cache.clear_roots(max_flush_root);
+
+        // Use HashMap because HashSet doesn't provide Entry api
+        let mut written_accounts = HashMap::new();
+
+        // If `should_clean` is None, then `should_flush_f` is also None, which will cause
+        // `flush_slot_cache` to flush all accounts to storage without cleaning any accounts.
+        let mut should_flush_f = should_clean.map(|(account_bytes_saved, num_accounts_saved)| {
+            move |&pubkey: &Pubkey, account: &Account| {
+                use std::collections::hash_map::Entry::{Occupied, Vacant};
+
+                let should_flush = match written_accounts.entry(pubkey) {
+                    Vacant(vacant_entry) => {
+                        vacant_entry.insert(());
+                        true
+                    }
+                    Occupied(_occupied_entry) => {
+                        *account_bytes_saved += account.data.len();
+                        *num_accounts_saved += 1;
+                        // If a later root already wrote this account, no point
+                        // in flushing it
+                        false
+                    }
+                };
+                should_flush
+            }
+        });
+
+        // Iterate from highest to lowest so that we don't need to flush earlier
+        // outdated updates in earlier roots
+        let mut num_roots_flushed = 0;
+        for &root in cached_roots.iter().rev() {
+            if self.flush_slot_cache(root, should_flush_f.as_mut()) {
+                num_roots_flushed += 1;
+            }
+
+            // Regardless of whether this slot was *just* flushed from the cache by the above
+            // `flush_slot_cache()`, we should update the `max_flush_root`.
+            // This is because some rooted slots may be flushed to storage *before* they are marked as root.
+            // This can occur for instance when:
+            // 1) The cache is overwhelmed, so we flushed some yet to be rooted frozen slots
+            // 2) Random evictions
+            // These slots may then *later* be marked as root, so we still need to handle updating the
+            // `max_flush_root` in the accounts cache.
+            self.accounts_cache.set_max_flush_root(root);
+        }
+
+        // Only add to the uncleaned roots set *after* we've flushed the previous roots,
+        // so that clean will actually be able to clean the slots.
+        let num_new_roots = cached_roots.len();
+        self.accounts_index.add_uncleaned_roots(cached_roots);
+        (num_new_roots, num_roots_flushed)
+    }
+
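
The `written_accounts` map above uses `HashMap`'s `Entry` API as a first-writer-wins set across the high-to-low walk over roots: the newest update for each pubkey is flushed, older ones are skipped and counted as savings. The same dedup pattern in miniature (standalone; the keys and sizes are invented):

    use std::collections::{hash_map::Entry, HashMap};

    fn main() {
        // Updates as (root, pubkey, data), walked from highest root to lowest.
        let updates = [
            (3, "key_a", vec![0u8; 10]),
            (2, "key_a", vec![0u8; 7]),
            (1, "key_b", vec![0u8; 4]),
        ];
        let mut written_accounts = HashMap::new();
        let mut bytes_saved = 0;
        for (root, pubkey, data) in updates.iter() {
            let should_flush = match written_accounts.entry(*pubkey) {
                Entry::Vacant(vacant) => {
                    vacant.insert(());
                    true
                }
                Entry::Occupied(_) => {
                    // A later root already wrote this key; skip the stale copy.
                    bytes_saved += data.len();
                    false
                }
            };
            println!("root {}: flush {}? {}", root, pubkey, should_flush);
        }
        assert_eq!(bytes_saved, 7); // key_a's root-2 copy was skipped
    }
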
+    // `should_flush_f` is an optional closure that determines whether a given
+    // account should be flushed. Passing `None` will by default flush all
+    // accounts
+    fn flush_slot_cache(
+        &self,
+        slot: Slot,
+        mut should_flush_f: Option<&mut impl FnMut(&Pubkey, &Account) -> bool>,
+    ) -> bool {
         info!("flush_slot_cache slot: {}", slot);
-        if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
+        let slot_cache = self.accounts_cache.slot_cache(slot);
+        if let Some(slot_cache) = slot_cache {
             let iter_items: Vec<_> = slot_cache.iter().collect();
             let mut total_size = 0;
+            let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
+            let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
             let (accounts, hashes): (Vec<(&Pubkey, &Account)>, Vec<Hash>) = iter_items
                 .iter()
-                .map(|iter_item| {
+                .filter_map(|iter_item| {
                     let key = iter_item.key();
                     let account = &iter_item.value().account;
-                    let hash = iter_item.value().hash;
-                    total_size += (account.data.len() + STORE_META_OVERHEAD) as u64;
-                    ((key, account), hash)
+                    let should_flush = should_flush_f
+                        .as_mut()
+                        .map(|should_flush_f| should_flush_f(key, account))
+                        .unwrap_or(true);
+                    if should_flush {
+                        let hash = iter_item.value().hash;
+                        total_size += (account.data.len() + STORE_META_OVERHEAD) as u64;
+                        Some(((key, account), hash))
+                    } else {
+                        // If we don't flush, we have to remove the entry from the
+                        // index, since it's equivalent to purging
+                        purged_slot_pubkeys.insert((slot, *key));
+                        pubkey_to_slot_set.push((*key, slot));
+                        None
+                    }
                 })
                 .unzip();
-            let aligned_total_size = self.page_align(total_size);
-            // This ensures that all updates are written to an AppendVec, before any
-            // updates to the index happen, so anybody that sees a real entry in the index,
-            // will be able to find the account in storage
-            let flushed_store =
-                self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache");
-            self.store_accounts_custom(
+
+            let is_dead_slot = accounts.is_empty();
+            // Remove the account index entries from earlier roots that are outdated by later roots.
+            // Safe because queries to the index will be reading updates from later roots.
+            self.purge_slot_cache_pubkeys(
                 slot,
-                &accounts,
-                &hashes,
-                Some(Box::new(move |_, _| flushed_store.clone())),
-                None,
-                false,
-            );
-            // If the above sizing function is correct, just one AppendVec is enough to hold
-            // all the data for the slot
-            assert_eq!(
-                self.storage
-                    .get_slot_stores(slot)
-                    .unwrap()
-                    .read()
-                    .unwrap()
-                    .len(),
-                1
+                purged_slot_pubkeys,
+                pubkey_to_slot_set,
+                is_dead_slot,
             );
+
+            if !is_dead_slot {
+                let aligned_total_size = self.page_align(total_size);
+                // This ensures that all updates are written to an AppendVec, before any
+                // updates to the index happen, so anybody that sees a real entry in the index,
+                // will be able to find the account in storage
+                let flushed_store =
+                    self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache");
+                self.store_accounts_custom(
+                    slot,
+                    &accounts,
+                    &hashes,
+                    Some(Box::new(move |_, _| flushed_store.clone())),
+                    None,
+                    false,
+                );
+                // If the above sizing function is correct, just one AppendVec is enough to hold
+                // all the data for the slot
+                assert_eq!(
+                    self.storage
+                        .get_slot_stores(slot)
+                        .unwrap()
+                        .read()
+                        .unwrap()
+                        .len(),
+                    1
+                );
+            }
+
+            // Remove this slot from the cache, which to AccountsDb readers will look like an
+            // atomic switch from the cache to storage
             assert!(self.accounts_cache.remove_slot(slot).is_some());
+            true
+        } else {
+            false
         }
     }
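
To see how `should_flush_f` drives the `filter_map`/`unzip` split above: approved entries are collected for the storage write while rejected ones are queued for index purging, all in one pass. A standalone miniature of that partition (the flush policy here is invented; in the patch the only real filter is the dedup closure built in `flush_rooted_accounts_cache`):

    fn main() {
        // (pubkey, lamports) pairs standing in for cached accounts.
        let cached = [("key_a", 0u64), ("key_b", 5), ("key_c", 7)];
        let mut purged: Vec<&str> = vec![];
        let should_flush = |lamports: u64| lamports > 0; // stand-in policy
        let (flushed, lamports): (Vec<&str>, Vec<u64>) = cached
            .iter()
            .filter_map(|(key, lamports)| {
                if should_flush(*lamports) {
                    Some((*key, *lamports))
                } else {
                    // Not flushed: queue for index purging instead.
                    purged.push(*key);
                    None
                }
            })
            .unzip();
        assert_eq!(flushed, vec!["key_b", "key_c"]);
        assert_eq!(lamports, vec![5, 7]);
        assert_eq!(purged, vec!["key_a"]);
    }
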
@@ -6959,7 +7103,7 @@ pub mod tests {
 
         // No root was added yet, requires an ancestor to find
         // the account
-        db.force_flush_accounts_cache();
+        db.flush_accounts_cache(true, None);
         let ancestors = vec![(slot, 1)].into_iter().collect();
         assert_eq!(
             db.load_slow(&ancestors, &key),
@@ -6968,7 +7112,7 @@ pub mod tests {
 
         // Add root then flush
         db.add_root(slot);
-        db.force_flush_accounts_cache();
+        db.flush_accounts_cache(true, None);
         assert_eq!(db.load_slow(&HashMap::new(), &key), Some((account0, slot)));
     }
 
@@ -6999,7 +7143,7 @@ pub mod tests {
             db.load_slow(&ancestors, &unrooted_key),
             Some((account0.clone(), unrooted_slot))
         );
-        db.force_flush_accounts_cache();
+        db.flush_accounts_cache(true, None);
 
         // After the flush, the unrooted slot is still in the cache
         assert!(db.load_slow(&ancestors, &unrooted_key).is_some());
@@ -7048,7 +7192,7 @@ pub mod tests {
             }
         }
 
-        db.flush_accounts_cache_if_needed();
+        db.flush_accounts_cache(false, None);
 
         let total_slots = num_roots + num_unrooted;
         // If there's <= the max size, then nothing will be flushed from the slot
@@ -7110,7 +7254,7 @@ pub mod tests {
 
         // Flush, then clean again. Should not need another root to initiate the cleaning
         // because `accounts_index.uncleaned_roots` should be correct
-        db.force_flush_accounts_cache();
+        db.flush_accounts_cache(true, None);
         db.clean_accounts(None);
         assert!(db
             .do_load(&Ancestors::default(), &account_key, Some(0))
@@ -7187,7 +7331,7 @@ pub mod tests {
         db.add_root(2);
 
         // Flush the cache, slot 1 should remain in the cache, everything else should be flushed
-        db.force_flush_accounts_cache();
+        db.flush_accounts_cache(true, None);
         assert_eq!(db.accounts_cache.num_slots(), 1);
         assert!(db.accounts_cache.slot_cache(1).is_some());
 
@@ -7234,7 +7378,7 @@ pub mod tests {
         }
 
         accounts_db.add_root(slot);
-        accounts_db.force_flush_accounts_cache();
+        accounts_db.flush_accounts_cache(true, None);
 
         let mut storage_maps: Vec<Arc<AccountStorageEntry>> = accounts_db
             .storage
@@ -7268,4 +7412,320 @@ pub mod tests {
             assert_eq!(before_size, after_size + account.stored_size);
         }
     }
+
+    fn setup_accounts_db_cache_clean(num_slots: usize) -> (AccountsDB, Vec<Pubkey>, Vec<Slot>) {
+        let caching_enabled = true;
+        let accounts_db = AccountsDB::new_with_config(
+            Vec::new(),
+            &ClusterType::Development,
+            HashSet::new(),
+            caching_enabled,
+        );
+        let slots: Vec<_> = (0..num_slots as Slot).into_iter().collect();
+        let keys: Vec<Pubkey> = std::iter::repeat_with(Pubkey::new_unique)
+            .take(num_slots)
+            .collect();
+
+        // Store some subset of the keys in slots 0..num_slots
+        for slot in &slots {
+            for key in &keys[*slot as usize..] {
+                accounts_db.store_cached(*slot, &[(key, &Account::new(1, 0, &Pubkey::default()))]);
+            }
+            accounts_db.add_root(*slot as Slot);
+        }
+
+        // If there's <= MAX_CACHE_SLOTS, no slots should be flushed
+        if accounts_db.accounts_cache.num_slots() <= MAX_CACHE_SLOTS {
+            accounts_db.flush_accounts_cache(false, None);
+            assert_eq!(accounts_db.accounts_cache.num_slots(), num_slots);
+        }
+
+        (accounts_db, keys, slots)
+    }
+
+    #[test]
+    fn test_accounts_db_cache_clean_dead_slots() {
+        let num_slots = 10;
+        let (accounts_db, keys, mut slots) = setup_accounts_db_cache_clean(num_slots);
+        let last_dead_slot = (num_slots - 1) as Slot;
+        assert_eq!(*slots.last().unwrap(), last_dead_slot);
+        let alive_slot = last_dead_slot as Slot + 1;
+        slots.push(alive_slot);
+        for key in &keys {
+            // Store a slot that overwrites all previous keys, rendering all previous keys dead
+            accounts_db.store_cached(
+                alive_slot,
+                &[(key, &Account::new(1, 0, &Pubkey::default()))],
+            );
+            accounts_db.add_root(alive_slot);
+        }
+
+        // Before the flush, we can find entries in the database for slots < alive_slot if we specify
+        // a smaller max root
+        for key in &keys {
+            assert!(accounts_db
+                .do_load(&Ancestors::default(), key, Some(last_dead_slot))
+                .is_some());
+        }
+
+        // If no `max_clean_root` is specified, cleaning should purge all flushed slots
+        accounts_db.flush_accounts_cache(true, None);
+        assert_eq!(accounts_db.accounts_cache.num_slots(), 0);
+        let mut uncleaned_roots = accounts_db
+            .accounts_index
+            .clear_uncleaned_roots(None)
+            .into_iter()
+            .collect::<Vec<_>>();
+        uncleaned_roots.sort_unstable();
+        assert_eq!(uncleaned_roots, slots);
+        assert_eq!(
+            accounts_db.accounts_cache.fetch_max_flush_root(),
+            alive_slot,
+        );
+
+        // Specifying a max_root < alive_slot should not return any more entries,
+        // as those have been purged from the accounts index for the dead slots.
+        for key in &keys {
+            assert!(accounts_db
+                .do_load(&Ancestors::default(), key, Some(last_dead_slot))
+                .is_none());
+        }
+        // Each slot should only have one entry in the storage, since all other accounts were
+        // cleaned due to later updates
+        for slot in &slots {
+            if let ScanStorageResult::Stored(slot_accounts) = accounts_db.scan_account_storage(
+                *slot as Slot,
+                |_| Some(0),
+                |slot_accounts: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
+                    slot_accounts.insert(*loaded_account.pubkey());
+                },
+            ) {
+                if *slot == alive_slot {
+                    assert_eq!(slot_accounts.len(), keys.len());
+                } else {
+                    assert!(slot_accounts.is_empty());
+                }
+            } else {
+                panic!("Expected slot to be in storage, not cache");
+            }
+        }
+    }
+
+    #[test]
+    fn test_accounts_db_cache_clean() {
+        let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(10);
+
+        // If no `max_clean_root` is specified, cleaning should purge all flushed slots
+        accounts_db.flush_accounts_cache(true, None);
+        assert_eq!(accounts_db.accounts_cache.num_slots(), 0);
+        let mut uncleaned_roots = accounts_db
+            .accounts_index
+            .clear_uncleaned_roots(None)
+            .into_iter()
+            .collect::<Vec<_>>();
+        uncleaned_roots.sort_unstable();
+        assert_eq!(uncleaned_roots, slots);
+        assert_eq!(
+            accounts_db.accounts_cache.fetch_max_flush_root(),
+            *slots.last().unwrap()
+        );
+
+        // Each slot should only have one entry in the storage, since all other accounts were
+        // cleaned due to later updates
+        for slot in &slots {
+            if let ScanStorageResult::Stored(slot_account) = accounts_db.scan_account_storage(
+                *slot as Slot,
+                |_| Some(0),
+                |slot_account: &Arc<RwLock<Pubkey>>, loaded_account: LoadedAccount| {
+                    *slot_account.write().unwrap() = *loaded_account.pubkey();
+                },
+            ) {
+                assert_eq!(*slot_account.read().unwrap(), keys[*slot as usize]);
+            } else {
+                panic!("Everything should have been flushed")
+            }
+        }
+    }
+
+    fn run_test_accounts_db_cache_clean_max_root(num_slots: usize, max_clean_root: Slot) {
+        assert!(max_clean_root < (num_slots as Slot));
+        let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(num_slots);
+        let is_cache_at_limit = num_slots - max_clean_root as usize - 1 > MAX_CACHE_SLOTS;
+        // If:
+        // 1) `max_clean_root` is specified,
+        // 2) not at the cache limit, i.e. `is_cache_at_limit == false`, then
+        // `flush_accounts_cache()` should clean and flush only slots <= `max_clean_root`,
+        accounts_db.flush_accounts_cache(true, Some(max_clean_root));
+
+        if !is_cache_at_limit {
+            // Should flush all slots between 0..=max_clean_root
+            assert_eq!(
+                accounts_db.accounts_cache.num_slots(),
+                slots.len() - max_clean_root as usize - 1
+            );
+        } else {
+            // Otherwise, if we are at the cache limit, all roots will be flushed
+            assert_eq!(accounts_db.accounts_cache.num_slots(), 0,);
+        }
+
+        let mut uncleaned_roots = accounts_db
+            .accounts_index
+            .clear_uncleaned_roots(None)
+            .into_iter()
+            .collect::<Vec<_>>();
+        uncleaned_roots.sort_unstable();
+
+        let expected_max_clean_root = if !is_cache_at_limit {
+            // Should flush all slots between 0..=max_clean_root
+            max_clean_root
+        } else {
+            // Otherwise, if we are at the cache limit, all roots will be flushed
+            num_slots as Slot - 1
+        };
+
+        assert_eq!(
+            uncleaned_roots,
+            slots[0..=expected_max_clean_root as usize].to_vec()
+        );
+        assert_eq!(
+            accounts_db.accounts_cache.fetch_max_flush_root(),
+            expected_max_clean_root,
+        );
+
+        // Updates from slots > max_clean_root should still be flushed to storage
+        for slot in &slots {
+            let slot_accounts = accounts_db.scan_account_storage(
+                *slot as Slot,
+                |loaded_account: LoadedAccount| {
+                    if is_cache_at_limit {
+                        panic!(
+                            "When cache is at limit, all roots should have been flushed to storage"
+                        );
+                    }
+                    assert!(*slot > max_clean_root);
+                    Some(*loaded_account.pubkey())
+                },
+                |slot_accounts: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
+                    slot_accounts.insert(*loaded_account.pubkey());
+                    if !is_cache_at_limit {
+                        // Only true when the limit hasn't been reached and there are still
+                        // slots left in the cache
+                        assert!(*slot <= max_clean_root);
+                    }
+                },
+            );
+
+            let slot_accounts = match slot_accounts {
+                ScanStorageResult::Cached(slot_accounts) => {
+                    slot_accounts.into_iter().collect::<HashSet<Pubkey>>()
+                }
+                ScanStorageResult::Stored(slot_accounts) => {
+                    slot_accounts.into_iter().collect::<HashSet<Pubkey>>()
+                }
+            };
+            if *slot >= max_clean_root {
+                // 1) If slot > `max_clean_root`, then either:
+                //    a) If `is_cache_at_limit == true`, still in the cache
+                //    b) If `is_cache_at_limit == false`, it was not cleaned before being flushed to storage.
+                //
+                // In both cases all the *original* updates at index `slot` were uncleaned and thus
+                // should be discoverable by this scan.
+                //
+                // 2) If slot == `max_clean_root`, the slot was not cleaned before being flushed to storage,
+                //    so it also contains all the original updates.
+                assert_eq!(
+                    slot_accounts,
+                    keys[*slot as usize..]
+                        .iter()
+                        .cloned()
+                        .collect::<HashSet<Pubkey>>()
+                );
+            } else {
+                // Slots less than `max_clean_root` were cleaned in the cache before being flushed
+                // to storage, so each should only contain one account
+                assert_eq!(
+                    slot_accounts,
+                    std::iter::once(keys[*slot as usize])
+                        .into_iter()
+                        .collect::<HashSet<Pubkey>>()
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn test_accounts_db_cache_clean_max_root() {
+        let max_clean_root = 5;
+        run_test_accounts_db_cache_clean_max_root(10, max_clean_root);
+    }
+
+    #[test]
+    fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit() {
+        let max_clean_root = 5;
+        // Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots
+        // will be flushed
+        run_test_accounts_db_cache_clean_max_root(
+            MAX_CACHE_SLOTS + max_clean_root as usize + 2,
+            max_clean_root,
+        );
+    }
+
+    fn run_flush_rooted_accounts_cache(should_clean: bool) {
+        let num_slots = 10;
+        let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(num_slots);
+        let mut cleaned_bytes = 0;
+        let mut cleaned_accounts = 0;
+        let should_clean_tracker = if should_clean {
+            Some((&mut cleaned_bytes, &mut cleaned_accounts))
+        } else {
+            None
+        };
+
+        // If no cleaning is specified, then flush everything
+        accounts_db.flush_rooted_accounts_cache(None, should_clean_tracker);
+        for slot in &slots {
+            let slot_accounts = if let ScanStorageResult::Stored(slot_accounts) = accounts_db
+                .scan_account_storage(
+                    *slot as Slot,
+                    |_| Some(0),
+                    |slot_account: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
+                        slot_account.insert(*loaded_account.pubkey());
+                    },
+                ) {
+                slot_accounts.into_iter().collect::<HashSet<Pubkey>>()
+            } else {
+                panic!("All roots should have been flushed to storage");
+            };
+            if !should_clean || slot == slots.last().unwrap() {
+                // The slot was not cleaned before being flushed to storage,
+                // so it also contains all the original updates.
+                assert_eq!(
+                    slot_accounts,
+                    keys[*slot as usize..]
+                        .iter()
+                        .cloned()
+                        .collect::<HashSet<Pubkey>>()
+                );
+            } else {
+                // If clean was specified, only the latest slot should have all the updates.
+                // All these other slots have been cleaned before flush
+                assert_eq!(
+                    slot_accounts,
+                    std::iter::once(keys[*slot as usize])
+                        .into_iter()
+                        .collect::<HashSet<Pubkey>>()
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn test_flush_rooted_accounts_cache_with_clean() {
+        run_flush_rooted_accounts_cache(true);
+    }
+
+    #[test]
+    fn test_flush_rooted_accounts_cache_without_clean() {
+        run_flush_rooted_accounts_cache(false);
+    }
 }
diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs
index 8ecfe75c0e..31d3b0bbe7 100644
--- a/runtime/src/accounts_index.rs
+++ b/runtime/src/accounts_index.rs
@@ -956,6 +956,23 @@ impl AccountsIndex<T> {
         std::mem::replace(&mut w_roots_tracker.previous_uncleaned_roots, cleaned_roots)
     }
 
+    #[cfg(test)]
+    pub fn clear_uncleaned_roots(&self, max_clean_root: Option<Slot>) -> HashSet<Slot> {
+        let mut cleaned_roots = HashSet::new();
+        let mut w_roots_tracker = self.roots_tracker.write().unwrap();
+        w_roots_tracker.uncleaned_roots.retain(|root| {
+            let is_cleaned = max_clean_root
+                .map(|max_clean_root| *root <= max_clean_root)
+                .unwrap_or(true);
+            if is_cleaned {
+                cleaned_roots.insert(*root);
+            }
+            // Only keep the slots that have yet to be cleaned
+            !is_cleaned
+        });
+        cleaned_roots
+    }
+
     pub fn is_uncleaned_root(&self, slot: Slot) -> bool {
         self.roots_tracker
             .read()
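
The `retain` above doubles as a drain-with-filter: every root `<= max_clean_root` is moved into `cleaned_roots` while the rest stay put. The same pattern in isolation (a standalone sketch with bare slot numbers):

    use std::collections::HashSet;

    fn main() {
        let mut uncleaned: HashSet<u64> = [1, 4, 6, 9].iter().copied().collect();
        let max_clean_root = Some(6);
        let mut cleaned = HashSet::new();
        uncleaned.retain(|root| {
            let is_cleaned = max_clean_root.map(|max| *root <= max).unwrap_or(true);
            if is_cleaned {
                cleaned.insert(*root);
            }
            // Keep only the roots that have yet to be cleaned
            !is_cleaned
        });
        assert_eq!(cleaned, [1u64, 4, 6].iter().copied().collect::<HashSet<_>>());
        assert_eq!(uncleaned, [9u64].iter().copied().collect::<HashSet<_>>());
    }
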
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index 29d7423ec7..6045229d4c 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -3909,14 +3909,17 @@ impl Bank {
     }
 
     pub fn force_flush_accounts_cache(&self) {
-        self.rc.accounts.accounts_db.force_flush_accounts_cache()
+        self.rc
+            .accounts
+            .accounts_db
+            .flush_accounts_cache(true, Some(self.slot()))
     }
 
     pub fn flush_accounts_cache_if_needed(&self) {
         self.rc
             .accounts
             .accounts_db
-            .flush_accounts_cache_if_needed()
+            .flush_accounts_cache(false, Some(self.slot()))
     }
 
     fn store_account_and_update_capitalization(&self, pubkey: &Pubkey, new_account: &Account) {
@@ -10241,7 +10244,8 @@ pub(crate) mod tests {
     fn get_shrink_account_size() -> usize {
         let (genesis_config, _mint_keypair) = create_genesis_config(1_000_000_000);
 
-        // Set root for bank 0, with caching enabled
+        // Set root for bank 0, with caching disabled so we can get the size
+        // of the storage for this slot
         let mut bank0 = Arc::new(Bank::new_with_config(
             &genesis_config,
             HashSet::new(),
@@ -10279,12 +10283,16 @@ pub(crate) mod tests {
         bank0.restore_old_behavior_for_fragile_tests();
 
         let pubkey0_size = get_shrink_account_size();
+
         let account0 = Account::new(1000, pubkey0_size as usize, &Pubkey::new_unique());
         bank0.store_account(&pubkey0, &account0);
 
         goto_end_of_slot(Arc::<Bank>::get_mut(&mut bank0).unwrap());
         bank0.freeze();
         bank0.squash();
+        // Flush now so that accounts cache cleaning doesn't clean up bank 0 when later
+        // slots add updates to the cache
+        bank0.force_flush_accounts_cache();
 
         // Store some lamports in bank 1
         let some_lamports = 123;
@@ -10292,6 +10300,11 @@ pub(crate) mod tests {
         bank1.deposit(&pubkey1, some_lamports);
         bank1.deposit(&pubkey2, some_lamports);
         goto_end_of_slot(Arc::<Bank>::get_mut(&mut bank1).unwrap());
+        bank1.freeze();
+        bank1.squash();
+        // Flush now so that accounts cache cleaning doesn't clean up bank 1 when later
+        // slots add updates to the cache
+        bank1.force_flush_accounts_cache();
 
         // Store some lamports for pubkey1 in bank 2, root bank 2
         let mut bank2 = Arc::new(new_from_parent(&bank1));
@@ -11809,43 +11822,45 @@ pub(crate) mod tests {
 
     #[test]
     fn test_store_scan_consistency_root() {
-        test_store_scan_consistency(
-            false,
-            |bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
-                let mut current_bank = bank0.clone();
-                let mut prev_bank = bank0;
-                loop {
-                    let lamports_this_round = current_bank.slot() + starting_lamports + 1;
-                    let account = Account::new(lamports_this_round, 0, &program_id);
-                    for key in pubkeys_to_modify.iter() {
-                        current_bank.store_account(key, &account);
+        for accounts_db_caching_enabled in &[false, true] {
+            test_store_scan_consistency(
+                *accounts_db_caching_enabled,
+                |bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
+                    let mut current_bank = bank0.clone();
+                    let mut prev_bank = bank0;
+                    loop {
+                        let lamports_this_round = current_bank.slot() + starting_lamports + 1;
+                        let account = Account::new(lamports_this_round, 0, &program_id);
+                        for key in pubkeys_to_modify.iter() {
+                            current_bank.store_account(key, &account);
+                        }
+                        current_bank.freeze();
+                        // Send the previous bank to the scan thread to perform the scan.
+                        // Meanwhile this thread will squash and update roots immediately after
+                        // so the roots will update while scanning.
+                        //
+                        // The capacity of the channel is 1 so that this thread will wait for the scan to finish before starting
+                        // the next iteration, allowing the scan to stay in sync with these updates
+                        // such that every scan will see this interruption.
+                        if bank_to_scan_sender.send(prev_bank).is_err() {
+                            // Channel was disconnected, exit
+                            return;
+                        }
+                        current_bank.squash();
+                        if current_bank.slot() % 2 == 0 {
+                            current_bank.force_flush_accounts_cache();
+                            current_bank.clean_accounts(true);
+                        }
+                        prev_bank = current_bank.clone();
+                        current_bank = Arc::new(Bank::new_from_parent(
+                            &current_bank,
+                            &solana_sdk::pubkey::new_rand(),
+                            current_bank.slot() + 1,
+                        ));
                     }
-                    current_bank.freeze();
-                    // Send the previous bank to the scan thread to perform the scan.
-                    // Meanwhile this thread will squash and update roots immediately after
-                    // so the roots will update while scanning.
-                    //
-                    // The capacity of the channel is 1 so that this thread will wait for the scan to finish before starting
-                    // the next iteration, allowing the scan to stay in sync with these updates
-                    // such that every scan will see this interruption.
-                    if bank_to_scan_sender.send(prev_bank).is_err() {
-                        // Channel was disconnected, exit
-                        return;
-                    }
-
-                    current_bank.freeze();
-                    current_bank.squash();
-                    current_bank.force_flush_accounts_cache();
-                    current_bank.clean_accounts(true);
-                    prev_bank = current_bank.clone();
-                    current_bank = Arc::new(Bank::new_from_parent(
-                        &current_bank,
-                        &solana_sdk::pubkey::new_rand(),
-                        current_bank.slot() + 1,
-                    ));
-                }
-            },
-        );
+                },
+            );
+        }
     }
 
     #[test]
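
Taken together, the ordering this patch establishes at the call sites is: flush the cache up to the bank's slot (cleaning cached roots along the way), then run `clean_accounts`. A compilable sketch of that ordering with a stub `Bank` (the stub is an assumption for illustration; the real wrappers are in runtime/src/bank.rs above):

    // Stand-in `Bank` so the sketch compiles on its own; the real methods are
    // the ones this diff adds in runtime/src/bank.rs.
    struct Bank {
        slot: u64,
    }
    impl Bank {
        fn force_flush_accounts_cache(&self) {
            // In the patch: flush_accounts_cache(true, Some(self.slot()))
        }
        fn clean_accounts(&self, _is_startup: bool) {}
    }

    fn main() {
        let bank = Bank { slot: 42 };
        // Ordering mirrored from AccountsBackgroundService: flush first (which
        // internally cleans cached roots up to bank.slot()), then clean, so
        // any later snapshot must be of a slot >= bank.slot.
        bank.force_flush_accounts_cache();
        bank.clean_accounts(true);
    }
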