diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index fe8b3a0cf3..432225bc27 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -50,7 +50,7 @@ use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use std::{ borrow::Cow, boxed::Box, - collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet}, convert::{TryFrom, TryInto}, io::{Error as IOError, Result as IOResult}, ops::RangeBounds, @@ -332,6 +332,11 @@ impl AccountStorage { self.0.get(&slot).map(|result| result.value().clone()) } + fn get_slot_storage_entries(&self, slot: Slot) -> Option>> { + self.get_slot_stores(slot) + .map(|res| res.read().unwrap().values().cloned().collect()) + } + fn slot_store_count(&self, slot: Slot, store_id: AppendVecId) -> Option { self.get_account_storage_entry(slot, store_id) .map(|store| store.count()) @@ -1663,61 +1668,83 @@ impl AccountsDB { where I: Iterator>, { + struct FoundStoredAccount { + account: Account, + account_hash: Hash, + account_size: usize, + store_id: AppendVecId, + offset: usize, + write_version: u64, + } debug!("do_shrink_slot_stores: slot: {}", slot); - let mut stored_accounts = vec![]; + let mut stored_accounts: HashMap = HashMap::new(); let mut original_bytes = 0; for store in stores { let mut start = 0; original_bytes += store.total_bytes(); while let Some((account, next)) = store.accounts.get_account(start) { - stored_accounts.push(( - account.meta.pubkey, - account.clone_account(), - *account.hash, - next - start, - (store.append_vec_id(), account.offset), - account.meta.write_version, - )); + match stored_accounts.entry(account.meta.pubkey) { + Entry::Occupied(mut occupied_entry) => { + if account.meta.write_version > occupied_entry.get().write_version { + occupied_entry.insert(FoundStoredAccount { + account: account.clone_account(), + account_hash: *account.hash, + account_size: next - start, + store_id: store.append_vec_id(), + offset: account.offset, + write_version: account.meta.write_version, + }); + } + } + Entry::Vacant(vacant_entry) => { + vacant_entry.insert(FoundStoredAccount { + account: account.clone_account(), + account_hash: *account.hash, + account_size: next - start, + store_id: store.append_vec_id(), + offset: account.offset, + write_version: account.meta.write_version, + }); + } + } start = next; } } let mut index_read_elapsed = Measure::start("index_read_elapsed"); + let mut alive_total = 0; let alive_accounts: Vec<_> = { stored_accounts .iter() - .filter( - |( - pubkey, - _account, - _account_hash, - _storage_size, - (store_id, offset), - _write_version, - )| { - if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None) - { - locked_entry - .slot_list() - .iter() - .any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset) + .filter(|(pubkey, stored_account)| { + let FoundStoredAccount { + account_size, + store_id, + offset, + .. + } = stored_account; + if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None) { + let is_alive = locked_entry + .slot_list() + .iter() + .any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset); + if !is_alive { + // This pubkey was found in the storage, but no longer exists in the index. + // It would have had a ref to the storage from the initial store, but it will + // not exist in the re-written slot. Unref it to keep the index consistent with + // rewriting the storage entries. + locked_entry.unref() } else { - false + alive_total += *account_size as u64; } - }, - ) + is_alive + } else { + false + } + }) .collect() }; index_read_elapsed.stop(); - - let alive_total: u64 = alive_accounts - .iter() - .map( - |(_pubkey, _account, _account_hash, account_size, _location, _write_version)| { - *account_size as u64 - }, - ) - .sum(); let aligned_total: u64 = self.page_align(alive_total); let total_starting_accounts = stored_accounts.len(); @@ -1743,11 +1770,10 @@ impl AccountsDB { let mut hashes = Vec::with_capacity(alive_accounts.len()); let mut write_versions = Vec::with_capacity(alive_accounts.len()); - for (pubkey, account, account_hash, _size, _location, write_version) in &alive_accounts - { - accounts.push((pubkey, account)); - hashes.push(*account_hash); - write_versions.push(*write_version); + for (pubkey, alive_account) in alive_accounts { + accounts.push((pubkey, &alive_account.account)); + hashes.push(alive_account.account_hash); + write_versions.push(alive_account.write_version); } start.stop(); find_alive_elapsed = start.as_us(); @@ -2095,8 +2121,7 @@ impl AccountsDB { // the cache *after* we've finished flushing in `flush_slot_cache`. let storage_maps: Vec> = self .storage - .get_slot_stores(slot) - .map(|res| res.read().unwrap().values().cloned().collect()) + .get_slot_storage_entries(slot) .unwrap_or_default(); self.thread_pool.install(|| { storage_maps @@ -4494,8 +4519,7 @@ impl AccountsDB { } let storage_maps: Vec> = self .storage - .get_slot_stores(*slot) - .map(|res| res.read().unwrap().values().cloned().collect()) + .get_slot_storage_entries(*slot) .unwrap_or_default(); let num_accounts = storage_maps .iter() @@ -4681,9 +4705,17 @@ impl AccountsDB { // Requires all stores in the slot to be re-written otherwise the accounts_index // store ref count could become incorrect. fn do_shrink_slot_v1(&self, slot: Slot, forced: bool) -> usize { + struct FoundStoredAccount { + account: Account, + account_hash: Hash, + account_size: usize, + store_id: AppendVecId, + offset: usize, + write_version: u64, + } trace!("shrink_stale_slot: slot: {}", slot); - let mut stored_accounts = vec![]; + let mut stored_accounts: HashMap = HashMap::new(); let mut storage_read_elapsed = Measure::start("storage_read_elapsed"); { if let Some(stores_lock) = self.storage.get_slot_stores(slot) { @@ -4723,14 +4755,30 @@ impl AccountsDB { for store in stores.values() { let mut start = 0; while let Some((account, next)) = store.accounts.get_account(start) { - stored_accounts.push(( - account.meta.pubkey, - account.clone_account(), - *account.hash, - next - start, - (store.append_vec_id(), account.offset), - account.meta.write_version, - )); + match stored_accounts.entry(account.meta.pubkey) { + Entry::Occupied(mut occupied_entry) => { + if account.meta.write_version > occupied_entry.get().write_version { + occupied_entry.insert(FoundStoredAccount { + account: account.clone_account(), + account_hash: *account.hash, + account_size: next - start, + store_id: store.append_vec_id(), + offset: account.offset, + write_version: account.meta.write_version, + }); + } + } + Entry::Vacant(vacant_entry) => { + vacant_entry.insert(FoundStoredAccount { + account: account.clone_account(), + account_hash: *account.hash, + account_size: next - start, + store_id: store.append_vec_id(), + offset: account.offset, + write_version: account.meta.write_version, + }); + } + } start = next; } } @@ -4739,48 +4787,46 @@ impl AccountsDB { storage_read_elapsed.stop(); let mut index_read_elapsed = Measure::start("index_read_elapsed"); + let mut alive_total = 0; let alive_accounts: Vec<_> = { stored_accounts .iter() - .filter( - |( - pubkey, - _account, - _account_hash, - _storage_size, - (store_id, offset), - _write_version, - )| { - if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None) - { - locked_entry - .slot_list() - .iter() - .any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset) + .filter(|(pubkey, stored_account)| { + let FoundStoredAccount { + account_size, + store_id, + offset, + .. + } = stored_account; + if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None) { + let is_alive = locked_entry + .slot_list() + .iter() + .any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset); + if !is_alive { + // This pubkey was found in the storage, but no longer exists in the index. + // It would have had a ref to the storage from the initial store, but it will + // not exist in the re-written slot. Unref it to keep the index consistent with + // rewriting the storage entries. + locked_entry.unref() } else { - false + alive_total += *account_size as u64; } - }, - ) + is_alive + } else { + false + } + }) .collect() }; index_read_elapsed.stop(); - - let alive_total: u64 = alive_accounts - .iter() - .map( - |(_pubkey, _account, _account_hash, account_size, _location, _write_verion)| { - *account_size as u64 - }, - ) - .sum(); let aligned_total: u64 = self.page_align(alive_total); - + let alive_accounts_len = alive_accounts.len(); debug!( "shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})", slot, stored_accounts.len(), - alive_accounts.len(), + alive_accounts_len, alive_total, aligned_total ); @@ -4793,15 +4839,14 @@ impl AccountsDB { let mut store_accounts_timing = StoreAccountsTiming::default(); if aligned_total > 0 { let mut start = Measure::start("find_alive_elapsed"); - let mut accounts = Vec::with_capacity(alive_accounts.len()); - let mut hashes = Vec::with_capacity(alive_accounts.len()); - let mut write_versions = Vec::with_capacity(alive_accounts.len()); + let mut accounts = Vec::with_capacity(alive_accounts_len); + let mut hashes = Vec::with_capacity(alive_accounts_len); + let mut write_versions = Vec::with_capacity(alive_accounts_len); - for (pubkey, account, account_hash, _size, _location, write_version) in &alive_accounts - { - accounts.push((pubkey, account)); - hashes.push(*account_hash); - write_versions.push(*write_version); + for (pubkey, alive_account) in alive_accounts { + accounts.push((pubkey, &alive_account.account)); + hashes.push(alive_account.account_hash); + write_versions.push(alive_account.write_version); } start.stop(); find_alive_elapsed = start.as_us(); @@ -4910,7 +4955,7 @@ impl AccountsDB { .fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed); self.shrink_stats.report(); - alive_accounts.len() + alive_accounts_len } fn do_reset_uncleaned_roots_v1( @@ -8410,11 +8455,9 @@ pub mod tests { } fn slot_stores(db: &AccountsDB, slot: Slot) -> Vec> { - if let Some(x) = db.storage.get_slot_stores(slot) { - x.read().unwrap().values().cloned().collect() - } else { - vec![] - } + db.storage + .get_slot_storage_entries(slot) + .unwrap_or_default() } #[test] @@ -8610,8 +8653,7 @@ pub mod tests { let mut storage_maps: Vec> = accounts_db .storage - .get_slot_stores(slot) - .map(|res| res.read().unwrap().values().cloned().collect()) + .get_slot_storage_entries(slot) .unwrap_or_default(); // Flushing cache should only create one storage entry @@ -9024,6 +9066,90 @@ pub mod tests { run_flush_rooted_accounts_cache(false); } + fn run_test_shrink_unref(do_intra_cache_clean: bool) { + // Enable caching so that we use the straightforward implementation + // of shrink that will shrink all candidate slots + let caching_enabled = true; + let db = AccountsDB::new_with_config( + Vec::new(), + &ClusterType::Development, + HashSet::default(), + caching_enabled, + ); + let account_key1 = Pubkey::new_unique(); + let account_key2 = Pubkey::new_unique(); + let account1 = Account::new(1, 0, &Account::default().owner); + + // Store into slot 0 + db.store_cached(0, &[(&account_key1, &account1)]); + db.store_cached(0, &[(&account_key2, &account1)]); + db.add_root(0); + if !do_intra_cache_clean { + // If we don't want the cache doing purges before flush, + // then we cannot flush multiple roots at once, otherwise the later + // roots will clean the earlier roots before they are stored. + // Thus flush the roots individually + db.flush_accounts_cache(true, None); + + // Add an additional ref within the same slot to pubkey 1 + db.store_uncached(0, &[(&account_key1, &account1)]); + } + + // Make account_key1 in slot 0 outdated by updating in rooted slot 1 + db.store_cached(1, &[(&account_key1, &account1)]); + db.add_root(1); + // Flushes all roots + db.flush_accounts_cache(true, None); + db.get_accounts_delta_hash(0); + db.get_accounts_delta_hash(1); + + // Clean to remove outdated entry from slot 0 + db.clean_accounts(Some(1)); + + // Shrink Slot 0 + let mut slot0_stores = db.storage.get_slot_storage_entries(0).unwrap(); + assert_eq!(slot0_stores.len(), 1); + let slot0_store = slot0_stores.pop().unwrap(); + { + let mut shrink_candidate_slots = db.shrink_candidate_slots.lock().unwrap(); + shrink_candidate_slots + .entry(0) + .or_default() + .insert(slot0_store.append_vec_id(), slot0_store); + } + db.shrink_candidate_slots(); + + // Make slot 0 dead by updating the remaining key + db.store_cached(2, &[(&account_key2, &account1)]); + db.add_root(2); + + // Flushes all roots + db.flush_accounts_cache(true, None); + + // Should be one store before clean for slot 0 + assert_eq!(db.storage.get_slot_storage_entries(0).unwrap().len(), 1); + db.get_accounts_delta_hash(2); + db.clean_accounts(Some(2)); + + // No stores should exist for slot 0 after clean + assert!(db.storage.get_slot_storage_entries(0).is_none()); + + // Ref count for `account_key1` (account removed earlier by shrink) + // should be 1, since it was only stored in slot 0 and 1, and slot 0 + // is now dead + assert_eq!(db.accounts_index.ref_count_from_storage(&account_key1), 1); + } + + #[test] + fn test_shrink_unref() { + run_test_shrink_unref(false) + } + + #[test] + fn test_shrink_unref_with_intra_slot_cleaning() { + run_test_shrink_unref(true) + } + #[test] fn test_partial_clean() { solana_logger::setup(); diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index de453fb2e4..5143a2896b 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -101,6 +101,10 @@ impl ReadAccountMapEntry { pub fn ref_count(&self) -> &AtomicU64 { &self.borrow_owned_entry_contents().ref_count } + + pub fn unref(&self) { + self.ref_count().fetch_sub(1, Ordering::Relaxed); + } } #[self_referencing] @@ -110,7 +114,7 @@ pub struct WriteAccountMapEntry { slot_list_guard: RwLockWriteGuard<'this, SlotList>, } -impl WriteAccountMapEntry { +impl WriteAccountMapEntry { pub fn from_account_map_entry(account_map_entry: AccountMapEntry) -> Self { WriteAccountMapEntryBuilder { owned_entry: account_map_entry, @@ -147,12 +151,18 @@ impl WriteAccountMapEntry { .collect(); assert!(same_slot_previous_updates.len() <= 1); if let Some((list_index, (s, previous_update_value))) = same_slot_previous_updates.pop() { + let is_flush_from_cache = + previous_update_value.is_cached() && !account_info.is_cached(); reclaims.push((*s, previous_update_value.clone())); self.slot_list_mut(|list| list.remove(list_index)); - } else { - // Only increment ref count if the account was not prevously updated in this slot + if is_flush_from_cache { + self.ref_count().fetch_add(1, Ordering::Relaxed); + } + } else if !account_info.is_cached() { + // If it's the first non-cache insert, also bump the stored ref count self.ref_count().fetch_add(1, Ordering::Relaxed); } + self.slot_list_mut(|list| list.push((slot, account_info))); } } @@ -915,7 +925,7 @@ impl AccountsIndex { pub fn unref_from_storage(&self, pubkey: &Pubkey) { if let Some(locked_entry) = self.get_account_read_entry(pubkey) { - locked_entry.ref_count().fetch_sub(1, Ordering::Relaxed); + locked_entry.unref(); } }