From 43053dcc900089b9e5963bc5b8d42e61891f9d3f Mon Sep 17 00:00:00 2001 From: sakridge Date: Wed, 4 Nov 2020 09:17:05 -0800 Subject: [PATCH] Re-use accounts stores (#12885) * Re-use accounts_db stores Creating files and dropping mmap areas can be expensive * Add test for storage finder Can encounter an infinite loop when the existing store is too small for the account, but the account is smaller than the normal store size. * Fix storage finding * Check for strong_count == 1 * try_recycle helper --- runtime/src/accounts_db.rs | 503 +++++++++++++++++++++++---- runtime/src/bank.rs | 1 + runtime/src/serde_snapshot.rs | 9 +- runtime/src/serde_snapshot/future.rs | 4 +- runtime/src/serde_snapshot/tests.rs | 9 +- runtime/src/snapshot_utils.rs | 10 +- 6 files changed, 459 insertions(+), 77 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 3f1a6a18a3..25fc8deb8a 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -56,6 +56,8 @@ const PAGE_SIZE: u64 = 4 * 1024; pub const DEFAULT_FILE_SIZE: u64 = PAGE_SIZE * 1024; pub const DEFAULT_NUM_THREADS: u32 = 8; pub const DEFAULT_NUM_DIRS: u32 = 4; +const MAX_RECYCLE_STORES: usize = 5000; +const STORE_META_OVERHEAD: usize = 256; lazy_static! 
{ // FROZEN_ACCOUNT_PANIC is used to signal local_cluster that an AccountsDB panic has occurred, @@ -172,9 +174,9 @@ pub enum BankHashVerificationError { /// Persistent storage structure holding the accounts #[derive(Debug)] pub struct AccountStorageEntry { - pub(crate) id: AppendVecId, + pub(crate) id: AtomicUsize, - pub(crate) slot: Slot, + pub(crate) slot: AtomicU64, /// storage holding the accounts pub(crate) accounts: AppendVec, @@ -202,8 +204,8 @@ impl AccountStorageEntry { let accounts = AppendVec::new(&path, true, file_size as usize); Self { - id, - slot, + id: AtomicUsize::new(id), + slot: AtomicU64::new(slot), accounts, count_and_status: RwLock::new((0, AccountStorageStatus::Available)), approx_store_count: AtomicUsize::new(0), @@ -212,8 +214,8 @@ impl AccountStorageEntry { pub(crate) fn new_empty_map(id: AppendVecId, accounts_current_len: usize) -> Self { Self { - id, - slot: 0, + id: AtomicUsize::new(id), + slot: AtomicU64::new(0), accounts: AppendVec::new_empty_map(accounts_current_len), count_and_status: RwLock::new((0, AccountStorageStatus::Available)), approx_store_count: AtomicUsize::new(0), @@ -241,6 +243,15 @@ impl AccountStorageEntry { *count_and_status = (count, status); } + pub fn recycle(&self, slot: Slot, id: usize) { + let mut count_and_status = self.count_and_status.write().unwrap(); + self.accounts.reset(); + *count_and_status = (0, AccountStorageStatus::Available); + self.slot.store(slot, Ordering::Release); + self.id.store(id, Ordering::Relaxed); + self.approx_store_count.store(0, Ordering::Relaxed); + } + pub fn status(&self) -> AccountStorageStatus { self.count_and_status.read().unwrap().1 } @@ -258,11 +269,11 @@ impl AccountStorageEntry { } pub fn slot(&self) -> Slot { - self.slot + self.slot.load(Ordering::Acquire) } pub fn append_vec_id(&self) -> AppendVecId { - self.id + self.id.load(Ordering::Relaxed) } pub fn flush(&self) -> Result<(), IOError> { @@ -321,8 +332,8 @@ impl AccountStorageEntry { assert!( count > 0, "double remove 
of account in slot: {}/store: {}!!", - self.slot, - self.id + self.slot(), + self.append_vec_id(), ); count -= 1; @@ -405,6 +416,8 @@ pub struct AccountsDB { pub storage: AccountStorage, + recycle_stores: RwLock>>, + /// distribute the accounts across storage lists pub next_id: AtomicUsize, pub shrink_candidate_slots: Mutex>, @@ -455,6 +468,11 @@ struct AccountsStats { store_find_store: AtomicU64, store_num_accounts: AtomicU64, store_total_data: AtomicU64, + recycle_store_count: AtomicU64, + create_store_count: AtomicU64, + store_get_slot_store: AtomicU64, + store_find_existing: AtomicU64, + dropped_stores: AtomicU64, } fn make_min_priority_thread_pool() -> ThreadPool { @@ -491,6 +509,7 @@ impl Default for AccountsDB { AccountsDB { accounts_index: AccountsIndex::default(), storage: AccountStorage(DashMap::new()), + recycle_stores: RwLock::new(Vec::new()), next_id: AtomicUsize::new(0), shrink_candidate_slots: Mutex::new(Vec::new()), write_version: AtomicU64::new(0), @@ -1009,7 +1028,7 @@ impl AccountsDB { account.clone_account(), *account.hash, next - start, - (store.id, account.offset), + (store.append_vec_id(), account.offset), account.meta.write_version, )); start = next; @@ -1055,7 +1074,7 @@ impl AccountsDB { }, ) .sum(); - let aligned_total: u64 = (alive_total + (PAGE_SIZE - 1)) & !(PAGE_SIZE - 1); + let aligned_total: u64 = self.page_align(alive_total); debug!( "shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})", @@ -1090,7 +1109,13 @@ impl AccountsDB { find_alive_elapsed = start.as_us(); let mut start = Measure::start("create_and_insert_store_elapsed"); - let shrunken_store = self.create_and_insert_store(slot, aligned_total); + let shrunken_store = if let Some(new_store) = + self.try_recycle_and_insert_store(slot, aligned_total, aligned_total + 1024) + { + new_store + } else { + self.create_and_insert_store(slot, aligned_total, "shrink") + }; start.stop(); create_and_insert_store_elapsed = start.as_us(); @@ -1102,7 
+1127,7 @@ impl AccountsDB { slot, &accounts, &hashes, - |_| shrunken_store.clone(), + |_, _| shrunken_store.clone(), write_versions.into_iter(), ); start.stop(); @@ -1132,8 +1157,21 @@ impl AccountsDB { } rewrite_elapsed.stop(); + let mut recycle_stores_write_time = Measure::start("recycle_stores_write_time"); + let mut recycle_stores = self.recycle_stores.write().unwrap(); + recycle_stores_write_time.stop(); + let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed"); - drop(dead_storages); + if recycle_stores.len() < MAX_RECYCLE_STORES { + recycle_stores.extend(dead_storages); + drop(recycle_stores); + } else { + self.stats + .dropped_stores + .fetch_add(recycle_stores.len() as u64, Ordering::Relaxed); + drop(recycle_stores); + drop(dead_storages); + } drop_storage_entries_elapsed.stop(); datapoint_info!( @@ -1156,6 +1194,11 @@ impl AccountsDB { drop_storage_entries_elapsed.as_us(), i64 ), + ( + "recycle_stores_write_time", + recycle_stores_write_time.as_us(), + i64 + ), ); alive_accounts.len() } @@ -1287,7 +1330,7 @@ impl AccountsDB { let accounts = storage.accounts.accounts(0); let mut retval = B::default(); accounts.iter().for_each(|stored_account| { - scan_func(stored_account, storage.id, &mut retval) + scan_func(stored_account, storage.append_vec_id(), &mut retval) }); retval }) @@ -1381,9 +1424,69 @@ impl AccountsDB { .and_then(|account_storage_entry| account_storage_entry.get_account(account_info)) } - fn find_storage_candidate(&self, slot: Slot) -> Arc { + fn try_recycle_and_insert_store( + &self, + slot: Slot, + min_size: u64, + max_size: u64, + ) -> Option> { + let store = self.try_recycle_store(slot, min_size, max_size)?; + self.insert_store(slot, store.clone()); + Some(store) + } + + fn try_recycle_store( + &self, + slot: Slot, + min_size: u64, + max_size: u64, + ) -> Option> { + let mut max = 0; + let mut min = std::u64::MAX; + let mut avail = 0; + let mut recycle_stores = self.recycle_stores.write().unwrap(); + for (i, 
store) in recycle_stores.iter().enumerate() { + if Arc::strong_count(store) == 1 { + max = std::cmp::max(store.accounts.capacity(), max); + min = std::cmp::min(store.accounts.capacity(), min); + avail += 1; + + if store.accounts.capacity() >= min_size && store.accounts.capacity() < max_size { + let ret = recycle_stores.swap_remove(i); + drop(recycle_stores); + let old_id = ret.append_vec_id(); + ret.recycle(slot, self.next_id.fetch_add(1, Ordering::Relaxed)); + debug!( + "recycling store: {} {:?} old_id: {}", + ret.append_vec_id(), + ret.get_path(), + old_id + ); + return Some(ret); + } + } + } + debug!( + "no recycle stores max: {} min: {} len: {} looking: {}, {} avail: {}", + max, + min, + recycle_stores.len(), + min_size, + max_size, + avail, + ); + None + } + + fn find_storage_candidate(&self, slot: Slot, size: usize) -> Arc { let mut create_extra = false; + let mut get_slot_stores = Measure::start("get_slot_stores"); let slot_stores_lock = self.storage.get_slot_stores(slot); + get_slot_stores.stop(); + self.stats + .store_get_slot_store + .fetch_add(get_slot_stores.as_us(), Ordering::Relaxed); + let mut find_existing = Measure::start("find_existing"); if let Some(slot_stores_lock) = slot_stores_lock { let slot_stores = slot_stores_lock.read().unwrap(); if !slot_stores.is_empty() { @@ -1407,8 +1510,24 @@ impl AccountsDB { let ret = store.clone(); drop(slot_stores); if create_extra { - self.create_and_insert_store(slot, self.file_size); + if self + .try_recycle_and_insert_store(slot, size as u64, std::u64::MAX) + .is_none() + { + self.stats + .create_store_count + .fetch_add(1, Ordering::Relaxed); + self.create_and_insert_store(slot, self.file_size, "store extra"); + } else { + self.stats + .recycle_store_count + .fetch_add(1, Ordering::Relaxed); + } } + find_existing.stop(); + self.stats + .store_find_existing + .fetch_add(find_existing.as_us(), Ordering::Relaxed); return ret; } // looked at every store, bail... 
@@ -1418,18 +1537,84 @@ impl AccountsDB { } } } + find_existing.stop(); + self.stats + .store_find_existing + .fetch_add(find_existing.as_us(), Ordering::Relaxed); - let store = self.create_and_insert_store(slot, self.file_size); - store.try_available(); + let store = if let Some(store) = self.try_recycle_store(slot, size as u64, std::u64::MAX) { + self.stats + .recycle_store_count + .fetch_add(1, Ordering::Relaxed); + store + } else { + self.stats + .create_store_count + .fetch_add(1, Ordering::Relaxed); + self.create_store(slot, self.file_size, "store") + }; + + // try_available is like taking a lock on the store, + // preventing other threads from using it. + // It must succeed here and happen before insert, + // otherwise another thread could also grab it from the index. + assert!(store.try_available()); + self.insert_store(slot, store.clone()); store } - fn create_and_insert_store(&self, slot: Slot, size: u64) -> Arc { + fn page_align(&self, size: u64) -> u64 { + (size + (PAGE_SIZE - 1)) & !(PAGE_SIZE - 1) + } + + fn has_space_available(&self, slot: Slot, size: u64) -> bool { + let slot_storage = self.storage.get_slot_stores(slot).unwrap(); + let slot_storage_r = slot_storage.read().unwrap(); + for (_id, store) in slot_storage_r.iter() { + if store.status() == AccountStorageStatus::Available + && (store.accounts.capacity() - store.accounts.len() as u64) > size + { + return true; + } + } + false + } + + fn create_store(&self, slot: Slot, size: u64, from: &str) -> Arc { let path_index = thread_rng().gen_range(0, self.paths.len()); - let store = - Arc::new(self.new_storage_entry(slot, &Path::new(&self.paths[path_index]), size)); + let store = Arc::new(self.new_storage_entry( + slot, + &Path::new(&self.paths[path_index]), + self.page_align(size), + )); + + debug!( + "creating store: {} slot: {} len: {} size: {} from: {} path: {:?}", + store.append_vec_id(), + slot, + store.accounts.len(), + store.accounts.capacity(), + from, + store.accounts.get_path() + ); + + 
store + } + + fn create_and_insert_store( + &self, + slot: Slot, + size: u64, + from: &str, + ) -> Arc { + let store = self.create_store(slot, size, from); let store_for_index = store.clone(); + self.insert_store(slot, store_for_index); + store + } + + fn insert_store(&self, slot: Slot, store: Arc) { let slot_storages: SlotStores = self.storage.get_slot_stores(slot).unwrap_or_else(|| // DashMap entry.or_insert() returns a RefMut, essentially a write lock, // which is dropped after this block ends, minimizing time held by the lock. @@ -1444,8 +1629,7 @@ impl AccountsDB { slot_storages .write() .unwrap() - .insert(store.id, store_for_index); - store + .insert(store.append_vec_id(), store); } pub fn purge_slot(&self, slot: Slot) { @@ -1454,6 +1638,34 @@ impl AccountsDB { self.purge_slots(&slots); } + fn recycle_slot_stores( + &self, + total_removed_storage_entries: usize, + slot_stores: &[SlotStores], + ) -> u64 { + let mut recycled_count = 0; + + let mut recycle_stores_write_time = Measure::start("recycle_stores_write_time"); + let mut recycle_stores = self.recycle_stores.write().unwrap(); + recycle_stores_write_time.stop(); + + for slot_entries in slot_stores { + let entry = slot_entries.read().unwrap(); + for (_store_id, stores) in entry.iter() { + if recycle_stores.len() > MAX_RECYCLE_STORES { + let dropped_count = total_removed_storage_entries - recycled_count; + self.stats + .dropped_stores + .fetch_add(dropped_count as u64, Ordering::Relaxed); + return recycle_stores_write_time.as_us(); + } + recycle_stores.push(stores.clone()); + recycled_count += 1; + } + } + recycle_stores_write_time.as_us() + } + fn purge_slots(&self, slots: &HashSet) { //add_root should be called first let non_roots: Vec<_> = slots @@ -1475,13 +1687,16 @@ impl AccountsDB { .map(|i| i.accounts.capacity()) .sum::(); } - all_removed_slot_storages.push(slot_removed_storages); + all_removed_slot_storages.push(slot_removed_storages.clone()); } } remove_storages_elapsed.stop(); let 
num_slots_removed = all_removed_slot_storages.len(); + let recycle_stores_write_time = + self.recycle_slot_stores(total_removed_storage_entries, &all_removed_slot_storages); + let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed"); // Backing mmaps for removed storages entries explicitly dropped here outside // of any locks @@ -1507,6 +1722,11 @@ impl AccountsDB { i64 ), ("total_removed_bytes", total_removed_bytes, i64), + ( + "recycle_stores_write_elapsed", + recycle_stores_write_time, + i64 + ), ); } @@ -1737,7 +1957,7 @@ impl AccountsDB { Some(ret) }); - let storage_finder = |slot| self.find_storage_candidate(slot); + let storage_finder = |slot, size| self.find_storage_candidate(slot, size); self.store_accounts_to( slot, accounts, @@ -1747,7 +1967,10 @@ impl AccountsDB { ) } - fn store_accounts_to Arc, P: Iterator>( + fn store_accounts_to< + F: FnMut(Slot, usize) -> Arc, + P: Iterator, + >( &self, slot: Slot, accounts: &[(&Pubkey, &Account)], @@ -1779,7 +2002,10 @@ impl AccountsDB { let mut total_storage_find_us = 0; while infos.len() < with_meta.len() { let mut storage_find = Measure::start("storage_finder"); - let storage = storage_finder(slot); + let storage = storage_finder( + slot, + with_meta[infos.len()].1.data.len() + STORE_META_OVERHEAD, + ); storage_find.stop(); total_storage_find_us += storage_find.as_us(); let mut append_accounts = Measure::start("append_accounts"); @@ -1791,17 +2017,30 @@ impl AccountsDB { if rvs.is_empty() { storage.set_status(AccountStorageStatus::Full); - // See if an account overflows the default append vec size. - let data_len = (with_meta[infos.len()].1.data.len() + 4096) as u64; - if data_len > self.file_size { - self.create_and_insert_store(slot, data_len * 2); + // See if an account overflows the append vecs in the slot. 
+ let data_len = (with_meta[infos.len()].1.data.len() + STORE_META_OVERHEAD) as u64; + if !self.has_space_available(slot, data_len) { + let special_store_size = std::cmp::max(data_len * 2, self.file_size); + if self + .try_recycle_and_insert_store(slot, special_store_size, std::u64::MAX) + .is_none() + { + self.stats + .create_store_count + .fetch_add(1, Ordering::Relaxed); + self.create_and_insert_store(slot, special_store_size, "large create"); + } else { + self.stats + .recycle_store_count + .fetch_add(1, Ordering::Relaxed); + } } continue; } for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) { storage.add_account(); infos.push(AccountInfo { - store_id: storage.id, + store_id: storage.append_vec_id(), offset: *offset, lamports: account.lamports, }); @@ -1850,7 +2089,15 @@ impl AccountsDB { } info!("total_stores: {}, newest_slot: {}, oldest_slot: {}, max_slot: {} (num={}), min_slot: {} (num={})", total_count, newest_slot, oldest_slot, max_slot, max, min_slot, min); - datapoint_info!("accounts_db-stores", ("total_count", total_count, i64)); + datapoint_info!( + "accounts_db-stores", + ("total_count", total_count, i64), + ( + "recycle_count", + self.recycle_stores.read().unwrap().len() as u64, + i64 + ), + ); datapoint_info!( "accounts_db-perf-stats", ( @@ -1911,26 +2158,41 @@ impl AccountsDB { hasher.result() } - fn accumulate_account_hashes(hashes: Vec<(Pubkey, Hash, u64)>) -> Hash { - let (hash, ..) = Self::do_accumulate_account_hashes_and_capitalization(hashes, false); + fn accumulate_account_hashes( + hashes: Vec<(Pubkey, Hash, u64)>, + slot: Slot, + debug: bool, + ) -> Hash { + let (hash, ..) 
= + Self::do_accumulate_account_hashes_and_capitalization(hashes, false, slot, debug); hash } fn accumulate_account_hashes_and_capitalization( hashes: Vec<(Pubkey, Hash, u64)>, + slot: Slot, + debug: bool, ) -> (Hash, u64) { - let (hash, cap) = Self::do_accumulate_account_hashes_and_capitalization(hashes, true); + let (hash, cap) = + Self::do_accumulate_account_hashes_and_capitalization(hashes, true, slot, debug); (hash, cap.unwrap()) } fn do_accumulate_account_hashes_and_capitalization( mut hashes: Vec<(Pubkey, Hash, u64)>, calculate_cap: bool, + slot: Slot, + debug: bool, ) -> (Hash, Option) { let mut sort_time = Measure::start("sort"); hashes.par_sort_by(|a, b| a.0.cmp(&b.0)); sort_time.stop(); + if debug { + for (key, hash, _lamports) in &hashes { + info!("slot: {} key {} hash {}", slot, key, hash); + } + } let mut sum_time = Measure::start("cap"); let cap = if calculate_cap { Some(Self::checked_sum_for_capitalization( @@ -2049,7 +2311,7 @@ impl AccountsDB { let mut accumulate = Measure::start("accumulate"); let (accumulated_hash, total_lamports) = - Self::accumulate_account_hashes_and_capitalization(hashes); + Self::accumulate_account_hashes_and_capitalization(hashes, slot, false); accumulate.stop(); datapoint_info!( "update_accounts_hash", @@ -2136,7 +2398,7 @@ impl AccountsDB { .into_iter() .map(|(pubkey, (_, hash))| (pubkey, hash, 0)) .collect(); - let ret = Self::accumulate_account_hashes(hashes); + let ret = Self::accumulate_account_hashes(hashes, slot, false); accumulate.stop(); self.stats .delta_hash_scan_time_total_us @@ -2188,9 +2450,9 @@ impl AccountsDB { .get_account_storage_entry(*slot, account_info.store_id) { assert_eq!( - *slot, store.slot, + *slot, store.slot(), "AccountDB::accounts_index corrupted. 
Storage pointed to: {}, expected: {}, should only point to one slot", - store.slot, *slot + store.slot(), *slot ); let count = store.remove_account(); if count == 0 { @@ -2236,7 +2498,7 @@ impl AccountsDB { let accounts = store.accounts.accounts(0); accounts .into_iter() - .map(|account| (store.slot, account.meta.pubkey)) + .map(|account| (store.slot(), account.meta.pubkey)) .collect::>() }) .reduce(HashSet::new, |mut reduced, store_pubkeys| { @@ -2346,6 +2608,11 @@ impl AccountsDB { /// Store the account update. pub fn store(&self, slot: Slot, accounts: &[(&Pubkey, &Account)]) { + // If all transactions in a batch are errored, + // it's possible to get a store with no accounts. + if accounts.is_empty() { + return; + } self.assert_frozen_accounts(accounts); let mut hash_time = Measure::start("hash_accounts"); let hashes = self.hash_accounts( @@ -2416,6 +2683,35 @@ impl AccountsDB { i64 ), ); + + datapoint_info!( + "accounts_db_store_timings2", + ( + "recycle_store_count", + self.stats.recycle_store_count.swap(0, Ordering::Relaxed), + i64 + ), + ( + "create_store_count", + self.stats.create_store_count.swap(0, Ordering::Relaxed), + i64 + ), + ( + "store_get_slot_store", + self.stats.store_get_slot_store.swap(0, Ordering::Relaxed), + i64 + ), + ( + "store_find_existing", + self.stats.store_find_existing.swap(0, Ordering::Relaxed), + i64 + ), + ( + "dropped_stores", + self.stats.dropped_stores.swap(0, Ordering::Relaxed), + i64 + ), + ); } } @@ -2581,12 +2877,25 @@ impl AccountsDB { } } - pub(crate) fn print_accounts_stats(&self, label: &'static str) { + pub(crate) fn print_accounts_stats(&self, label: &str) { self.print_index(label); self.print_count_and_status(label); + info!("recycle_stores:"); + let recycle_stores = self.recycle_stores.read().unwrap(); + for entry in recycle_stores.iter() { + info!( + " slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {}", + entry.slot(), + entry.append_vec_id(), + 
*entry.count_and_status.read().unwrap(), + entry.approx_store_count.load(Ordering::Relaxed), + entry.accounts.len(), + entry.accounts.capacity(), + ); + } } - fn print_index(&self, label: &'static str) { + fn print_index(&self, label: &str) { let mut roots: Vec<_> = self.accounts_index.all_roots(); roots.sort(); info!("{}: accounts_index roots: {:?}", label, roots,); @@ -2599,7 +2908,7 @@ impl AccountsDB { } } - fn print_count_and_status(&self, label: &'static str) { + fn print_count_and_status(&self, label: &str) { let mut slots: Vec<_> = self.storage.all_slots(); slots.sort(); info!("{}: count_and status for {} slots:", label, slots.len()); @@ -2903,6 +3212,7 @@ pub mod tests { #[test] fn test_remove_unrooted_slot_snapshot() { + solana_logger::setup(); let unrooted_slot = 9; let db = AccountsDB::new(Vec::new(), &ClusterType::Development); let key = solana_sdk::pubkey::new_rand(); @@ -3097,7 +3407,7 @@ pub mod tests { all_storages.extend(slot_storage.read().unwrap().values().cloned()) } for storage in all_storages { - *append_vec_histogram.entry(storage.slot).or_insert(0) += 1; + *append_vec_histogram.entry(storage.slot()).or_insert(0) += 1; } for count in append_vec_histogram.values() { assert!(*count >= 2); @@ -3108,7 +3418,6 @@ pub mod tests { fn test_account_grow() { let accounts = AccountsDB::new_single(); - let count = [0, 1]; let status = [AccountStorageStatus::Available, AccountStorageStatus::Full]; let pubkey1 = solana_sdk::pubkey::new_rand(); let account1 = Account::new(1, DEFAULT_FILE_SIZE as usize / 2, &pubkey1); @@ -3145,20 +3454,14 @@ pub mod tests { ); // lots of stores, but 3 storages should be enough for everything - for i in 0..25 { - let index = i % 2; + for _ in 0..25 { accounts.store(0, &[(&pubkey1, &account1)]); { assert_eq!(accounts.storage.0.len(), 1); let stores = &accounts.storage.get_slot_stores(0).unwrap(); let r_stores = stores.read().unwrap(); - assert_eq!(r_stores.len(), 3); - assert_eq!(r_stores[&0].count(), count[index]); + 
assert!(r_stores.len() <= 5); assert_eq!(r_stores[&0].status(), status[0]); - assert_eq!(r_stores[&1].count(), 1); - assert_eq!(r_stores[&1].status(), status[1]); - assert_eq!(r_stores[&2].count(), count[index ^ 1]); - assert_eq!(r_stores[&2].status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!( @@ -4083,11 +4386,10 @@ pub mod tests { let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value()); for storage in storage_entries.iter().flatten() { let storage_path = storage.get_path(); - let output_path = output_dir.as_ref().join( - storage_path - .file_name() - .expect("Invalid AppendVec file path"), - ); + let output_path = output_dir.as_ref().join(AppendVec::new_relative_path( + storage.slot(), + storage.append_vec_id(), + )); fs::copy(storage_path, output_path)?; } @@ -4497,6 +4799,19 @@ pub mod tests { } } + #[test] + fn test_storage_finder() { + solana_logger::setup(); + let db = AccountsDB::new_sized(Vec::new(), 16 * 1024); + let key = solana_sdk::pubkey::new_rand(); + let lamports = 100; + let data_len = 8190; + let account = Account::new(lamports, data_len, &solana_sdk::pubkey::new_rand()); + // pre-populate with a smaller empty store + db.create_and_insert_store(1, 8192, "test_storage_finder"); + db.store(1, &[(&key, &account)]); + } + #[test] fn test_get_snapshot_storages_empty() { let db = AccountsDB::new(Vec::new(), &ClusterType::Development); @@ -5176,4 +5491,70 @@ pub mod tests { 3 ); } + + #[test] + fn test_store_overhead() { + solana_logger::setup(); + let accounts = AccountsDB::new_single(); + let account = Account::default(); + let pubkey = solana_sdk::pubkey::new_rand(); + accounts.store(0, &[(&pubkey, &account)]); + let slot_stores = accounts.storage.get_slot_stores(0).unwrap(); + let mut total_len = 0; + for (_id, store) in slot_stores.read().unwrap().iter() { + total_len += store.accounts.len(); + } + info!("total: {}", total_len); + assert!(total_len < STORE_META_OVERHEAD); + } + + #[test] + fn 
test_store_reuse() { + solana_logger::setup(); + let accounts = AccountsDB::new_sized(vec![], 4096); + + let size = 100; + let num_accounts: usize = 100; + let mut keys = Vec::new(); + for i in 0..num_accounts { + let account = Account::new((i + 1) as u64, size, &Pubkey::default()); + let pubkey = solana_sdk::pubkey::new_rand(); + accounts.store(0, &[(&pubkey, &account)]); + keys.push(pubkey); + } + accounts.add_root(0); + + for (i, key) in keys[1..].iter().enumerate() { + let account = Account::new((1 + i + num_accounts) as u64, size, &Pubkey::default()); + accounts.store(1, &[(key, &account)]); + } + accounts.add_root(1); + accounts.clean_accounts(None); + accounts.shrink_all_slots(); + accounts.print_accounts_stats("post-shrink"); + let num_stores = accounts.recycle_stores.read().unwrap().len(); + assert!(num_stores > 0); + + let mut account_refs = Vec::new(); + let num_to_store = 20; + for (i, key) in keys[..num_to_store].iter().enumerate() { + let account = Account::new( + (1 + i + 2 * num_accounts) as u64, + i + 20, + &Pubkey::default(), + ); + accounts.store(2, &[(key, &account)]); + account_refs.push(account); + } + assert!(accounts.recycle_stores.read().unwrap().len() < num_stores); + + accounts.print_accounts_stats("post-store"); + + let mut ancestors = HashMap::new(); + ancestors.insert(1, 0); + ancestors.insert(2, 1); + for (key, account_ref) in keys[..num_to_store].iter().zip(account_refs) { + assert_eq!(accounts.load_slow(&ancestors, key).unwrap().0, account_ref); + } + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index bef149a850..a6466f7b7c 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -7732,6 +7732,7 @@ mod tests { #[test] fn test_status_cache_ancestors() { + solana_logger::setup(); let (genesis_config, _mint_keypair) = create_genesis_config(500); let parent = Arc::new(Bank::new(&genesis_config)); let bank1 = Arc::new(new_from_parent(&parent)); diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs 
index 7bce4531e2..b388b7c8ac 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -275,9 +275,9 @@ where for (slot, entries) in storage.into_iter() { let sub_map = map.entry(slot).or_insert_with(HashMap::new); for entry in entries.into_iter() { - let mut entry: AccountStorageEntry = entry.into(); - entry.slot = slot; - sub_map.insert(entry.id, Arc::new(entry)); + let entry: AccountStorageEntry = entry.into(); + entry.slot.store(slot, Ordering::Relaxed); + sub_map.insert(entry.append_vec_id(), Arc::new(entry)); } } map @@ -306,7 +306,8 @@ where // Move the corresponding AppendVec from the snapshot into the directory pointed // at by `local_dir` - let append_vec_relative_path = AppendVec::new_relative_path(slot, storage_entry.id); + let append_vec_relative_path = + AppendVec::new_relative_path(slot, storage_entry.append_vec_id()); let append_vec_abs_path = stream_append_vecs_path .as_ref() .join(&append_vec_relative_path); diff --git a/runtime/src/serde_snapshot/future.rs b/runtime/src/serde_snapshot/future.rs index effa073a60..837efb8ba3 100644 --- a/runtime/src/serde_snapshot/future.rs +++ b/runtime/src/serde_snapshot/future.rs @@ -18,7 +18,7 @@ impl solana_frozen_abi::abi_example::IgnoreAsHelper for SerializableAccountStora impl From<&AccountStorageEntry> for SerializableAccountStorageEntry { fn from(rhs: &AccountStorageEntry) -> Self { Self { - id: rhs.id, + id: rhs.append_vec_id(), accounts_current_len: rhs.accounts.len(), } } @@ -235,7 +235,7 @@ impl<'a> TypeContext<'a> for Context { serialize_iter_as_map(serializable_db.account_storage_entries.iter().map(|x| { *entry_count.borrow_mut() += x.len(); ( - x.first().unwrap().slot, + x.first().unwrap().slot(), serialize_iter_as_seq( x.iter() .map(|x| Self::SerializableAccountStorageEntry::from(x.as_ref())), diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index 9b623bbf3b..eeb633ba38 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ 
b/runtime/src/serde_snapshot/tests.rs @@ -27,11 +27,10 @@ fn copy_append_vecs>( let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value()); for storage in storage_entries.iter().flatten() { let storage_path = storage.get_path(); - let output_path = output_dir.as_ref().join( - storage_path - .file_name() - .expect("Invalid AppendVec file path"), - ); + let output_path = output_dir.as_ref().join(AppendVec::new_relative_path( + storage.slot(), + storage.append_vec_id(), + )); std::fs::copy(storage_path, output_path)?; } diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 9eaeed8e04..59a9a31af4 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -250,11 +250,11 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<() for storage in snapshot_package.storages.iter().flatten() { storage.flush()?; let storage_path = storage.get_path(); - let output_path = staging_accounts_dir.join( - storage_path - .file_name() - .expect("Invalid AppendVec file path"), - ); + let output_path = + staging_accounts_dir.join(crate::append_vec::AppendVec::new_relative_path( + storage.slot(), + storage.append_vec_id(), + )); // `storage_path` - The file path where the AppendVec itself is located // `output_path` - The directory where the AppendVec will be placed in the staging directory.