Consolidate locks and error handling when loading accounts(#2309)

This commit is contained in:
jackcmay
2019-01-06 22:06:55 -08:00
committed by GitHub
parent 91bd38504e
commit 0c52df7569
2 changed files with 91 additions and 87 deletions

View File

@ -15,6 +15,9 @@ use std::collections::VecDeque;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::{Mutex, RwLock}; use std::sync::{Mutex, RwLock};
pub type InstructionAccounts = Vec<Account>;
pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
#[derive(Default)] #[derive(Default)]
pub struct ErrorCounters { pub struct ErrorCounters {
pub account_not_found: usize, pub account_not_found: usize,
@ -113,22 +116,22 @@ impl AccountsDB {
&mut self, &mut self,
txs: &[Transaction], txs: &[Transaction],
res: &[Result<()>], res: &[Result<()>],
loaded: &[Result<Vec<Account>>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>],
) { ) {
for (i, racc) in loaded.iter().enumerate() { for (i, raccs) in loaded.iter().enumerate() {
if res[i].is_err() || racc.is_err() { if res[i].is_err() || raccs.is_err() {
continue; continue;
} }
let tx = &txs[i]; let tx = &txs[i];
let acc = racc.as_ref().unwrap(); let accs = raccs.as_ref().unwrap();
for (key, account) in tx.account_keys.iter().zip(acc.iter()) { for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) {
self.store(key, account); self.store(key, account);
} }
} }
} }
fn load_account( fn load_tx_accounts(
&self, &self,
tx: &Transaction, tx: &Transaction,
last_ids: &mut StatusDeque<Result<()>>, last_ids: &mut StatusDeque<Result<()>>,
@ -175,18 +178,63 @@ impl AccountsDB {
} }
} }
fn load_executable_accounts(&self, mut program_id: Pubkey) -> Result<Vec<(Pubkey, Account)>> {
let mut accounts = Vec::new();
let mut depth = 0;
loop {
if solana_native_loader::check_id(&program_id) {
// at the root of the chain, ready to dispatch
break;
}
if depth >= 5 {
return Err(BankError::CallChainTooDeep);
}
depth += 1;
let program = match self.load(&program_id) {
Some(program) => program,
None => return Err(BankError::AccountNotFound),
};
if !program.executable || program.loader == Pubkey::default() {
return Err(BankError::AccountNotFound);
}
// add loader to chain
accounts.insert(0, (program_id, program.clone()));
program_id = program.loader;
}
Ok(accounts)
}
/// For each program_id in the transaction, load its loaders.
fn load_loaders(&self, tx: &Transaction) -> Result<Vec<Vec<(Pubkey, Account)>>> {
tx.instructions
.iter()
.map(|ix| {
let program_id = tx.program_ids[ix.program_ids_index as usize];
self.load_executable_accounts(program_id)
})
.collect()
}
fn load_accounts( fn load_accounts(
&self, &self,
txs: &[Transaction], txs: &[Transaction],
last_ids: &mut StatusDeque<Result<()>>, last_ids: &mut StatusDeque<Result<()>>,
results: Vec<Result<()>>, lock_results: Vec<Result<()>>,
max_age: usize, max_age: usize,
error_counters: &mut ErrorCounters, error_counters: &mut ErrorCounters,
) -> Vec<(Result<Vec<Account>>)> { ) -> Vec<Result<(InstructionAccounts, InstructionLoaders)>> {
txs.iter() txs.iter()
.zip(results.into_iter()) .zip(lock_results.into_iter())
.map(|etx| match etx { .map(|etx| match etx {
(tx, Ok(())) => self.load_account(tx, last_ids, max_age, error_counters), (tx, Ok(())) => {
let accounts = self.load_tx_accounts(tx, last_ids, max_age, error_counters)?;
let loaders = self.load_loaders(tx)?;
Ok((accounts, loaders))
}
(_, Err(e)) => Err(e), (_, Err(e)) => Err(e),
}) })
.collect() .collect()
@ -281,14 +329,14 @@ impl Accounts {
&self, &self,
txs: &[Transaction], txs: &[Transaction],
last_ids: &mut StatusDeque<Result<()>>, last_ids: &mut StatusDeque<Result<()>>,
results: Vec<Result<()>>, lock_results: Vec<Result<()>>,
max_age: usize, max_age: usize,
error_counters: &mut ErrorCounters, error_counters: &mut ErrorCounters,
) -> Vec<(Result<Vec<Account>>)> { ) -> Vec<Result<(InstructionAccounts, InstructionLoaders)>> {
self.accounts_db.read().unwrap().load_accounts( self.accounts_db.read().unwrap().load_accounts(
txs, txs,
last_ids, last_ids,
results, lock_results,
max_age, max_age,
error_counters, error_counters,
) )
@ -298,7 +346,7 @@ impl Accounts {
&self, &self,
txs: &[Transaction], txs: &[Transaction],
res: &[Result<()>], res: &[Result<()>],
loaded: &[Result<Vec<Account>>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>],
) { ) {
self.accounts_db self.accounts_db
.write() .write()

View File

@ -3,7 +3,7 @@
//! on behalf of the caller, and a low-level API for when they have //! on behalf of the caller, and a low-level API for when they have
//! already been signed and verified. //! already been signed and verified.
use crate::accounts::{Accounts, ErrorCounters}; use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionLoaders};
use crate::checkpoint::Checkpoint; use crate::checkpoint::Checkpoint;
use crate::counter::Counter; use crate::counter::Counter;
use crate::entry::Entry; use crate::entry::Entry;
@ -348,49 +348,6 @@ impl Bank {
} }
} }
fn load_executable_accounts(&self, mut program_id: Pubkey) -> Result<Vec<(Pubkey, Account)>> {
let mut accounts = Vec::new();
let mut depth = 0;
loop {
if solana_native_loader::check_id(&program_id) {
// at the root of the chain, ready to dispatch
break;
}
if depth >= 5 {
return Err(BankError::CallChainTooDeep);
}
depth += 1;
let program = match self.get_account(&program_id) {
Some(program) => program,
None => return Err(BankError::AccountNotFound),
};
if !program.executable || program.loader == Pubkey::default() {
return Err(BankError::AccountNotFound);
}
let loader = program.loader;
// add loader to chain
accounts.insert(0, (program_id, program));
program_id = loader;
}
Ok(accounts)
}
/// For each program_id in the transaction, load its loaders.
fn load_loaders(&self, tx: &Transaction) -> Result<Vec<Vec<(Pubkey, Account)>>> {
tx.instructions
.iter()
.map(|ix| {
let program_id = tx.program_ids[ix.program_ids_index as usize];
self.load_executable_accounts(program_id)
})
.collect()
}
fn lock_accounts(&self, txs: &[Transaction]) -> Vec<Result<()>> { fn lock_accounts(&self, txs: &[Transaction]) -> Vec<Result<()>> {
self.accounts.lock_accounts(txs) self.accounts.lock_accounts(txs)
} }
@ -407,7 +364,7 @@ impl Bank {
let now = Instant::now(); let now = Instant::now();
// Once accounts are locked, other threads cannot encode transactions that will modify the // Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state // same account state
let locked_accounts = self.lock_accounts(txs); let lock_results = self.lock_accounts(txs);
let lock_time = now.elapsed(); let lock_time = now.elapsed();
let now = Instant::now(); let now = Instant::now();
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce // Use a shorter maximum age when adding transactions into the pipeline. This will reduce
@ -415,13 +372,13 @@ impl Bank {
// TODO: Banking stage threads should be prioritized to complete faster then this queue // TODO: Banking stage threads should be prioritized to complete faster then this queue
// expires. // expires.
let results = let results =
self.execute_and_commit_transactions(txs, locked_accounts, MAX_ENTRY_IDS as usize / 2); self.execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2);
let process_time = now.elapsed(); let process_time = now.elapsed();
let now = Instant::now(); let now = Instant::now();
self.record_transactions(txs, &results, poh)?; self.record_transactions(txs, &results, poh)?;
let record_time = now.elapsed(); let record_time = now.elapsed();
let now = Instant::now(); let now = Instant::now();
// Once the accounts are unlocked new transactions can enter the pipeline to process them // Once the accounts are new transactions can enter the pipeline to process them
self.unlock_accounts(&txs, &results); self.unlock_accounts(&txs, &results);
let unlock_time = now.elapsed(); let unlock_time = now.elapsed();
debug!( debug!(
@ -468,13 +425,13 @@ impl Bank {
fn load_accounts( fn load_accounts(
&self, &self,
txs: &[Transaction], txs: &[Transaction],
results: Vec<Result<()>>, lock_results: Vec<Result<()>>,
max_age: usize, max_age: usize,
error_counters: &mut ErrorCounters, error_counters: &mut ErrorCounters,
) -> Vec<(Result<Vec<Account>>)> { ) -> Vec<Result<(InstructionAccounts, InstructionLoaders)>> {
let mut last_ids = self.last_ids.write().unwrap(); let mut last_ids = self.last_ids.write().unwrap();
self.accounts self.accounts
.load_accounts(txs, &mut last_ids, results, max_age, error_counters) .load_accounts(txs, &mut last_ids, lock_results, max_age, error_counters)
} }
/// Process a batch of transactions. /// Process a batch of transactions.
@ -482,14 +439,14 @@ impl Bank {
pub fn execute_and_commit_transactions( pub fn execute_and_commit_transactions(
&self, &self,
txs: &[Transaction], txs: &[Transaction],
locked_accounts: Vec<Result<()>>, lock_results: Vec<Result<()>>,
max_age: usize, max_age: usize,
) -> Vec<Result<()>> { ) -> Vec<Result<()>> {
debug!("processing transactions: {}", txs.len()); debug!("processing transactions: {}", txs.len());
let mut error_counters = ErrorCounters::default(); let mut error_counters = ErrorCounters::default();
let now = Instant::now(); let now = Instant::now();
let mut loaded_accounts = let mut loaded_accounts =
self.load_accounts(txs, locked_accounts, max_age, &mut error_counters); self.load_accounts(txs, lock_results, max_age, &mut error_counters);
let tick_height = self.tick_height(); let tick_height = self.tick_height();
let load_elapsed = now.elapsed(); let load_elapsed = now.elapsed();
@ -497,11 +454,10 @@ impl Bank {
let executed: Vec<Result<()>> = loaded_accounts let executed: Vec<Result<()>> = loaded_accounts
.iter_mut() .iter_mut()
.zip(txs.iter()) .zip(txs.iter())
.map(|(acc, tx)| match acc { .map(|(accs, tx)| match accs {
Err(e) => Err(e.clone()), Err(e) => Err(e.clone()),
Ok(ref mut accounts) => { Ok((ref mut accounts, ref mut loaders)) => {
let mut loaders = self.load_loaders(tx)?; runtime::execute_transaction(tx, loaders, accounts, tick_height).map_err(
runtime::execute_transaction(tx, &mut loaders, accounts, tick_height).map_err(
|RuntimeError::ProgramError(index, err)| { |RuntimeError::ProgramError(index, err)| {
BankError::ProgramError(index, err) BankError::ProgramError(index, err)
}, },
@ -581,8 +537,8 @@ impl Bank {
#[must_use] #[must_use]
pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> { pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
let locked_accounts = self.lock_accounts(txs); let lock_results = self.lock_accounts(txs);
let results = self.execute_and_commit_transactions(txs, locked_accounts, MAX_ENTRY_IDS); let results = self.execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS);
self.unlock_accounts(txs, &results); self.unlock_accounts(txs, &results);
results results
} }
@ -618,10 +574,10 @@ impl Bank {
inc_new_counter_info!("bank-par_execute_entries-count", entries.len()); inc_new_counter_info!("bank-par_execute_entries-count", entries.len());
let results: Vec<Result<()>> = entries let results: Vec<Result<()>> = entries
.into_par_iter() .into_par_iter()
.map(|(e, locks)| { .map(|(e, lock_results)| {
let results = self.execute_and_commit_transactions( let results = self.execute_and_commit_transactions(
&e.transactions, &e.transactions,
locks.to_vec(), lock_results.to_vec(),
MAX_ENTRY_IDS, MAX_ENTRY_IDS,
); );
self.unlock_accounts(&e.transactions, &results); self.unlock_accounts(&e.transactions, &results);
@ -651,19 +607,19 @@ impl Bank {
continue; continue;
} }
// try to lock the accounts // try to lock the accounts
let locked = self.lock_accounts(&entry.transactions); let lock_results = self.lock_accounts(&entry.transactions);
// if any of the locks error out // if any of the locks error out
// execute the current group // execute the current group
if Self::first_err(&locked).is_err() { if Self::first_err(&lock_results).is_err() {
self.par_execute_entries(&mt_group)?; self.par_execute_entries(&mt_group)?;
mt_group = vec![]; mt_group = vec![];
//reset the lock and push the entry //reset the lock and push the entry
self.unlock_accounts(&entry.transactions, &locked); self.unlock_accounts(&entry.transactions, &lock_results);
let locked = self.lock_accounts(&entry.transactions); let lock_results = self.lock_accounts(&entry.transactions);
mt_group.push((entry, locked)); mt_group.push((entry, lock_results));
} else { } else {
// push the entry to the mt_group // push the entry to the mt_group
mt_group.push((entry, locked)); mt_group.push((entry, lock_results));
} }
} }
self.par_execute_entries(&mt_group)?; self.par_execute_entries(&mt_group)?;
@ -889,16 +845,16 @@ impl Bank {
&self, &self,
txs: &[Transaction], txs: &[Transaction],
res: &[Result<()>], res: &[Result<()>],
loaded: &[Result<Vec<Account>>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>],
) { ) {
for (i, racc) in loaded.iter().enumerate() { for (i, raccs) in loaded.iter().enumerate() {
if res[i].is_err() || racc.is_err() { if res[i].is_err() || raccs.is_err() {
continue; continue;
} }
let tx = &txs[i]; let tx = &txs[i];
let acc = racc.as_ref().unwrap(); let accs = raccs.as_ref().unwrap();
for (key, account) in tx.account_keys.iter().zip(acc.iter()) { for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) {
self.check_account_subscriptions(&key, account); self.check_account_subscriptions(&key, account);
} }
} }
@ -1426,9 +1382,9 @@ mod tests {
let tx1 = Transaction::system_new(&mint.keypair(), alice.pubkey(), 1, mint.last_id()); let tx1 = Transaction::system_new(&mint.keypair(), alice.pubkey(), 1, mint.last_id());
let pay_alice = vec![tx1]; let pay_alice = vec![tx1];
let locked_alice = bank.lock_accounts(&pay_alice); let lock_result = bank.lock_accounts(&pay_alice);
let results_alice = let results_alice =
bank.execute_and_commit_transactions(&pay_alice, locked_alice, MAX_ENTRY_IDS); bank.execute_and_commit_transactions(&pay_alice, lock_result, MAX_ENTRY_IDS);
assert_eq!(results_alice[0], Ok(())); assert_eq!(results_alice[0], Ok(()));
// try executing an interleaved transfer twice // try executing an interleaved transfer twice