v0.16: AccountsDB updates and getProgramAccounts RPC fix (#5044)

* reduce replicode in accounts, fix cast to i64 (#5025)

* add accounts_index_scan_accounts (#5020)

* Plumb scan_accounts into accounts_db, adding load from storage (#5029)

* Fix getProgramAccounts RPC (#5024)

* Use scan_accounts to load accounts by program_id

* Add bank test

* Use get_program_accounts in RPC

* Rebase for v0.16
This commit is contained in:
Tyera Eulberg
2019-07-11 17:57:56 -06:00
committed by GitHub
parent f759ac3a8d
commit 400610bf6a
5 changed files with 262 additions and 56 deletions

View File

@ -73,7 +73,7 @@ impl JsonRpcRequestProcessor {
pub fn get_program_accounts(&self, program_id: &Pubkey) -> Result<Vec<(String, Account)>> { pub fn get_program_accounts(&self, program_id: &Pubkey) -> Result<Vec<(String, Account)>> {
Ok(self Ok(self
.bank() .bank()
.get_program_accounts_modified_since_parent(&program_id) .get_program_accounts(&program_id)
.into_iter() .into_iter()
.map(|(pubkey, account)| (pubkey.to_string(), account)) .map(|(pubkey, account)| (pubkey.to_string(), account))
.collect()) .collect())

View File

@ -22,7 +22,6 @@ use std::collections::{HashMap, HashSet};
use std::env; use std::env;
use std::fs::remove_dir_all; use std::fs::remove_dir_all;
use std::io::{BufReader, Read}; use std::io::{BufReader, Read};
use std::ops::Neg;
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -294,27 +293,65 @@ impl Accounts {
.filter(|(acc, _)| acc.lamports != 0) .filter(|(acc, _)| acc.lamports != 0)
} }
pub fn load_by_program(&self, fork: Fork, program_id: &Pubkey) -> Vec<(Pubkey, Account)> { /// scans underlying accounts_db for this delta (fork) with a map function
let accumulator: Vec<Vec<(Pubkey, u64, Account)>> = self.accounts_db.scan_account_storage( /// from StoredAccount to B
/// returns only the latest/current version of B for this fork
fn scan_fork<F, B>(&self, fork: Fork, func: F) -> Vec<B>
where
F: Fn(&StoredAccount) -> Option<B>,
F: Send + Sync,
B: Send + Default,
{
let accumulator: Vec<Vec<(Pubkey, u64, B)>> = self.accounts_db.scan_account_storage(
fork, fork,
|stored_account: &StoredAccount, |stored_account: &StoredAccount,
_id: AppendVecId, _id: AppendVecId,
accum: &mut Vec<(Pubkey, u64, Account)>| { accum: &mut Vec<(Pubkey, u64, B)>| {
if stored_account.balance.owner == *program_id { if let Some(val) = func(stored_account) {
let val = ( accum.push((
stored_account.meta.pubkey, stored_account.meta.pubkey,
stored_account.meta.write_version, std::u64::MAX - stored_account.meta.write_version,
stored_account.clone_account(), val,
); ));
accum.push(val)
} }
}, },
); );
let mut versions: Vec<(Pubkey, u64, Account)> =
accumulator.into_iter().flat_map(|x| x).collect(); let mut versions: Vec<(Pubkey, u64, B)> = accumulator.into_iter().flat_map(|x| x).collect();
versions.sort_by_key(|s| (s.0, (s.1 as i64).neg())); versions.sort_by_key(|s| (s.0, s.1));
versions.dedup_by_key(|s| s.0); versions.dedup_by_key(|s| s.0);
versions.into_iter().map(|s| (s.0, s.2)).collect() versions
.into_iter()
.map(|(_pubkey, _version, val)| val)
.collect()
}
pub fn load_by_program_fork(&self, fork: Fork, program_id: &Pubkey) -> Vec<(Pubkey, Account)> {
self.scan_fork(fork, |stored_account| {
if stored_account.balance.owner == *program_id {
Some((stored_account.meta.pubkey, stored_account.clone_account()))
} else {
None
}
})
}
pub fn load_by_program(
&self,
ancestors: &HashMap<Fork, usize>,
program_id: &Pubkey,
) -> Vec<(Pubkey, Account)> {
self.accounts_db.scan_accounts(
ancestors,
|collector: &mut Vec<(Pubkey, Account)>, option| {
if let Some(data) = option
.filter(|(_, account, _)| account.owner == *program_id && account.lamports != 0)
.map(|(pubkey, account, _fork)| (*pubkey, account))
{
collector.push(data)
}
},
)
} }
/// Slow because lock is held for 1 operation instead of many /// Slow because lock is held for 1 operation instead of many
@ -362,28 +399,19 @@ impl Accounts {
} }
pub fn hash_internal_state(&self, fork_id: Fork) -> Option<Hash> { pub fn hash_internal_state(&self, fork_id: Fork) -> Option<Hash> {
let accumulator: Vec<Vec<(Pubkey, u64, Hash)>> = self.accounts_db.scan_account_storage( let account_hashes = self.scan_fork(fork_id, |stored_account| {
fork_id, if !syscall::check_id(&stored_account.balance.owner) {
|stored_account: &StoredAccount, Some(Self::hash_account(stored_account))
_id: AppendVecId, } else {
accum: &mut Vec<(Pubkey, u64, Hash)>| { None
if !syscall::check_id(&stored_account.balance.owner) { }
accum.push(( });
stored_account.meta.pubkey,
stored_account.meta.write_version,
Self::hash_account(stored_account),
));
}
},
);
let mut account_hashes: Vec<_> = accumulator.into_iter().flat_map(|x| x).collect();
account_hashes.sort_by_key(|s| (s.0, (s.1 as i64).neg()));
account_hashes.dedup_by_key(|s| s.0);
if account_hashes.is_empty() { if account_hashes.is_empty() {
None None
} else { } else {
let mut hasher = Hasher::default(); let mut hasher = Hasher::default();
for (_, _, hash) in account_hashes { for hash in account_hashes {
hasher.hash(hash.as_ref()); hasher.hash(hash.as_ref());
} }
Some(hasher.result()) Some(hasher.result())
@ -938,7 +966,7 @@ mod tests {
} }
#[test] #[test]
fn test_load_by_program() { fn test_load_by_program_fork() {
let accounts = Accounts::new(None); let accounts = Accounts::new(None);
// Load accounts owned by various programs into AccountsDB // Load accounts owned by various programs into AccountsDB
@ -952,11 +980,11 @@ mod tests {
let account2 = Account::new(1, 0, &Pubkey::new(&[3; 32])); let account2 = Account::new(1, 0, &Pubkey::new(&[3; 32]));
accounts.store_slow(0, &pubkey2, &account2); accounts.store_slow(0, &pubkey2, &account2);
let loaded = accounts.load_by_program(0, &Pubkey::new(&[2; 32])); let loaded = accounts.load_by_program_fork(0, &Pubkey::new(&[2; 32]));
assert_eq!(loaded.len(), 2); assert_eq!(loaded.len(), 2);
let loaded = accounts.load_by_program(0, &Pubkey::new(&[3; 32])); let loaded = accounts.load_by_program_fork(0, &Pubkey::new(&[3; 32]));
assert_eq!(loaded, vec![(pubkey2, account2)]); assert_eq!(loaded, vec![(pubkey2, account2)]);
let loaded = accounts.load_by_program(0, &Pubkey::new(&[4; 32])); let loaded = accounts.load_by_program_fork(0, &Pubkey::new(&[4; 32]));
assert_eq!(loaded, vec![]); assert_eq!(loaded, vec![]);
} }

View File

@ -378,6 +378,36 @@ impl AccountsDB {
false false
} }
pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Fork, usize>, scan_func: F) -> A
where
F: Fn(&mut A, Option<(&Pubkey, Account, Fork)>) -> (),
A: Default,
{
let mut collector = A::default();
let accounts_index = self.accounts_index.read().unwrap();
let storage = self.storage.read().unwrap();
accounts_index.scan_accounts(ancestors, |pubkey, (account_info, fork)| {
scan_func(
&mut collector,
storage
.0
.get(&fork)
.and_then(|storage_map| storage_map.get(&account_info.id))
.and_then(|store| {
Some(
store
.accounts
.get_account(account_info.offset)?
.0
.clone_account(),
)
})
.map(|account| (pubkey, account, fork)),
)
});
collector
}
/// Scan a specific fork through all the account storage in parallel with sequential read /// Scan a specific fork through all the account storage in parallel with sequential read
// PERF: Sequentially read each storage entry in parallel // PERF: Sequentially read each storage entry in parallel
pub fn scan_account_storage<F, B>(&self, fork_id: Fork, scan_func: F) -> Vec<B> pub fn scan_account_storage<F, B>(&self, fork_id: Fork, scan_func: F) -> Vec<B>
@ -770,6 +800,14 @@ mod tests {
let ancestors = vec![(1, 1), (0, 0)].into_iter().collect(); let ancestors = vec![(1, 1), (0, 0)].into_iter().collect();
assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1); assert_eq!(&db.load_slow(&ancestors, &key).unwrap().0, &account1);
let accounts: Vec<Account> =
db.scan_accounts(&ancestors, |accounts: &mut Vec<Account>, option| {
if let Some(data) = option {
accounts.push(data.1);
}
});
assert_eq!(accounts, vec![account1]);
} }
#[test] #[test]
@ -1313,4 +1351,38 @@ mod tests {
t.join().unwrap(); t.join().unwrap();
} }
} }
#[test]
fn test_accountsdb_scan_accounts() {
solana_logger::setup();
let paths = get_tmp_accounts_path!();
let db = AccountsDB::new(&paths.paths);
let key = Pubkey::default();
let key0 = Pubkey::new_rand();
let account0 = Account::new(1, 0, &key);
db.store(0, &hashmap!(&key0 => (&account0, 0)));
let key1 = Pubkey::new_rand();
let account1 = Account::new(2, 0, &key);
db.store(1, &hashmap!(&key1 => (&account1, 0)));
let ancestors = vec![(0, 0)].into_iter().collect();
let accounts: Vec<Account> =
db.scan_accounts(&ancestors, |accounts: &mut Vec<Account>, option| {
if let Some(data) = option {
accounts.push(data.1);
}
});
assert_eq!(accounts, vec![account0]);
let ancestors = vec![(1, 1), (0, 0)].into_iter().collect();
let accounts: Vec<Account> =
db.scan_accounts(&ancestors, |accounts: &mut Vec<Account>, option| {
if let Some(data) = option {
accounts.push(data.1);
}
});
assert_eq!(accounts.len(), 2);
}
} }

View File

@ -1,14 +1,12 @@
use hashbrown::HashMap;
use log::*; use log::*;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::collections; use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
pub type Fork = u64; pub type Fork = u64;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct AccountsIndex<T> { pub struct AccountsIndex<T> {
pub account_maps: HashMap<Pubkey, Vec<(Fork, T)>>, pub account_maps: hashbrown::HashMap<Pubkey, Vec<(Fork, T)>>,
pub roots: HashSet<Fork>, pub roots: HashSet<Fork>,
@ -17,26 +15,44 @@ pub struct AccountsIndex<T> {
} }
impl<T: Clone> AccountsIndex<T> { impl<T: Clone> AccountsIndex<T> {
/// Get an account /// call func with every pubkey and index visible from a given set of ancestors
/// The latest account that appears in `ancestors` or `roots` is returned. pub fn scan_accounts<F>(&self, ancestors: &HashMap<Fork, usize>, mut func: F)
pub fn get( where
F: FnMut(&Pubkey, (&T, Fork)) -> (),
{
for (pubkey, list) in self.account_maps.iter() {
if let Some(fork_info) = self.latest_fork(ancestors, list) {
func(pubkey, fork_info);
}
}
}
// find the latest fork and T in a list for a given ancestor
fn latest_fork<'a>(
&self, &self,
pubkey: &Pubkey, ancestors: &HashMap<Fork, usize>,
ancestors: &collections::HashMap<Fork, usize>, list: &'a [(Fork, T)],
) -> Option<(&T, Fork)> { ) -> Option<(&'a T, Fork)> {
let list = self.account_maps.get(pubkey)?;
let mut max = 0; let mut max = 0;
let mut rv = None; let mut rv = None;
for e in list.iter().rev() { for (fork, t) in list.iter().rev() {
if e.0 >= max && (ancestors.get(&e.0).is_some() || self.is_root(e.0)) { if *fork >= max && (ancestors.get(fork).is_some() || self.is_root(*fork)) {
trace!("GET {} {:?}", e.0, ancestors); trace!("GET {} {:?}", fork, ancestors);
rv = Some((&e.1, e.0)); rv = Some((t, *fork));
max = e.0; max = *fork;
} }
} }
rv rv
} }
/// Get an account
/// The latest account that appears in `ancestors` or `roots` is returned.
pub fn get(&self, pubkey: &Pubkey, ancestors: &HashMap<Fork, usize>) -> Option<(&T, Fork)> {
self.account_maps
.get(pubkey)
.and_then(|list| self.latest_fork(ancestors, list))
}
pub fn get_max_root(roots: &HashSet<Fork>, fork_vec: &[(Fork, T)]) -> Fork { pub fn get_max_root(roots: &HashSet<Fork>, fork_vec: &[(Fork, T)]) -> Fork {
let mut max_root = 0; let mut max_root = 0;
for (f, _) in fork_vec.iter() { for (f, _) in fork_vec.iter() {
@ -119,8 +135,12 @@ mod tests {
fn test_get_empty() { fn test_get_empty() {
let key = Keypair::new(); let key = Keypair::new();
let index = AccountsIndex::<bool>::default(); let index = AccountsIndex::<bool>::default();
let ancestors = collections::HashMap::new(); let ancestors = HashMap::new();
assert_eq!(index.get(&key.pubkey(), &ancestors), None); assert_eq!(index.get(&key.pubkey(), &ancestors), None);
let mut num = 0;
index.scan_accounts(&ancestors, |_pubkey, _index| num += 1);
assert_eq!(num, 0);
} }
#[test] #[test]
@ -131,8 +151,12 @@ mod tests {
index.insert(0, &key.pubkey(), true, &mut gc); index.insert(0, &key.pubkey(), true, &mut gc);
assert!(gc.is_empty()); assert!(gc.is_empty());
let ancestors = collections::HashMap::new(); let ancestors = HashMap::new();
assert_eq!(index.get(&key.pubkey(), &ancestors), None); assert_eq!(index.get(&key.pubkey(), &ancestors), None);
let mut num = 0;
index.scan_accounts(&ancestors, |_pubkey, _index| num += 1);
assert_eq!(num, 0);
} }
#[test] #[test]
@ -145,6 +169,10 @@ mod tests {
let ancestors = vec![(1, 1)].into_iter().collect(); let ancestors = vec![(1, 1)].into_iter().collect();
assert_eq!(index.get(&key.pubkey(), &ancestors), None); assert_eq!(index.get(&key.pubkey(), &ancestors), None);
let mut num = 0;
index.scan_accounts(&ancestors, |_pubkey, _index| num += 1);
assert_eq!(num, 0);
} }
#[test] #[test]
@ -157,6 +185,17 @@ mod tests {
let ancestors = vec![(0, 0)].into_iter().collect(); let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0))); assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 0)));
let mut num = 0;
let mut found_key = false;
index.scan_accounts(&ancestors, |pubkey, _index| {
if pubkey == &key.pubkey() {
found_key = true
};
num += 1
});
assert_eq!(num, 1);
assert!(found_key);
} }
#[test] #[test]
@ -272,5 +311,17 @@ mod tests {
assert_eq!(gc, vec![(0, true), (1, false), (2, true)]); assert_eq!(gc, vec![(0, true), (1, false), (2, true)]);
let ancestors = vec![].into_iter().collect(); let ancestors = vec![].into_iter().collect();
assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 3))); assert_eq!(index.get(&key.pubkey(), &ancestors), Some((&true, 3)));
let mut num = 0;
let mut found_key = false;
index.scan_accounts(&ancestors, |pubkey, _index| {
if pubkey == &key.pubkey() {
found_key = true;
assert_eq!(_index, (&true, 3));
};
num += 1
});
assert_eq!(num, 1);
assert!(found_key);
} }
} }

View File

@ -1166,11 +1166,19 @@ impl Bank {
.map(|(account, _)| account) .map(|(account, _)| account)
} }
pub fn get_program_accounts(&self, program_id: &Pubkey) -> Vec<(Pubkey, Account)> {
self.rc
.accounts
.load_by_program(&self.ancestors, program_id)
}
pub fn get_program_accounts_modified_since_parent( pub fn get_program_accounts_modified_since_parent(
&self, &self,
program_id: &Pubkey, program_id: &Pubkey,
) -> Vec<(Pubkey, Account)> { ) -> Vec<(Pubkey, Account)> {
self.rc.accounts.load_by_program(self.slot(), program_id) self.rc
.accounts
.load_by_program_fork(self.slot(), program_id)
} }
pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<(Account, Fork)> { pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<(Account, Fork)> {
@ -2595,4 +2603,51 @@ mod tests {
assert_eq!(dbank.get_balance(&key.pubkey()), 10); assert_eq!(dbank.get_balance(&key.pubkey()), 10);
bank.compare_bank(&dbank); bank.compare_bank(&dbank);
} }
#[test]
fn test_bank_get_program_accounts() {
let (genesis_block, _mint_keypair) = create_genesis_block(500);
let parent = Arc::new(Bank::new(&genesis_block));
let bank0 = Arc::new(new_from_parent(&parent));
let pubkey0 = Pubkey::new_rand();
let program_id = Pubkey::new(&[2; 32]);
let account0 = Account::new(1, 0, &program_id);
bank0.store_account(&pubkey0, &account0);
assert_eq!(
bank0.get_program_accounts_modified_since_parent(&program_id),
vec![(pubkey0, account0.clone())]
);
let bank1 = Arc::new(new_from_parent(&bank0));
bank1.squash();
assert_eq!(
bank0.get_program_accounts(&program_id),
vec![(pubkey0, account0.clone())]
);
assert_eq!(
bank1.get_program_accounts(&program_id),
vec![(pubkey0, account0.clone())]
);
assert_eq!(
bank1.get_program_accounts_modified_since_parent(&program_id),
vec![]
);
let bank2 = Arc::new(new_from_parent(&bank1));
let pubkey1 = Pubkey::new_rand();
let account1 = Account::new(3, 0, &program_id);
bank2.store_account(&pubkey1, &account1);
// Accounts with 0 lamports should be filtered out by Accounts::load_by_program()
let pubkey2 = Pubkey::new_rand();
let account2 = Account::new(0, 0, &program_id);
bank2.store_account(&pubkey2, &account2);
let bank3 = Arc::new(new_from_parent(&bank2));
bank3.squash();
assert_eq!(bank1.get_program_accounts(&program_id).len(), 2);
assert_eq!(bank3.get_program_accounts(&program_id).len(), 2);
}
} }