diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 064a08da8e..d7951e9879 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -493,11 +493,15 @@ impl AccountStorageEntry { } } - fn remove_account(&self, num_bytes: usize) -> usize { + pub fn all_accounts(&self) -> Vec { + self.accounts.accounts(0) + } + + fn remove_account(&self, num_bytes: usize, reset_accounts: bool) -> usize { let mut count_and_status = self.count_and_status.write().unwrap(); let (mut count, mut status) = *count_and_status; - if count == 1 && status == AccountStorageStatus::Full { + if count == 1 && status == AccountStorageStatus::Full && reset_accounts { // this case arises when we remove the last account from the // storage, but we've learned from previous write attempts that // the storage is full @@ -1109,8 +1113,19 @@ impl AccountsDB { inc_new_counter_info!("clean-old-root-par-clean-ms", clean_rooted.as_ms() as usize); let mut measure = Measure::start("clean_old_root_reclaims"); + + // Don't reset from clean, since the pubkeys in those stores may need to be unref'ed + // and those stores may be used for background hashing. + let reset_accounts = false; + let mut reclaim_result = (HashMap::new(), HashMap::new()); - self.handle_reclaims(&reclaims, None, false, Some(&mut reclaim_result)); + self.handle_reclaims( + &reclaims, + None, + false, + Some(&mut reclaim_result), + reset_accounts, + ); measure.stop(); debug!("{} {}", clean_rooted, measure); inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize); @@ -1486,7 +1501,10 @@ impl AccountsDB { let reclaims = self.purge_keys_exact(&pubkey_to_slot_set); - self.handle_reclaims(&reclaims, None, false, None); + // Don't reset from clean, since the pubkeys in those stores may need to be unref'ed + // and those stores may be used for background hashing. 
+ let reset_accounts = false; + self.handle_reclaims(&reclaims, None, false, None, reset_accounts); reclaims_time.stop(); @@ -1515,32 +1533,38 @@ impl AccountsDB { ); } - // Removes the accounts in the input `reclaims` from the tracked "count" of - // their corresponding storage entries. Note this does not actually free - // the memory from the storage entries until all the storage entries for - // a given slot `S` are empty, at which point `process_dead_slots` will - // remove all the storage entries for `S`. - // + /// Removes the accounts in the input `reclaims` from the tracked "count" of + /// their corresponding storage entries. Note this does not actually free + /// the memory from the storage entries until all the storage entries for + /// a given slot `S` are empty, at which point `process_dead_slots` will + /// remove all the storage entries for `S`. + /// /// # Arguments /// * `reclaims` - The accounts to remove from storage entries' "count" + /// /// * `expected_single_dead_slot` - A correctness assertion. If this is equal to `Some(S)`, - /// then the function will check that the only slot being cleaned up in `reclaims` - /// is the slot == `S`. This is true for instance when `handle_reclaims` is called - /// from store or slot shrinking, as those should only touch the slot they are - /// currently storing to or shrinking. + /// then the function will check that the only slot being cleaned up in `reclaims` + /// is the slot == `S`. This is true for instance when `handle_reclaims` is called + /// from store or slot shrinking, as those should only touch the slot they are + /// currently storing to or shrinking. + /// /// * `no_dead_slot` - A correctness assertion. If this is equal to - /// `false`, the function will check that no slots are cleaned up/removed via - /// `process_dead_slots`. 
For instance, on store, no slots should be cleaned up, - /// but during the background clean accounts purges accounts from old rooted slots, - /// so outdated slots may be removed. + /// `true`, the function will check that no slots are cleaned up/removed via + /// `process_dead_slots`. For instance, on store, no slots should be cleaned up, + /// but during the background clean accounts purges accounts from old rooted slots, + /// so outdated slots may be removed. /// * `reclaim_result` - Information about accounts that were removed from storage, does - /// not include accounts that were removed from the cache + /// not include accounts that were removed from the cache + /// * `reset_accounts` - Reset the append_vec store when the store is dead (count==0) + /// From the clean and shrink paths it should be false since there may be an in-progress + /// hash operation and the stores may hold accounts that need to be unref'ed. fn handle_reclaims( &self, reclaims: SlotSlice, expected_single_dead_slot: Option, no_dead_slot: bool, reclaim_result: Option<&mut ReclaimResult>, + reset_accounts: bool, ) { if reclaims.is_empty() { return; } @@ -1551,8 +1575,12 @@ impl AccountsDB { } else { (None, None) }; - let dead_slots = - self.remove_dead_accounts(reclaims, expected_single_dead_slot, reclaimed_offsets); + let dead_slots = self.remove_dead_accounts( + reclaims, + expected_single_dead_slot, + reclaimed_offsets, + reset_accounts, + ); if no_dead_slot { assert!(dead_slots.is_empty()); } else if let Some(expected_single_dead_slot) = expected_single_dead_slot { @@ -1714,16 +1742,15 @@ impl AccountsDB { // here, we're writing back alive_accounts. That should be an atomic operation // without use of rather wide locks in this whole function, because we're // mutating rooted slots; There should be no writers to them. 
- store_accounts_timing = self.store_accounts_custom( + store_accounts_timing = self.store_accounts_frozen( slot, &accounts, &hashes, Some(Box::new(move |_, _| shrunken_store.clone())), Some(Box::new(write_versions.into_iter())), - false, ); - // `store_accounts_custom()` above may have purged accounts from some + // `store_accounts_frozen()` above may have purged accounts from some // other storage entries (the ones that were just overwritten by this // new storage entry). This means some of those stores might have caused // this slot to be readded to `self.shrink_candidate_slots`, so delete @@ -2024,7 +2051,7 @@ impl AccountsDB { self.thread_pool.install(|| { storage_maps .par_iter() - .flat_map(|storage| storage.accounts.accounts(0)) + .flat_map(|storage| storage.all_accounts()) .for_each(|account| storage_scan_func(&retval, LoadedAccount::Stored(account))); }); @@ -2609,9 +2636,10 @@ impl AccountsDB { } } - // 1) Remove old bank hash from self.bank_hashes - // 2) Purge this slot's storage entries from self.storage - self.handle_reclaims(&reclaims, Some(remove_slot), false, None); + self.handle_reclaims(&reclaims, Some(remove_slot), false, None, false); + + // After handling the reclaimed entries, this slot's + // storage entries should be purged from self.storage assert!(self.storage.get_slot_stores(remove_slot).is_none()); } @@ -3116,13 +3144,12 @@ impl AccountsDB { // will be able to find the account in storage let flushed_store = self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache"); - self.store_accounts_custom( + self.store_accounts_frozen( slot, &accounts, &hashes, Some(Box::new(move |_, _| flushed_store.clone())), None, - false, ); // If the above sizing function is correct, just one AppendVec is enough to hold // all the data for the slot @@ -3664,6 +3691,7 @@ impl AccountsDB { reclaims: SlotSlice, expected_slot: Option, mut reclaimed_offsets: Option<&mut AppendVecOffsets>, + reset_accounts: bool, ) -> HashSet { let mut dead_slots = 
HashSet::new(); let mut new_shrink_candidates: ShrinkCandidates = HashMap::new(); @@ -3688,7 +3716,7 @@ impl AccountsDB { "AccountDB::accounts_index corrupted. Storage pointed to: {}, expected: {}, should only point to one slot", store.slot(), *slot ); - let count = store.remove_account(account_info.stored_size); + let count = store.remove_account(account_info.stored_size, reset_accounts); if count == 0 { dead_slots.insert(*slot); } else if self.caching_enabled @@ -3789,7 +3817,7 @@ impl AccountsDB { stores .into_par_iter() .map(|store| { - let accounts = store.accounts.accounts(0); + let accounts = store.all_accounts(); accounts .into_iter() .map(|account| (store.slot(), account.meta.pubkey)) @@ -3917,7 +3945,7 @@ impl AccountsDB { self.stats .store_hash_accounts .fetch_add(hash_time.as_us(), Ordering::Relaxed); - self.store_accounts_default(slot, accounts, &hashes, is_cached_store); + self.store_accounts_unfrozen(slot, accounts, &hashes, is_cached_store); self.report_store_timings(); } @@ -4008,13 +4036,21 @@ impl AccountsDB { } } - fn store_accounts_default( + fn store_accounts_unfrozen( &self, slot: Slot, accounts: &[(&Pubkey, &Account)], hashes: &[Hash], is_cached_store: bool, ) { + // This path comes from a store to a non-frozen slot. + // If a store is dead here, then a newer update for + // each pubkey in the store must exist in another + // store in the slot. Thus it is safe to reset the store and + // re-use it for a future store op. The pubkey ref counts should still + // hold just 1 ref from this slot. 
+ let reset_accounts = true; + self.store_accounts_custom( slot, accounts, @@ -4022,9 +4058,34 @@ impl AccountsDB { None::, None::>>, is_cached_store, + reset_accounts, ); } + fn store_accounts_frozen<'a>( + &'a self, + slot: Slot, + accounts: &[(&Pubkey, &Account)], + hashes: &[Hash], + storage_finder: Option>, + write_version_producer: Option>>, + ) -> StoreAccountsTiming { + // stores on a frozen slot should not reset + // the append vec so that hashing could happen on the store + // and accounts in the append_vec can be unrefed correctly + let reset_accounts = false; + let is_cached_store = false; + self.store_accounts_custom( + slot, + accounts, + hashes, + storage_finder, + write_version_producer, + is_cached_store, + reset_accounts, + ) + } + fn store_accounts_custom<'a>( &'a self, slot: Slot, @@ -4033,6 +4094,7 @@ impl AccountsDB { storage_finder: Option>, write_version_producer: Option>>, is_cached_store: bool, + reset_accounts: bool, ) -> StoreAccountsTiming { let storage_finder: StorageFinder<'a> = storage_finder .unwrap_or_else(|| Box::new(move |slot, size| self.find_storage_candidate(slot, size))); @@ -4098,7 +4160,7 @@ impl AccountsDB { // // From 1) and 2) we guarantee passing Some(slot), true is safe let mut handle_reclaims_time = Measure::start("handle_reclaims"); - self.handle_reclaims(&reclaims, Some(slot), true, None); + self.handle_reclaims(&reclaims, Some(slot), true, None, reset_accounts); handle_reclaims_time.stop(); self.stats .store_handle_reclaims @@ -4164,7 +4226,7 @@ impl AccountsDB { .sum(); let mut accounts_map: AccountsMap = AccountsMap::with_capacity(num_accounts); storage_maps.iter().for_each(|storage| { - let accounts = storage.accounts.accounts(0); + let accounts = storage.all_accounts(); accounts.into_iter().for_each(|stored_account| { let entry = accounts_map .entry(stored_account.meta.pubkey) @@ -4310,6 +4372,14 @@ impl AccountsDB { } } + pub fn new_sized_no_extra_stores(paths: Vec, file_size: u64) -> Self { + AccountsDB { + 
file_size, + min_num_stores: 0, + ..AccountsDB::new(paths, &ClusterType::Development) + } + } + pub fn get_append_vec_id(&self, pubkey: &Pubkey, slot: Slot) -> Option { let ancestors = vec![(slot, 1)].into_iter().collect(); let result = self.accounts_index.get(&pubkey, Some(&ancestors), None); @@ -4483,13 +4553,12 @@ impl AccountsDB { // here, we're writing back alive_accounts. That should be an atomic operation // without use of rather wide locks in this whole function, because we're // mutating rooted slots; There should be no writers to them. - store_accounts_timing = self.store_accounts_custom( + store_accounts_timing = self.store_accounts_frozen( slot, &accounts, &hashes, Some(Box::new(move |_, _| shrunken_store.clone())), Some(Box::new(write_versions.into_iter())), - false, ); let mut start = Measure::start("write_storage_elapsed"); @@ -4584,7 +4653,6 @@ impl AccountsDB { fn do_shrink_stale_slot_v1(&self, slot: Slot) -> usize { self.do_shrink_slot_v1(slot, false) } - fn do_shrink_slot_forced_v1(&self, slot: Slot) { self.do_shrink_slot_v1(slot, true); } @@ -5150,7 +5218,7 @@ pub mod tests { .sum(), r_slot_storages .values() - .map(|s| s.accounts.accounts(0).len()) + .map(|s| s.all_accounts().len()) .sum(), ); assert_eq!(expected_store_count, actual_store_count); @@ -5372,7 +5440,7 @@ pub mod tests { let r_slot_storage = slot_storage.read().unwrap(); let count = r_slot_storage .values() - .map(|store| store.accounts.accounts(0).len()) + .map(|store| store.all_accounts().len()) .sum(); let stored_count: usize = r_slot_storage .values() @@ -6578,7 +6646,7 @@ pub mod tests { db.hash_accounts(some_slot, accounts, &ClusterType::Development); // provide bogus account hashes let some_hash = Hash::new(&[0xca; HASH_BYTES]); - db.store_accounts_default(some_slot, accounts, &[some_hash], false); + db.store_accounts_unfrozen(some_slot, accounts, &[some_hash], false); db.add_root(some_slot); assert_matches!( db.verify_bank_hash_and_lamports(some_slot, &ancestors, 1, true), 
@@ -6683,7 +6751,7 @@ pub mod tests { .values() .next() .unwrap() - .remove_account(0); + .remove_account(0, true); assert!(db.get_snapshot_storages(after_slot).is_empty()); } @@ -6704,8 +6772,8 @@ pub mod tests { .next() .unwrap() .clone(); - storage_entry.remove_account(0); - storage_entry.remove_account(0); + storage_entry.remove_account(0, true); + storage_entry.remove_account(0, true); } #[test] @@ -6769,6 +6837,135 @@ pub mod tests { assert_load_account(&accounts, current_slot, purged_pubkey2, 0); } + fn do_full_clean_refcount(store1_first: bool, store_size: u64) { + let pubkey1 = Pubkey::from_str("My11111111111111111111111111111111111111111").unwrap(); + let pubkey2 = Pubkey::from_str("My22211111111111111111111111111111111111111").unwrap(); + let pubkey3 = Pubkey::from_str("My33311111111111111111111111111111111111111").unwrap(); + + let old_lamport = 223; + let zero_lamport = 0; + let dummy_lamport = 999_999; + + // size data so only 1 fits in a 4k store + let data_size = 2200; + + let owner = Account::default().owner; + + let account = Account::new(old_lamport, data_size, &owner); + let account2 = Account::new(old_lamport + 100_001, data_size, &owner); + let account3 = Account::new(old_lamport + 100_002, data_size, &owner); + let account4 = Account::new(dummy_lamport, data_size, &owner); + let zero_lamport_account = Account::new(zero_lamport, data_size, &owner); + + let mut current_slot = 0; + let accounts = AccountsDB::new_sized_no_extra_stores(Vec::new(), store_size); + + // A: Initialize AccountsDB with pubkey1 and pubkey2 + current_slot += 1; + if store1_first { + accounts.store_uncached(current_slot, &[(&pubkey1, &account)]); + accounts.store_uncached(current_slot, &[(&pubkey2, &account)]); + } else { + accounts.store_uncached(current_slot, &[(&pubkey2, &account)]); + accounts.store_uncached(current_slot, &[(&pubkey1, &account)]); + } + accounts.get_accounts_delta_hash(current_slot); + accounts.add_root(current_slot); + + info!("post A"); + 
accounts.print_accounts_stats("Post-A"); + + // B: Test multiple updates to pubkey1 in a single slot/storage + current_slot += 1; + assert_eq!(0, accounts.alive_account_count_in_slot(current_slot)); + assert_eq!(1, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.store_uncached(current_slot, &[(&pubkey1, &account2)]); + accounts.store_uncached(current_slot, &[(&pubkey1, &account2)]); + assert_eq!(1, accounts.alive_account_count_in_slot(current_slot)); + // Stores to same pubkey, same slot only count once towards the + // ref count + assert_eq!(2, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.get_accounts_delta_hash(current_slot); + accounts.add_root(current_slot); + + accounts.print_accounts_stats("Post-B pre-clean"); + + accounts.clean_accounts(None); + + info!("post B"); + accounts.print_accounts_stats("Post-B"); + + // C: more updates to trigger clean of previous updates + current_slot += 1; + assert_eq!(2, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.store_uncached(current_slot, &[(&pubkey1, &account3)]); + accounts.store_uncached(current_slot, &[(&pubkey2, &account3)]); + accounts.store_uncached(current_slot, &[(&pubkey3, &account4)]); + assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.get_accounts_delta_hash(current_slot); + accounts.add_root(current_slot); + + info!("post C"); + + accounts.print_accounts_stats("Post-C"); + + // D: Make all keys 0-lamport, cleans all keys + current_slot += 1; + assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.store_uncached(current_slot, &[(&pubkey1, &zero_lamport_account)]); + accounts.store_uncached(current_slot, &[(&pubkey2, &zero_lamport_account)]); + accounts.store_uncached(current_slot, &[(&pubkey3, &zero_lamport_account)]); + + let snapshot_stores = accounts.get_snapshot_storages(current_slot); + let total_accounts: usize = snapshot_stores + .iter() + .flatten() + .map(|s| s.all_accounts().len()) + .sum(); + assert!(!snapshot_stores.is_empty()); + 
assert!(total_accounts > 0); + + info!("post D"); + accounts.print_accounts_stats("Post-D"); + + accounts.get_accounts_delta_hash(current_slot); + accounts.add_root(current_slot); + accounts.clean_accounts(None); + + accounts.print_accounts_stats("Post-D clean"); + + let total_accounts_post_clean: usize = snapshot_stores + .iter() + .flatten() + .map(|s| s.all_accounts().len()) + .sum(); + assert_eq!(total_accounts, total_accounts_post_clean); + + // should clean all 3 pubkeys + assert_eq!(accounts.ref_count_for_pubkey(&pubkey1), 0); + assert_eq!(accounts.ref_count_for_pubkey(&pubkey2), 0); + assert_eq!(accounts.ref_count_for_pubkey(&pubkey3), 0); + } + + #[test] + fn test_full_clean_refcount() { + solana_logger::setup(); + + // Setup 3 scenarios which try to differentiate between pubkey1 being in an + // Available slot or a Full slot which would cause a different reset behavior + // when pubkey1 is cleaned and therefore cause the ref count to be incorrect + // preventing a removal of that key. 
+ // + // do stores with a 4mb size so only 1 store is created per slot + do_full_clean_refcount(false, 4 * 1024 * 1024); + + // do stores with a 4k size and store pubkey1 first + do_full_clean_refcount(false, 4096); + + // do stores with a 4k size and store pubkey1 2nd + do_full_clean_refcount(true, 4096); + } + #[test] fn test_accounts_clean_after_snapshot_restore_then_old_revives() { solana_logger::setup(); @@ -7872,7 +8069,7 @@ pub mod tests { // Flushing cache should only create one storage entry assert_eq!(storage_maps.len(), 1); let storage0 = storage_maps.pop().unwrap(); - let accounts = storage0.accounts.accounts(0); + let accounts = storage0.all_accounts(); for account in accounts { let before_size = storage0.alive_bytes.load(Ordering::Relaxed); @@ -7890,7 +8087,7 @@ pub mod tests { assert_eq!(removed_data_size, account.stored_size); assert_eq!(account_info.0, slot); let reclaims = vec![account_info]; - accounts_db.remove_dead_accounts(&reclaims, None, None); + accounts_db.remove_dead_accounts(&reclaims, None, None, true); let after_size = storage0.alive_bytes.load(Ordering::Relaxed); assert_eq!(before_size, after_size + account.stored_size); }