diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index e3d69f1c5d..a6af53560b 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1,6 +1,6 @@ use crate::accounts_db::{ - get_paths_vec, AccountInfo, AccountStorageSlice, AccountsDB, ErrorCounters, - InstructionAccounts, InstructionLoaders, + get_paths_vec, AccountInfo, AccountStorage, AccountsDB, ErrorCounters, InstructionAccounts, + InstructionLoaders, }; use crate::accounts_index::{AccountsIndex, Fork}; use crate::append_vec::StoredAccount; @@ -140,7 +140,7 @@ impl Accounts { } fn load_tx_accounts( - storage: &AccountStorageSlice, + storage: &AccountStorage, ancestors: &HashMap, accounts_index: &AccountsIndex, tx: &Transaction, @@ -180,7 +180,7 @@ impl Accounts { } fn load_executable_accounts( - storage: &AccountStorageSlice, + storage: &AccountStorage, ancestors: &HashMap, accounts_index: &AccountsIndex, program_id: &Pubkey, @@ -222,7 +222,7 @@ impl Accounts { /// For each program_id in the transaction, load its loaders. fn load_loaders( - storage: &AccountStorageSlice, + storage: &AccountStorage, ancestors: &HashMap, accounts_index: &AccountsIndex, tx: &Transaction, diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 5f4955d7c5..67de70e135 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -63,30 +63,20 @@ pub struct AccountInfo { } /// An offset into the AccountsDB::storage vector type AppendVecId = usize; -type AccountStorage = Vec>; -pub type AccountStorageSlice = [Arc]; +pub type AccountStorage = HashMap>>; pub type InstructionAccounts = Vec; pub type InstructionLoaders = Vec>; -#[derive(Debug, PartialEq)] +#[derive(Copy, Clone, Debug, PartialEq)] pub enum AccountStorageStatus { StorageAvailable = 0, StorageFull = 1, } -impl From for AccountStorageStatus { - fn from(status: usize) -> Self { - use self::AccountStorageStatus::*; - match status { - 0 => StorageAvailable, - 1 => StorageFull, - _ => unreachable!(), - } - } -} - /// Persistent storage structure holding the accounts pub struct AccountStorageEntry { + id: AppendVecId, + fork_id: Fork, /// storage holding the accounts @@ -95,10 +85,10 @@ pub struct AccountStorageEntry { /// Keeps track of the number of accounts stored in a specific AppendVec. /// This is periodically checked to reuse the stores that do not have /// any accounts in it. - count: AtomicUsize, + count: usize, /// status corresponding to the storage - status: AtomicUsize, + status: AccountStorageStatus, } impl AccountStorageEntry { @@ -110,32 +100,31 @@ impl AccountStorageEntry { let accounts = AppendVec::new(&path.join(ACCOUNT_DATA_FILE), true, file_size as usize); AccountStorageEntry { + id, fork_id, accounts, - count: AtomicUsize::new(0), - status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), + count: 0, + status: AccountStorageStatus::StorageAvailable, } } - pub fn set_status(&self, status: AccountStorageStatus) { - self.status.store(status as usize, Ordering::Relaxed); + pub fn set_status(&mut self, status: AccountStorageStatus) { + self.status = status; } - pub fn get_status(&self) -> AccountStorageStatus { - self.status.load(Ordering::Relaxed).into() + pub fn status(&self) -> AccountStorageStatus { + self.status } - fn add_account(&self) { - self.count.fetch_add(1, Ordering::Relaxed); + fn add_account(&mut self) { + self.count += 1; } - fn remove_account(&self) -> bool { - if self.count.fetch_sub(1, Ordering::Relaxed) == 1 { + fn remove_account(&mut self) { + self.count -= 1; + if self.count == 0 { self.accounts.reset(); self.set_status(AccountStorageStatus::StorageAvailable); - true - } else { - false } } } @@ -171,7 +160,7 @@ impl AccountsDB { let paths = get_paths_vec(&paths); AccountsDB { accounts_index: RwLock::new(AccountsIndex::default()), - storage: RwLock::new(vec![]), + storage: RwLock::new(HashMap::new()), next_id: AtomicUsize::new(0), write_version: AtomicUsize::new(0), paths, @@ -193,8 +182,9 @@ impl AccountsDB { } pub fn has_accounts(&self, fork: Fork) -> bool { - for x in self.storage.read().unwrap().iter() { - if x.fork_id == fork && x.count.load(Ordering::Relaxed) > 0 { + for storage_entry in self.storage.read().unwrap().values() { + let storage_entry = storage_entry.read().unwrap(); + if storage_entry.fork_id == fork && storage_entry.count > 0 { return true; } } @@ -209,17 +199,18 @@ impl AccountsDB { F: Send + Sync, B: Send + Default, { - let storage_maps: Vec> = self + let storage_maps: Vec>> = self .storage .read() .unwrap() - .iter() - .filter(|store| store.fork_id == fork_id) + .values() + .filter(|store| store.read().unwrap().fork_id == fork_id) .cloned() .collect(); storage_maps .into_par_iter() .map(|storage| { + let storage = storage.read().unwrap(); let accounts = storage.accounts.accounts(0); let mut retval = B::default(); accounts @@ -231,16 +222,24 @@ impl AccountsDB { } pub fn load( - storage: &AccountStorageSlice, + storage: &AccountStorage, ancestors: &HashMap, accounts_index: &AccountsIndex, pubkey: &Pubkey, ) -> Option { let info = accounts_index.get(pubkey, ancestors)?; //TODO: thread this as a ref - storage - .get(info.id) - .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) + storage.get(&info.id).and_then(|store| { + Some( + store + .read() + .unwrap() + .accounts + .get_account(info.offset)? + .0 + .clone_account(), + ) + }) } pub fn load_slow(&self, ancestors: &HashMap, pubkey: &Pubkey) -> Option { @@ -249,82 +248,69 @@ impl AccountsDB { Self::load(&storage, ancestors, &accounts_index, pubkey) } - fn get_storage_id(&self, fork_id: Fork, start: usize, current: usize) -> usize { - let mut id = current; - let len: usize; - { - let stores = self.storage.read().unwrap(); - len = stores.len(); - if len > 0 { - if id == std::usize::MAX { - id = start % len; - if stores[id].get_status() == AccountStorageStatus::StorageAvailable { - return id; - } + fn with_exclusive_storage(&self, fork_id: Fork, mut access: F) -> bool + where + F: FnMut(&mut AccountStorageEntry) -> bool, + { + let entries: Vec<_> = self + .storage + .read() + .unwrap() + .values() + .filter_map(|entry| { + let res = { + let entry = entry.read().unwrap(); + entry.status() == AccountStorageStatus::StorageAvailable + && entry.fork_id == fork_id + }; + if res { + Some(entry.clone()) } else { - stores[id].set_status(AccountStorageStatus::StorageFull); + None } + }) + .collect(); - loop { - id = (id + 1) % len; - if fork_id == stores[id].fork_id - && stores[id].get_status() == AccountStorageStatus::StorageAvailable - { - break; - } - if id == start % len { - break; - } - } + for entry in entries { + if access(&mut entry.write().unwrap()) { + return true; } } - if len == 0 || id == start % len { - let mut stores = self.storage.write().unwrap(); - // check if new store was already created - if stores.len() == len { - let path_idx = thread_rng().gen_range(0, self.paths.len()); - let storage = self.new_storage_entry(fork_id, &self.paths[path_idx]); - stores.push(Arc::new(storage)); - } - id = stores.len() - 1; - } - id + + let path_idx = thread_rng().gen_range(0, self.paths.len()); + let mut new_entry = self.new_storage_entry(fork_id, &self.paths[path_idx]); + + let rv = access(&mut new_entry); + + self.storage + .write() + .unwrap() + .insert(new_entry.id, Arc::new(RwLock::new(new_entry))); + + rv } - fn append_account(&self, fork_id: Fork, pubkey: &Pubkey, account: &Account) -> (usize, usize) { - let offset: usize; - let start = self.next_id.fetch_add(1, Ordering::Relaxed); - let mut id = self.get_storage_id(fork_id, start, std::usize::MAX); - - // Even if no lamports, need to preserve the account owner so - // we can update the vote_accounts correctly if this account is purged - // when squashing. - let acc = &mut account.clone(); + fn append_account( + &self, + storage: &AccountStorageEntry, + pubkey: &Pubkey, + account: &Account, + ) -> Option { + let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64; + let meta = StorageMeta { + write_version, + pubkey: *pubkey, + data_len: account.data.len() as u64, + }; if account.lamports == 0 { - acc.data.resize(0, 0); + // Even if no lamports, need to preserve the account owner so + // we can update the vote_accounts correctly as roots move forward + let account = &mut account.clone(); + account.data.resize(0, 0); + storage.accounts.append_account(meta, account) + } else { + storage.accounts.append_account(meta, account) } - - loop { - let result: Option; - { - let accounts = &self.storage.read().unwrap()[id]; - let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64; - let meta = StorageMeta { - write_version, - pubkey: *pubkey, - data_len: account.data.len() as u64, - }; - result = accounts.accounts.append_account(meta, account); - accounts.add_account(); - } - if let Some(val) = result { - offset = val; - break; - } else { - id = self.get_storage_id(fork_id, start, id); - } - } - (id, offset) } pub fn purge_fork(&self, fork: Fork) { @@ -332,55 +318,108 @@ impl AccountsDB { let is_root = self.accounts_index.read().unwrap().is_root(fork); trace!("PURGING {} {}", fork, is_root); if !is_root { - self.storage.write().unwrap().retain(|x| { - trace!("PURGING {} {}", x.fork_id, fork); - x.fork_id != fork + self.storage.write().unwrap().retain(|_, v| { + trace!("PURGING {} {}", v.read().unwrap().fork_id, fork); + v.read().unwrap().fork_id != fork }); } } + fn store_accounts(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) -> Vec { + let mut infos = vec![]; + + for (pubkey, account) in accounts { + self.with_exclusive_storage(fork_id, |storage| { + if let Some(offset) = self.append_account(&storage, pubkey, account) { + storage.add_account(); + infos.push(AccountInfo { + id: storage.id, + offset, + lamports: account.lamports, + }); + true + } else { + storage.set_status(AccountStorageStatus::StorageFull); + false + } + }); + } + + infos + } + + fn update_index( + &self, + fork_id: Fork, + infos: Vec, + accounts: &[(&Pubkey, &Account)], + ) -> Vec<(Fork, AccountInfo)> { + let mut index = self.accounts_index.write().unwrap(); + let mut reclaims = vec![]; + for (i, info) in infos.into_iter().enumerate() { + let key = &accounts[i].0; + reclaims.extend(index.insert(fork_id, key, info).into_iter()) + } + reclaims + } + + fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet { + let storage = self.storage.read().unwrap(); + for (fork_id, account_info) in reclaims { + if let Some(entry) = storage.get(&account_info.id) { + assert_eq!( + fork_id, + entry.read().unwrap().fork_id, + "AccountDB::accounts_index corrupted. Storage should only point to one fork" + ); + entry.write().unwrap().remove_account(); + } + } + //TODO: performance here could be improved if AccountsDB::storage was organized by fork + let dead_forks: HashSet = storage + .values() + .filter_map(|entry| { + let entry = entry.read().unwrap(); + if entry.count == 0 { + Some(entry.fork_id) + } else { + None + } + }) + .collect(); + let live_forks: HashSet = storage + .values() + .filter_map(|entry| { + let entry = entry.read().unwrap(); + if entry.count > 0 { + Some(entry.fork_id) + } else { + None + } + }) + .collect(); + dead_forks.difference(&live_forks).cloned().collect() + } + fn cleanup_dead_forks(&self, dead_forks: &mut HashSet) { + let mut index = self.accounts_index.write().unwrap(); + // a fork is not totally dead until it is older than the root + dead_forks.retain(|fork| *fork < index.last_root); + for fork in dead_forks.iter() { + index.cleanup_dead_fork(*fork); + } + } + /// Store the account update. pub fn store(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) { - //TODO; these blocks should be separate functions and unit tested - let infos: Vec<_> = accounts - .iter() - .map(|(pubkey, account)| { - let (id, offset) = self.append_account(fork_id, pubkey, account); - AccountInfo { - id, - offset, - lamports: account.lamports, - } - }) - .collect(); - - let reclaims: Vec<(Fork, AccountInfo)> = { - let mut index = self.accounts_index.write().unwrap(); - let mut reclaims = vec![]; - for (i, info) in infos.into_iter().enumerate() { - let key = &accounts[i].0; - reclaims.extend(index.insert(fork_id, key, info).into_iter()) - } - reclaims - }; - - let dead_forks: HashSet = { - let stores = self.storage.read().unwrap(); - let mut cleared_forks: HashSet = HashSet::new(); - for (fork_id, account_info) in reclaims { - let cleared = stores[account_info.id].remove_account(); - if cleared { - cleared_forks.insert(fork_id); - } - } - let live_forks: HashSet = stores.iter().map(|x| x.fork_id).collect(); - cleared_forks.difference(&live_forks).cloned().collect() - }; - { - let mut index = self.accounts_index.write().unwrap(); - for fork in dead_forks { - index.cleanup_dead_fork(fork); - } + let infos = self.store_accounts(fork_id, accounts); + let reclaims = self.update_index(fork_id, infos, accounts); + trace!("reclaim: {}", reclaims.len()); + let mut dead_forks = self.remove_dead_accounts(reclaims); + trace!("dead_forks: {}", dead_forks.len()); + self.cleanup_dead_forks(&mut dead_forks); + trace!("purge_forks: {}", dead_forks.len()); + for fork in dead_forks { + self.purge_fork(fork); } } @@ -569,7 +608,6 @@ mod tests { } #[test] - #[ignore] fn test_accountsdb_count_stores() { let paths = get_tmp_accounts_path!(); let db = AccountsDB::new(&paths.paths); @@ -592,15 +630,15 @@ mod tests { { let stores = db.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 2); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[&0].read().unwrap().count, 2); + assert_eq!(stores[&1].read().unwrap().count, 2); } db.add_root(1); { let stores = db.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 2); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[&0].read().unwrap().count, 2); + assert_eq!(stores[&1].read().unwrap().count, 2); } } @@ -675,10 +713,11 @@ mod tests { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 1); assert_eq!( - stores[0].get_status(), + stores[&0].read().unwrap().status(), AccountStorageStatus::StorageAvailable ); - stores[0].count.load(Ordering::Relaxed) == count + let rv = stores[&0].read().unwrap().count == count; + rv } fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork) { @@ -745,8 +784,10 @@ mod tests { } let mut append_vec_histogram = HashMap::new(); - for storage in accounts.storage.read().unwrap().iter() { - *append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1; + for storage in accounts.storage.read().unwrap().values() { + *append_vec_histogram + .entry(storage.read().unwrap().fork_id) + .or_insert(0) += 1; } for count in append_vec_histogram.values() { assert!(*count >= 2); @@ -754,7 +795,6 @@ mod tests { } #[test] - #[ignore] fn test_account_grow() { let paths = get_tmp_accounts_path!(); let accounts = AccountsDB::new(&paths.paths); @@ -769,8 +809,8 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 1); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[0].get_status(), status[0]); + assert_eq!(stores[&0].read().unwrap().count, 1); + assert_eq!(stores[&0].read().unwrap().status(), status[0]); } let pubkey2 = Pubkey::new_rand(); @@ -779,10 +819,10 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[0].get_status(), status[1]); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[1].get_status(), status[0]); + assert_eq!(stores[&0].read().unwrap().count, 1); + assert_eq!(stores[&0].read().unwrap().status(), status[1]); + assert_eq!(stores[&1].read().unwrap().count, 1); + assert_eq!(stores[&1].read().unwrap().status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); @@ -794,12 +834,12 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 3); - assert_eq!(stores[0].count.load(Ordering::Relaxed), count[index]); - assert_eq!(stores[0].get_status(), status[0]); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[1].get_status(), status[1]); - assert_eq!(stores[2].count.load(Ordering::Relaxed), count[index ^ 1]); - assert_eq!(stores[2].get_status(), status[0]); + assert_eq!(stores[&0].read().unwrap().count, count[index]); + assert_eq!(stores[&0].read().unwrap().status(), status[0]); + assert_eq!(stores[&1].read().unwrap().count, 1); + assert_eq!(stores[&1].read().unwrap().status(), status[1]); + assert_eq!(stores[&2].read().unwrap().count, count[index ^ 1]); + assert_eq!(stores[&2].read().unwrap().status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); @@ -831,4 +871,41 @@ mod tests { assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_some()); } + #[test] + fn test_lazy_gc_fork() { + //This test is pedantic + //A fork is purged when a non root bank is cleaned up. If a fork is behind root but it is + //not root, it means we are retaining dead banks. + let paths = get_tmp_accounts_path!(); + let accounts = AccountsDB::new(&paths.paths); + let pubkey = Pubkey::new_rand(); + let account = Account::new(1, 0, &Account::default().owner); + //store an account + accounts.store(0, &[(&pubkey, &account)]); + let ancestors = vec![(0, 0)].into_iter().collect(); + let info = accounts + .accounts_index + .read() + .unwrap() + .get(&pubkey, &ancestors) + .unwrap() + .clone(); + //fork 0 is behind root, but it is not root, therefore it is purged + accounts.add_root(1); + assert!(accounts.accounts_index.read().unwrap().is_purged(0)); + + //fork is still there, since gc is lazy + assert!(accounts.storage.read().unwrap().get(&info.id).is_some()); + + //store causes cleanup + accounts.store(1, &[(&pubkey, &account)]); + + //fork is gone + assert!(accounts.storage.read().unwrap().get(&info.id).is_none()); + + //new value is there + let ancestors = vec![(1, 1)].into_iter().collect(); + assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some(account));; + } + } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 0ae3e16e7f..fae7fc1055 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -9,7 +9,7 @@ pub struct AccountsIndex { account_maps: HashMap>, roots: HashSet, //This value that needs to be stored to recover the index from AppendVec - last_root: Fork, + pub last_root: Fork, } impl AccountsIndex { @@ -59,7 +59,7 @@ impl AccountsIndex { }; rv } - fn is_purged(&self, fork: Fork) -> bool { + pub fn is_purged(&self, fork: Fork) -> bool { !self.is_root(fork) && fork < self.last_root } pub fn is_root(&self, fork: Fork) -> bool {