From e4dba03e12035d92ad3362aced94d3979a43cc7e Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Sun, 3 Mar 2019 16:04:04 -0800 Subject: [PATCH] accounts shedding (#3078) * accounts shedding * fixup --- benches/{appendvec.rs => append_vec.rs} | 28 +-- runtime/src/accounts.rs | 247 +++++++++++--------- runtime/src/{appendvec.rs => append_vec.rs} | 0 runtime/src/lib.rs | 2 +- 4 files changed, 157 insertions(+), 120 deletions(-) rename benches/{appendvec.rs => append_vec.rs} (89%) rename runtime/src/{appendvec.rs => append_vec.rs} (100%) diff --git a/benches/appendvec.rs b/benches/append_vec.rs similarity index 89% rename from benches/appendvec.rs rename to benches/append_vec.rs index c4fbe7b332..b50b2027e6 100644 --- a/benches/appendvec.rs +++ b/benches/append_vec.rs @@ -4,7 +4,7 @@ extern crate test; use bincode::{deserialize, serialize_into, serialized_size}; use rand::{thread_rng, Rng}; -use solana_runtime::appendvec::{ +use solana_runtime::append_vec::{ deserialize_account, get_serialized_size, serialize_account, AppendVec, }; use solana_sdk::account::Account; @@ -20,7 +20,7 @@ use test::Bencher; const START_SIZE: u64 = 4 * 1024 * 1024; const INC_SIZE: u64 = 1 * 1024 * 1024; -fn get_appendvec_bench_path(path: &str) -> PathBuf { +fn get_append_vec_bench_path(path: &str) -> PathBuf { let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); let mut buf = PathBuf::new(); buf.push(&format!("{}/{}", out_dir, path)); @@ -28,8 +28,8 @@ fn get_appendvec_bench_path(path: &str) -> PathBuf { } #[bench] -fn appendvec_atomic_append(bencher: &mut Bencher) { - let path = get_appendvec_bench_path("bench_append"); +fn append_vec_atomic_append(bencher: &mut Bencher) { + let path = get_append_vec_bench_path("bench_append"); let mut vec = AppendVec::::new(&path, true, START_SIZE, INC_SIZE); bencher.iter(|| { if vec.append(AtomicUsize::new(0)).is_none() { @@ -41,8 +41,8 @@ fn appendvec_atomic_append(bencher: &mut Bencher) { } #[bench] -fn appendvec_atomic_random_access(bencher: &mut Bencher) { - let path = get_appendvec_bench_path("bench_ra"); +fn append_vec_atomic_random_access(bencher: &mut Bencher) { + let path = get_append_vec_bench_path("bench_ra"); let mut vec = AppendVec::::new(&path, true, START_SIZE, INC_SIZE); let size = 1_000_000; for _ in 0..size { @@ -59,8 +59,8 @@ fn appendvec_atomic_random_access(bencher: &mut Bencher) { } #[bench] -fn appendvec_atomic_random_change(bencher: &mut Bencher) { - let path = get_appendvec_bench_path("bench_rax"); +fn append_vec_atomic_random_change(bencher: &mut Bencher) { + let path = get_append_vec_bench_path("bench_rax"); let mut vec = AppendVec::::new(&path, true, START_SIZE, INC_SIZE); let size = 1_000_000; for _ in 0..size { @@ -83,8 +83,8 @@ fn appendvec_atomic_random_change(bencher: &mut Bencher) { } #[bench] -fn appendvec_atomic_random_read(bencher: &mut Bencher) { - let path = get_appendvec_bench_path("bench_read"); +fn append_vec_atomic_random_read(bencher: &mut Bencher) { + let path = get_append_vec_bench_path("bench_read"); let mut vec = AppendVec::::new(&path, true, START_SIZE, INC_SIZE); let size = 1_000_000; for _ in 0..size { @@ -103,8 +103,8 @@ fn appendvec_atomic_random_read(bencher: &mut Bencher) { } #[bench] -fn appendvec_concurrent_lock_append(bencher: &mut Bencher) { - let path = get_appendvec_bench_path("bench_lock_append"); +fn append_vec_concurrent_lock_append(bencher: &mut Bencher) { + let path = get_append_vec_bench_path("bench_lock_append"); let vec = Arc::new(RwLock::new(AppendVec::::new( &path, true, START_SIZE, INC_SIZE, ))); @@ -143,8 +143,8 @@ fn appendvec_concurrent_lock_append(bencher: &mut Bencher) { } #[bench] -fn appendvec_concurrent_get_append(bencher: &mut Bencher) { - let path = get_appendvec_bench_path("bench_get_append"); +fn append_vec_concurrent_get_append(bencher: &mut Bencher) { + let path = get_append_vec_bench_path("bench_get_append"); let vec = Arc::new(RwLock::new(AppendVec::::new( &path, true, START_SIZE, INC_SIZE, ))); diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 7d59353ae6..07738eb877 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1,4 +1,4 @@ -use crate::appendvec::AppendVec; +use crate::append_vec::AppendVec; use crate::bank::{BankError, Result}; use crate::runtime::has_duplicates; use bincode::serialize; @@ -76,9 +76,6 @@ type AppendVecId = usize; type Fork = u64; -#[derive(Debug)] -struct AccountMap(RwLock>); - #[derive(Debug, PartialEq)] enum AccountStorageStatus { StorageAvailable = 0, @@ -96,20 +93,29 @@ impl From for AccountStorageStatus { } } -struct AccountIndexInfo { - /// For each Pubkey, the account for a specific fork is in a specific - /// AppendVec at a specific index - index: RwLock>, +// in a given a Fork, which AppendVecId and offset +type AccountMap = RwLock>; + +/// information about where Accounts are stored and which vote accounts are present +/// keying hierarchy is: +/// +/// pubkey->fork->append_vec->offset +/// +struct AccountIndex { + /// For each Pubkey, the Account for a specific Fork is in a specific + /// AppendVec at a specific index. There may be an Account for Pubkey + /// in any number of Forks. + account_maps: RwLock>, /// Cached index to vote accounts for performance reasons to avoid having - /// to iterate through the entire accounts each time - vote_index: RwLock>, + /// to iterate through the entire accounts each time + vote_accounts: RwLock>, } /// Persistent storage structure holding the accounts struct AccountStorage { /// storage holding the accounts - appendvec: Arc>>, + accounts: Arc>>, /// Keeps track of the number of accounts stored in a specific AppendVec. /// This is periodically checked to reuse the stores that do not have @@ -134,19 +140,18 @@ impl AccountStorage { } #[derive(Default, Debug)] -struct AccountsForkInfo { - /// The number of transactions the bank has processed without error since the - /// start of the ledger. +struct ForkInfo { + /// The number of transactions processed without error transaction_count: u64, - /// List of all parents corresponding to this fork + /// List of all parents of this fork parents: Vec, } // This structure handles the load/store of the accounts pub struct AccountsDB { /// Keeps tracks of index into AppendVec on a per fork basis - index_info: AccountIndexInfo, + account_index: AccountIndex, /// Account storage storage: RwLock>, @@ -155,7 +160,7 @@ pub struct AccountsDB { next_id: AtomicUsize, /// Information related to the fork - fork_info: RwLock>, + fork_infos: RwLock>, } /// This structure handles synchronization for db @@ -194,15 +199,15 @@ impl Drop for Accounts { impl AccountsDB { pub fn new(fork: Fork, paths: &str) -> Self { - let index_info = AccountIndexInfo { - index: RwLock::new(HashMap::new()), - vote_index: RwLock::new(HashSet::new()), + let account_index = AccountIndex { + account_maps: RwLock::new(HashMap::new()), + vote_accounts: RwLock::new(HashSet::new()), }; let accounts_db = AccountsDB { - index_info, + account_index, storage: RwLock::new(vec![]), next_id: AtomicUsize::new(0), - fork_info: RwLock::new(HashMap::new()), + fork_infos: RwLock::new(HashMap::new()), }; accounts_db.add_storage(paths); accounts_db.add_fork(fork, None); @@ -210,15 +215,17 @@ impl AccountsDB { } pub fn add_fork(&self, fork: Fork, parent: Option) { - let mut info = self.fork_info.write().unwrap(); - let mut fork_info = AccountsForkInfo::default(); + let mut fork_infos = self.fork_infos.write().unwrap(); + let mut fork_info = ForkInfo::default(); if let Some(parent) = parent { fork_info.parents.push(parent); - if let Some(list) = info.get(&parent) { - fork_info.parents.extend_from_slice(&list.parents); + if let Some(parent_fork_info) = fork_infos.get(&parent) { + fork_info + .parents + .extend_from_slice(&parent_fork_info.parents); } } - if let Some(old_fork_info) = info.insert(fork, fork_info) { + if let Some(old_fork_info) = fork_infos.insert(fork, fork_info) { panic!("duplicate forks! {} {:?}", fork, old_fork_info); } } @@ -228,7 +235,7 @@ impl AccountsDB { let mut stores: Vec = vec![]; paths.iter().for_each(|p| { let storage = AccountStorage { - appendvec: self.new_account_storage(&p), + accounts: self.new_account_storage(&p), status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), count: AtomicUsize::new(0), path: p.to_string(), @@ -254,8 +261,8 @@ impl AccountsDB { } fn get_vote_accounts(&self, fork: Fork) -> HashMap { - self.index_info - .vote_index + self.account_index + .vote_accounts .read() .unwrap() .iter() @@ -270,11 +277,10 @@ impl AccountsDB { } pub fn has_accounts(&self, fork: Fork) -> bool { - let index = self.index_info.index.read().unwrap(); + let account_maps = self.account_index.account_maps.read().unwrap(); - for entry in index.values() { - let account_map = entry.0.read().unwrap(); - if account_map.contains_key(&fork) { + for account_map in account_maps.values() { + if account_map.read().unwrap().contains_key(&fork) { return true; } } @@ -282,20 +288,29 @@ impl AccountsDB { } pub fn hash_internal_state(&self, fork: Fork) -> Option { - let mut ordered_accounts = BTreeMap::new(); - let rindex = self.index_info.index.read().unwrap(); - rindex.iter().for_each(|(p, entry)| { - let forks = entry.0.read().unwrap(); - if let Some((id, index)) = forks.get(&fork) { - let account = self.storage.read().unwrap()[*id] - .appendvec - .read() - .unwrap() - .get_account(*index) - .unwrap(); - ordered_accounts.insert(*p, account); - } - }); + let ordered_accounts: BTreeMap<_, _> = self + .account_index + .account_maps + .read() + .unwrap() + .iter() + .filter_map(|(pubkey, account_map)| { + let account_map = account_map.read().unwrap(); + if let Some((vec_id, offset)) = account_map.get(&fork) { + Some(( + *pubkey, + self.storage.read().unwrap()[*vec_id] + .accounts + .read() + .unwrap() + .get_account(*offset) + .unwrap(), + )) + } else { + None + } + }) + .collect(); if ordered_accounts.is_empty() { return None; @@ -304,26 +319,26 @@ impl AccountsDB { } fn get_account(&self, id: AppendVecId, offset: u64) -> Account { - let appendvec = &self.storage.read().unwrap()[id].appendvec; - let av = appendvec.read().unwrap(); + let accounts = &self.storage.read().unwrap()[id].accounts; + let av = accounts.read().unwrap(); av.get_account(offset).unwrap() } fn load(&self, fork: Fork, pubkey: &Pubkey, walk_back: bool) -> Option { - let index = self.index_info.index.read().unwrap(); - if let Some(map) = index.get(pubkey) { - let forks = map.0.read().unwrap(); + let account_maps = self.account_index.account_maps.read().unwrap(); + if let Some(account_map) = account_maps.get(pubkey) { + let account_map = account_map.read().unwrap(); // find most recent fork that is an ancestor of current_fork - if let Some((id, offset)) = forks.get(&fork) { + if let Some((id, offset)) = account_map.get(&fork) { return Some(self.get_account(*id, *offset)); } else { if !walk_back { return None; } - let fork_info = self.fork_info.read().unwrap(); - if let Some(info) = fork_info.get(&fork) { - for parent_fork in info.parents.iter() { - if let Some((id, offset)) = forks.get(&parent_fork) { + let fork_infos = self.fork_infos.read().unwrap(); + if let Some(fork_info) = fork_infos.get(&fork) { + for parent_fork in fork_info.parents.iter() { + if let Some((id, offset)) = account_map.get(&parent_fork) { return Some(self.get_account(*id, *offset)); } } @@ -363,7 +378,7 @@ impl AccountsDB { // check if new store was already created if stores.len() == len { let storage = AccountStorage { - appendvec: self.new_account_storage(&stores[id].path), + accounts: self.new_account_storage(&stores[id].path), count: AtomicUsize::new(0), status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), path: stores[id].path.clone(), @@ -381,7 +396,7 @@ impl AccountsDB { let mut id = self.get_storage_id(start, std::usize::MAX); // Even if no tokens, need to preserve the account owner so - // we can update the vote_index correctly if this account is purged + // we can update the vote_accounts correctly if this account is purged // when squashing. let acc = &mut account.clone(); if account.tokens == 0 { @@ -391,7 +406,7 @@ impl AccountsDB { loop { let result: Option; { - let av = &self.storage.read().unwrap()[id].appendvec; + let av = &self.storage.read().unwrap()[id].accounts; result = av.read().unwrap().append_account(acc); } if let Some(val) = result { @@ -405,12 +420,12 @@ impl AccountsDB { } fn remove_account_entries(&self, entries: &[Fork], map: &AccountMap) -> bool { - let mut forks = map.0.write().unwrap(); + let mut forks = map.write().unwrap(); for fork in entries.iter() { if let Some((id, _)) = forks.remove(&fork) { let stores = self.storage.read().unwrap(); if stores[id].count.fetch_sub(1, Ordering::Relaxed) == 1 { - stores[id].appendvec.write().unwrap().reset(); + stores[id].accounts.write().unwrap().reset(); stores[id].set_status(AccountStorageStatus::StorageAvailable); } } @@ -418,9 +433,9 @@ impl AccountsDB { forks.is_empty() } - fn account_map_is_empty(pubkey: &Pubkey, index: &HashMap) -> bool { - if let Some(account_map) = index.get(pubkey) { - if account_map.0.read().unwrap().len() == 0 { + fn account_map_is_empty(pubkey: &Pubkey, account_maps: &HashMap) -> bool { + if let Some(account_map) = account_maps.get(pubkey) { + if account_map.read().unwrap().len() == 0 { return true; } } @@ -430,25 +445,33 @@ impl AccountsDB { fn update_vote_cache( &self, account: &Account, - index: &HashMap, + account_maps: &HashMap, pubkey: &Pubkey, ) { if solana_vote_api::check_id(&account.owner) { - if Self::account_map_is_empty(pubkey, index) { - self.index_info.vote_index.write().unwrap().remove(pubkey); + if Self::account_map_is_empty(pubkey, account_maps) { + self.account_index + .vote_accounts + .write() + .unwrap() + .remove(pubkey); } else { - self.index_info.vote_index.write().unwrap().insert(*pubkey); + self.account_index + .vote_accounts + .write() + .unwrap() + .insert(*pubkey); } } } fn insert_account_entry(&self, fork: Fork, id: AppendVecId, offset: u64, map: &AccountMap) { - let mut forks = map.0.write().unwrap(); + let mut forks = map.write().unwrap(); let stores = self.storage.read().unwrap(); stores[id].count.fetch_add(1, Ordering::Relaxed); if let Some((old_id, _)) = forks.insert(fork, (id, offset)) { if stores[old_id].count.fetch_sub(1, Ordering::Relaxed) == 1 { - stores[old_id].appendvec.write().unwrap().reset(); + stores[old_id].accounts.write().unwrap().reset(); stores[old_id].set_status(AccountStorageStatus::StorageAvailable); } } @@ -458,26 +481,32 @@ impl AccountsDB { fn store_account(&self, fork: Fork, pubkey: &Pubkey, account: &Account) { if account.tokens == 0 && self.is_squashed(fork) { // purge if balance is 0 and no checkpoints - let index = self.index_info.index.read().unwrap(); - let map = index.get(&pubkey).unwrap(); + let account_maps = self.account_index.account_maps.read().unwrap(); + let map = account_maps.get(&pubkey).unwrap(); self.remove_account_entries(&[fork], &map); - self.update_vote_cache(account, &index, pubkey); + self.update_vote_cache(account, &account_maps, pubkey); } else { let (id, offset) = self.append_account(account); - let index = self.index_info.index.read().unwrap(); + let account_maps = self.account_index.account_maps.read().unwrap(); - let map = index.get(&pubkey).unwrap(); + let map = account_maps.get(&pubkey).unwrap(); self.insert_account_entry(fork, id, offset, &map); - self.update_vote_cache(account, &index, pubkey); + self.update_vote_cache(account, &account_maps, pubkey); } } pub fn store(&self, fork: Fork, pubkey: &Pubkey, account: &Account) { { - if !self.index_info.index.read().unwrap().contains_key(&pubkey) { - let mut windex = self.index_info.index.write().unwrap(); - windex.insert(*pubkey, AccountMap(RwLock::new(HashMap::new()))); + if !self + .account_index + .account_maps + .read() + .unwrap() + .contains_key(&pubkey) + { + let mut waccount_maps = self.account_index.account_maps.write().unwrap(); + waccount_maps.insert(*pubkey, RwLock::new(HashMap::new())); } } self.store_account(fork, pubkey, account); @@ -492,23 +521,23 @@ impl AccountsDB { ) { let mut keys = vec![]; { - let index = self.index_info.index.read().unwrap(); + let account_maps = self.account_index.account_maps.read().unwrap(); for (i, raccs) in loaded.iter().enumerate() { if res[i].is_err() || raccs.is_err() { continue; } let tx = &txs[i]; for key in tx.account_keys.iter() { - if !index.contains_key(&key) { + if !account_maps.contains_key(&key) { keys.push(*key); } } } } if !keys.is_empty() { - let mut index = self.index_info.index.write().unwrap(); + let mut account_maps = self.account_index.account_maps.write().unwrap(); for key in keys.iter() { - index.insert(*key, AccountMap(RwLock::new(HashMap::new()))); + account_maps.insert(*key, RwLock::new(HashMap::new())); } } for (i, raccs) in loaded.iter().enumerate() { @@ -640,28 +669,27 @@ impl AccountsDB { } pub fn increment_transaction_count(&self, fork: Fork, tx_count: usize) { - let mut info = self.fork_info.write().unwrap(); - let entry = info.entry(fork).or_insert(AccountsForkInfo::default()); - entry.transaction_count += tx_count as u64; + let mut fork_infos = self.fork_infos.write().unwrap(); + let fork_info = fork_infos.entry(fork).or_insert(ForkInfo::default()); + fork_info.transaction_count += tx_count as u64; } pub fn transaction_count(&self, fork: Fork) -> u64 { - let info = self.fork_info.read().unwrap(); - if let Some(entry) = info.get(&fork) { - entry.transaction_count - } else { - 0 - } + self.fork_infos + .read() + .unwrap() + .get(&fork) + .map_or(0, |fork_info| fork_info.transaction_count) } fn remove_parents(&self, fork: Fork) -> Vec { - let mut info = self.fork_info.write().unwrap(); + let mut info = self.fork_infos.write().unwrap(); let fork_info = info.get_mut(&fork).unwrap(); fork_info.parents.split_off(0) } fn is_squashed(&self, fork: Fork) -> bool { - self.fork_info + self.fork_infos .read() .unwrap() .get(&fork) @@ -670,13 +698,13 @@ impl AccountsDB { .is_empty() } - fn get_merged_index( + fn get_merged_account_map( &self, fork: Fork, parents: &[Fork], map: &AccountMap, ) -> Option<(Fork, AppendVecId, u64)> { - let forks = map.0.read().unwrap(); + let forks = map.read().unwrap(); if let Some((id, offset)) = forks.get(&fork) { return Some((fork, *id, *offset)); } else { @@ -701,9 +729,10 @@ impl AccountsDB { // absent let mut keys = vec![]; { - let index = self.index_info.index.read().unwrap(); - index.iter().for_each(|(pubkey, map)| { - if let Some((parent_fork, id, offset)) = self.get_merged_index(fork, &parents, &map) + let account_maps = self.account_index.account_maps.read().unwrap(); + account_maps.iter().for_each(|(pubkey, map)| { + if let Some((parent_fork, id, offset)) = + self.get_merged_account_map(fork, &parents, &map) { if parent_fork != fork { self.insert_account_entry(fork, id, offset, &map); @@ -713,16 +742,16 @@ impl AccountsDB { if self.remove_account_entries(&[fork], &map) { keys.push(pubkey.clone()); } - self.update_vote_cache(&account, &index, pubkey); + self.update_vote_cache(&account, &account_maps, pubkey); } } } }); } if !keys.is_empty() { - let mut index = self.index_info.index.write().unwrap(); + let mut account_maps = self.account_index.account_maps.write().unwrap(); for key in keys.iter() { - index.remove(&key); + account_maps.remove(&key); } } } @@ -1688,7 +1717,15 @@ mod tests { accounts_db.squash(1); accounts_db.squash(2); - assert_eq!(accounts_db.index_info.vote_index.read().unwrap().len(), 1); + assert_eq!( + accounts_db + .account_index + .vote_accounts + .read() + .unwrap() + .len(), + 1 + ); assert_eq!(accounts_db.get_vote_accounts(1).len(), 1); assert_eq!(accounts_db.get_vote_accounts(2).len(), 1); diff --git a/runtime/src/appendvec.rs b/runtime/src/append_vec.rs similarity index 100% rename from runtime/src/appendvec.rs rename to runtime/src/append_vec.rs diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 32fdf10e05..e682c4556e 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -1,5 +1,5 @@ mod accounts; -pub mod appendvec; +pub mod append_vec; pub mod bank; pub mod bloom; mod hash_queue;