From 400610bf6a011aeaca507973f6a743a653b3652c Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 11 Jul 2019 17:57:56 -0600 Subject: [PATCH] v0.16: AccountsDB updates and getProgramAccounts RPC fix (#5044) * reduce replicode in accounts, fix cast to i64 (#5025) * add accounts_index_scan_accounts (#5020) * Plumb scan_accounts into accounts_db, adding load from storage (#5029) * Fix getProgramAccounts RPC (#5024) * Use scan_accounts to load accounts by program_id * Add bank test * Use get_program_accounts in RPC * Rebase for v0.16 --- core/src/rpc.rs | 2 +- runtime/src/accounts.rs | 100 ++++++++++++++++++++++------------ runtime/src/accounts_db.rs | 72 ++++++++++++++++++++++++ runtime/src/accounts_index.rs | 87 +++++++++++++++++++++++------ runtime/src/bank.rs | 57 ++++++++++++++++++- 5 files changed, 262 insertions(+), 56 deletions(-) diff --git a/core/src/rpc.rs b/core/src/rpc.rs index ed28435852..fb380002aa 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -73,7 +73,7 @@ impl JsonRpcRequestProcessor { pub fn get_program_accounts(&self, program_id: &Pubkey) -> Result> { Ok(self .bank() - .get_program_accounts_modified_since_parent(&program_id) + .get_program_accounts(&program_id) .into_iter() .map(|(pubkey, account)| (pubkey.to_string(), account)) .collect()) diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index e9121723fb..bea5073871 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -22,7 +22,6 @@ use std::collections::{HashMap, HashSet}; use std::env; use std::fs::remove_dir_all; use std::io::{BufReader, Read}; -use std::ops::Neg; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; @@ -294,27 +293,65 @@ impl Accounts { .filter(|(acc, _)| acc.lamports != 0) } - pub fn load_by_program(&self, fork: Fork, program_id: &Pubkey) -> Vec<(Pubkey, Account)> { - let accumulator: Vec> = self.accounts_db.scan_account_storage( + /// scans underlying accounts_db for this delta (fork) with a map function + /// from StoredAccount to B + /// returns only the latest/current version of B for this fork + fn scan_fork(&self, fork: Fork, func: F) -> Vec + where + F: Fn(&StoredAccount) -> Option, + F: Send + Sync, + B: Send + Default, + { + let accumulator: Vec> = self.accounts_db.scan_account_storage( fork, |stored_account: &StoredAccount, _id: AppendVecId, - accum: &mut Vec<(Pubkey, u64, Account)>| { - if stored_account.balance.owner == *program_id { - let val = ( + accum: &mut Vec<(Pubkey, u64, B)>| { + if let Some(val) = func(stored_account) { + accum.push(( stored_account.meta.pubkey, - stored_account.meta.write_version, - stored_account.clone_account(), - ); - accum.push(val) + std::u64::MAX - stored_account.meta.write_version, + val, + )); } }, ); - let mut versions: Vec<(Pubkey, u64, Account)> = - accumulator.into_iter().flat_map(|x| x).collect(); - versions.sort_by_key(|s| (s.0, (s.1 as i64).neg())); + + let mut versions: Vec<(Pubkey, u64, B)> = accumulator.into_iter().flat_map(|x| x).collect(); + versions.sort_by_key(|s| (s.0, s.1)); versions.dedup_by_key(|s| s.0); - versions.into_iter().map(|s| (s.0, s.2)).collect() + versions + .into_iter() + .map(|(_pubkey, _version, val)| val) + .collect() + } + + pub fn load_by_program_fork(&self, fork: Fork, program_id: &Pubkey) -> Vec<(Pubkey, Account)> { + self.scan_fork(fork, |stored_account| { + if stored_account.balance.owner == *program_id { + Some((stored_account.meta.pubkey, stored_account.clone_account())) + } else { + None + } + }) + } + + pub fn load_by_program( + &self, + ancestors: &HashMap, + program_id: &Pubkey, + ) -> Vec<(Pubkey, Account)> { + self.accounts_db.scan_accounts( + ancestors, + |collector: &mut Vec<(Pubkey, Account)>, option| { + if let Some(data) = option + .filter(|(_, account, _)| account.owner == *program_id && account.lamports != 0) + .map(|(pubkey, account, _fork)| (*pubkey, account)) + { + collector.push(data) + } + }, + ) } /// Slow because lock is held for 1 operation instead of many @@ -362,28 +399,19 @@ impl Accounts { } pub fn hash_internal_state(&self, fork_id: Fork) -> Option { - let accumulator: Vec> = self.accounts_db.scan_account_storage( - fork_id, - |stored_account: &StoredAccount, - _id: AppendVecId, - accum: &mut Vec<(Pubkey, u64, Hash)>| { - if !syscall::check_id(&stored_account.balance.owner) { - accum.push(( - stored_account.meta.pubkey, - stored_account.meta.write_version, - Self::hash_account(stored_account), - )); - } - }, - ); - let mut account_hashes: Vec<_> = accumulator.into_iter().flat_map(|x| x).collect(); - account_hashes.sort_by_key(|s| (s.0, (s.1 as i64).neg())); - account_hashes.dedup_by_key(|s| s.0); + let account_hashes = self.scan_fork(fork_id, |stored_account| { + if !syscall::check_id(&stored_account.balance.owner) { + Some(Self::hash_account(stored_account)) + } else { + None + } + }); + if account_hashes.is_empty() { None } else { let mut hasher = Hasher::default(); - for (_, _, hash) in account_hashes { + for hash in account_hashes { hasher.hash(hash.as_ref()); } Some(hasher.result()) @@ -938,7 +966,7 @@ mod tests { } #[test] - fn test_load_by_program() { + fn test_load_by_program_fork() { let accounts = Accounts::new(None); // Load accounts owned by various programs into AccountsDB @@ -952,11 +980,11 @@ mod tests { let account2 = Account::new(1, 0, &Pubkey::new(&[3; 32])); accounts.store_slow(0, &pubkey2, &account2); - let loaded = accounts.load_by_program(0, &Pubkey::new(&[2; 32])); + let loaded = accounts.load_by_program_fork(0, &Pubkey::new(&[2; 32])); assert_eq!(loaded.len(), 2); - let loaded = accounts.load_by_program(0, &Pubkey::new(&[3; 32])); + let loaded = accounts.load_by_program_fork(0, &Pubkey::new(&[3; 32])); assert_eq!(loaded, vec![(pubkey2, account2)]); - let loaded = accounts.load_by_program(0, &Pubkey::new(&[4; 32])); + let loaded = accounts.load_by_program_fork(0, &Pubkey::new(&[4; 32])); assert_eq!(loaded, vec![]); } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index b479ba562d..b3a5bad678 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -378,6 +378,36 @@ impl AccountsDB { false } + pub fn scan_accounts(&self, ancestors: &HashMap, scan_func: F) -> A + where + F: Fn(&mut A, Option<(&Pubkey, Account, Fork)>) -> (), + A: Default, + { + let mut collector = A::default(); + let accounts_index = self.accounts_index.read().unwrap(); + let storage = self.storage.read().unwrap(); + accounts_index.scan_accounts(ancestors, |pubkey, (account_info, fork)| { + scan_func( + &mut collector, + storage + .0 + .get(&fork) + .and_then(|storage_map| storage_map.get(&account_info.id)) + .and_then(|store| { + Some( + store + .accounts + .get_account(account_info.offset)? + .0 + .clone_account(), + ) + }) + .map(|account| (pubkey, account, fork)), + ) + }); + collector + } + /// Scan a specific fork through all the account storage in parallel with sequential read // PERF: Sequentially read each storage entry in parallel pub fn scan_account_storage(&self, fork_id: Fork, scan_func: F) -> Vec @@ -770,6 +800,14 @@ mod tests { let ancestors = vec![(1, 1), (0, 0)].into_iter().collect(); assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1); + + let accounts: Vec = + db.scan_accounts(&ancestors, |accounts: &mut Vec, option| { + if let Some(data) = option { + accounts.push(data.1); + } + }); + assert_eq!(accounts, vec![account1]); } #[test] @@ -1313,4 +1351,38 @@ mod tests { t.join().unwrap(); } } + + #[test] + fn test_accountsdb_scan_accounts() { + solana_logger::setup(); + let paths = get_tmp_accounts_path!(); + let db = AccountsDB::new(&paths.paths); + let key = Pubkey::default(); + let key0 = Pubkey::new_rand(); + let account0 = Account::new(1, 0, &key); + + db.store(0, &hashmap!(&key0 => (&account0, 0))); + + let key1 = Pubkey::new_rand(); + let account1 = Account::new(2, 0, &key); + db.store(1, &hashmap!(&key1 => (&account1, 0))); + + let ancestors = vec![(0, 0)].into_iter().collect(); + let accounts: Vec = + db.scan_accounts(&ancestors, |accounts: &mut Vec, option| { + if let Some(data) = option { + accounts.push(data.1); + } + }); + assert_eq!(accounts, vec![account0]); + + let ancestors = vec![(1, 1), (0, 0)].into_iter().collect(); + let accounts: Vec = + db.scan_accounts(&ancestors, |accounts: &mut Vec, option| { + if let Some(data) = option { + accounts.push(data.1); + } + }); + assert_eq!(accounts.len(), 2); + } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 85fb436145..a704900290 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1,14 +1,12 @@ -use hashbrown::HashMap; use log::*; use solana_sdk::pubkey::Pubkey; -use std::collections; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; pub type Fork = u64; #[derive(Debug, Default)] pub struct AccountsIndex { - pub account_maps: HashMap>, + pub account_maps: hashbrown::HashMap>, pub roots: HashSet, @@ -17,26 +15,44 @@ pub struct AccountsIndex { } impl AccountsIndex { - /// Get an account - /// The latest account that appears in `ancestors` or `roots` is returned. - pub fn get( + /// call func with every pubkey and index visible from a given set of ancestors + pub fn scan_accounts(&self, ancestors: &HashMap, mut func: F) + where + F: FnMut(&Pubkey, (&T, Fork)) -> (), + { + for (pubkey, list) in self.account_maps.iter() { + if let Some(fork_info) = self.latest_fork(ancestors, list) { + func(pubkey, fork_info); + } + } + } + + // find the latest fork and T in a list for a given ancestor + fn latest_fork<'a>( &self, - pubkey: &Pubkey, - ancestors: &collections::HashMap, - ) -> Option<(&T, Fork)> { - let list = self.account_maps.get(pubkey)?; + ancestors: &HashMap, + list: &'a [(Fork, T)], + ) -> Option<(&'a T, Fork)> { let mut max = 0; let mut rv = None; - for e in list.iter().rev() { - if e.0 >= max && (ancestors.get(&e.0).is_some() || self.is_root(e.0)) { - trace!("GET {} {:?}", e.0, ancestors); - rv = Some((&e.1, e.0)); - max = e.0; + for (fork, t) in list.iter().rev() { + if *fork >= max && (ancestors.get(fork).is_some() || self.is_root(*fork)) { + trace!("GET {} {:?}", fork, ancestors); + rv = Some((t, *fork)); + max = *fork; } } rv } + /// Get an account + /// The latest account that appears in `ancestors` or `roots` is returned. + pub fn get(&self, pubkey: &Pubkey, ancestors: &HashMap) -> Option<(&T, Fork)> { + self.account_maps + .get(pubkey) + .and_then(|list| self.latest_fork(ancestors, list)) + } + pub fn get_max_root(roots: &HashSet, fork_vec: &[(Fork, T)]) -> Fork { let mut max_root = 0; for (f, _) in fork_vec.iter() { @@ -119,8 +135,12 @@ mod tests { fn test_get_empty() { let key = Keypair::new(); let index = AccountsIndex::::default(); - let ancestors = collections::HashMap::new(); + let ancestors = HashMap::new(); assert_eq!(index.get(&key.pubkey(), &ancestors), None); + + let mut num = 0; + index.scan_accounts(&ancestors, |_pubkey, _index| num += 1); + assert_eq!(num, 0); } #[test] @@ -131,8 +151,12 @@ mod tests { index.insert(0, &key.pubkey(), true, &mut gc); assert!(gc.is_empty()); - let ancestors = collections::HashMap::new(); + let ancestors = HashMap::new(); assert_eq!(index.get(&key.pubkey(), &ancestors), None); + + let mut num = 0; + index.scan_accounts(&ancestors, |_pubkey, _index| num += 1); + assert_eq!(num, 0); } #[test] @@ -145,6 +169,10 @@ mod tests { let ancestors = vec![(1, 1)].into_iter().collect(); assert_eq!(index.get(&key.pubkey(), &ancestors), None); + + let mut num = 0; + index.scan_accounts(&ancestors, |_pubkey, _index| num += 1); + assert_eq!(num, 0); } #[test] @@ -157,6 +185,17 @@ mod tests { let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0))); + + let mut num = 0; + let mut found_key = false; + index.scan_accounts(&ancestors, |pubkey, _index| { + if pubkey == &key.pubkey() { + found_key = true + }; + num += 1 + }); + assert_eq!(num, 1); + assert!(found_key); } #[test] @@ -272,5 +311,17 @@ mod tests { assert_eq!(gc, vec![(0, true), (1, false), (2, true)]); let ancestors = vec![].into_iter().collect(); assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 3))); + + let mut num = 0; + let mut found_key = false; + index.scan_accounts(&ancestors, |pubkey, _index| { + if pubkey == &key.pubkey() { + found_key = true; + assert_eq!(_index, (&true, 3)); + }; + num += 1 + }); + assert_eq!(num, 1); + assert!(found_key); } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 5d595e2aa3..81687831c9 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1166,11 +1166,19 @@ impl Bank { .map(|(account, _)| account) } + pub fn get_program_accounts(&self, program_id: &Pubkey) -> Vec<(Pubkey, Account)> { + self.rc + .accounts + .load_by_program(&self.ancestors, program_id) + } + pub fn get_program_accounts_modified_since_parent( &self, program_id: &Pubkey, ) -> Vec<(Pubkey, Account)> { - self.rc.accounts.load_by_program(self.slot(), program_id) + self.rc + .accounts + .load_by_program_fork(self.slot(), program_id) } pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<(Account, Fork)> { @@ -2595,4 +2603,51 @@ mod tests { assert_eq!(dbank.get_balance(&key.pubkey()), 10); bank.compare_bank(&dbank); } + + #[test] + fn test_bank_get_program_accounts() { + let (genesis_block, _mint_keypair) = create_genesis_block(500); + let parent = Arc::new(Bank::new(&genesis_block)); + + let bank0 = Arc::new(new_from_parent(&parent)); + + let pubkey0 = Pubkey::new_rand(); + let program_id = Pubkey::new(&[2; 32]); + let account0 = Account::new(1, 0, &program_id); + bank0.store_account(&pubkey0, &account0); + + assert_eq!( + bank0.get_program_accounts_modified_since_parent(&program_id), + vec![(pubkey0, account0.clone())] + ); + + let bank1 = Arc::new(new_from_parent(&bank0)); + bank1.squash(); + assert_eq!( + bank0.get_program_accounts(&program_id), + vec![(pubkey0, account0.clone())] + ); + assert_eq!( + bank1.get_program_accounts(&program_id), + vec![(pubkey0, account0.clone())] + ); + assert_eq!( + bank1.get_program_accounts_modified_since_parent(&program_id), + vec![] + ); + + let bank2 = Arc::new(new_from_parent(&bank1)); + let pubkey1 = Pubkey::new_rand(); + let account1 = Account::new(3, 0, &program_id); + bank2.store_account(&pubkey1, &account1); + // Accounts with 0 lamports should be filtered out by Accounts::load_by_program() + let pubkey2 = Pubkey::new_rand(); + let account2 = Account::new(0, 0, &program_id); + bank2.store_account(&pubkey2, &account2); + + let bank3 = Arc::new(new_from_parent(&bank2)); + bank3.squash(); + assert_eq!(bank1.get_program_accounts(&program_id).len(), 2); + assert_eq!(bank3.get_program_accounts(&program_id).len(), 2); + } }