From 0c52df75699b750ef293b0b78acc9adf95137764 Mon Sep 17 00:00:00 2001 From: jackcmay Date: Sun, 6 Jan 2019 22:06:55 -0800 Subject: [PATCH] Consolidate locks and error handling when loading accounts(#2309) --- src/accounts.rs | 76 +++++++++++++++++++++++++++++------- src/bank.rs | 102 ++++++++++++++---------------------------------- 2 files changed, 91 insertions(+), 87 deletions(-) diff --git a/src/accounts.rs b/src/accounts.rs index c1eb0dc314..7fd636eda1 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -15,6 +15,9 @@ use std::collections::VecDeque; use std::sync::atomic::AtomicUsize; use std::sync::{Mutex, RwLock}; +pub type InstructionAccounts = Vec; +pub type InstructionLoaders = Vec>; + #[derive(Default)] pub struct ErrorCounters { pub account_not_found: usize, @@ -113,22 +116,22 @@ impl AccountsDB { &mut self, txs: &[Transaction], res: &[Result<()>], - loaded: &[Result>], + loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], ) { - for (i, racc) in loaded.iter().enumerate() { - if res[i].is_err() || racc.is_err() { + for (i, raccs) in loaded.iter().enumerate() { + if res[i].is_err() || raccs.is_err() { continue; } let tx = &txs[i]; - let acc = racc.as_ref().unwrap(); - for (key, account) in tx.account_keys.iter().zip(acc.iter()) { + let accs = raccs.as_ref().unwrap(); + for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) { self.store(key, account); } } } - fn load_account( + fn load_tx_accounts( &self, tx: &Transaction, last_ids: &mut StatusDeque>, @@ -175,18 +178,63 @@ impl AccountsDB { } } + fn load_executable_accounts(&self, mut program_id: Pubkey) -> Result> { + let mut accounts = Vec::new(); + let mut depth = 0; + loop { + if solana_native_loader::check_id(&program_id) { + // at the root of the chain, ready to dispatch + break; + } + + if depth >= 5 { + return Err(BankError::CallChainTooDeep); + } + depth += 1; + + let program = match self.load(&program_id) { + Some(program) => program, + None => return Err(BankError::AccountNotFound), + }; + if !program.executable || program.loader == Pubkey::default() { + return Err(BankError::AccountNotFound); + } + + // add loader to chain + accounts.insert(0, (program_id, program.clone())); + + program_id = program.loader; + } + Ok(accounts) + } + + /// For each program_id in the transaction, load its loaders. + fn load_loaders(&self, tx: &Transaction) -> Result>> { + tx.instructions + .iter() + .map(|ix| { + let program_id = tx.program_ids[ix.program_ids_index as usize]; + self.load_executable_accounts(program_id) + }) + .collect() + } + fn load_accounts( &self, txs: &[Transaction], last_ids: &mut StatusDeque>, - results: Vec>, + lock_results: Vec>, max_age: usize, error_counters: &mut ErrorCounters, - ) -> Vec<(Result>)> { + ) -> Vec> { txs.iter() - .zip(results.into_iter()) + .zip(lock_results.into_iter()) .map(|etx| match etx { - (tx, Ok(())) => self.load_account(tx, last_ids, max_age, error_counters), + (tx, Ok(())) => { + let accounts = self.load_tx_accounts(tx, last_ids, max_age, error_counters)?; + let loaders = self.load_loaders(tx)?; + Ok((accounts, loaders)) + } (_, Err(e)) => Err(e), }) .collect() @@ -281,14 +329,14 @@ impl Accounts { &self, txs: &[Transaction], last_ids: &mut StatusDeque>, - results: Vec>, + lock_results: Vec>, max_age: usize, error_counters: &mut ErrorCounters, - ) -> Vec<(Result>)> { + ) -> Vec> { self.accounts_db.read().unwrap().load_accounts( txs, last_ids, - results, + lock_results, max_age, error_counters, ) @@ -298,7 +346,7 @@ impl Accounts { &self, txs: &[Transaction], res: &[Result<()>], - loaded: &[Result>], + loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], ) { self.accounts_db .write() diff --git a/src/bank.rs b/src/bank.rs index 734f19eb42..86fb3c3183 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -3,7 +3,7 @@ //! on behalf of the caller, and a low-level API for when they have //! already been signed and verified. -use crate::accounts::{Accounts, ErrorCounters}; +use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionLoaders}; use crate::checkpoint::Checkpoint; use crate::counter::Counter; use crate::entry::Entry; @@ -348,49 +348,6 @@ impl Bank { } } - fn load_executable_accounts(&self, mut program_id: Pubkey) -> Result> { - let mut accounts = Vec::new(); - let mut depth = 0; - loop { - if solana_native_loader::check_id(&program_id) { - // at the root of the chain, ready to dispatch - break; - } - - if depth >= 5 { - return Err(BankError::CallChainTooDeep); - } - depth += 1; - - let program = match self.get_account(&program_id) { - Some(program) => program, - None => return Err(BankError::AccountNotFound), - }; - if !program.executable || program.loader == Pubkey::default() { - return Err(BankError::AccountNotFound); - } - - let loader = program.loader; - - // add loader to chain - accounts.insert(0, (program_id, program)); - - program_id = loader; - } - Ok(accounts) - } - - /// For each program_id in the transaction, load its loaders. - fn load_loaders(&self, tx: &Transaction) -> Result>> { - tx.instructions - .iter() - .map(|ix| { - let program_id = tx.program_ids[ix.program_ids_index as usize]; - self.load_executable_accounts(program_id) - }) - .collect() - } - fn lock_accounts(&self, txs: &[Transaction]) -> Vec> { self.accounts.lock_accounts(txs) } @@ -407,7 +364,7 @@ impl Bank { let now = Instant::now(); // Once accounts are locked, other threads cannot encode transactions that will modify the // same account state - let locked_accounts = self.lock_accounts(txs); + let lock_results = self.lock_accounts(txs); let lock_time = now.elapsed(); let now = Instant::now(); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce @@ -415,13 +372,13 @@ impl Bank { // TODO: Banking stage threads should be prioritized to complete faster then this queue // expires. let results = - self.execute_and_commit_transactions(txs, locked_accounts, MAX_ENTRY_IDS as usize / 2); + self.execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS as usize / 2); let process_time = now.elapsed(); let now = Instant::now(); self.record_transactions(txs, &results, poh)?; let record_time = now.elapsed(); let now = Instant::now(); - // Once the accounts are unlocked new transactions can enter the pipeline to process them + // Once the accounts are new transactions can enter the pipeline to process them self.unlock_accounts(&txs, &results); let unlock_time = now.elapsed(); debug!( @@ -468,13 +425,13 @@ impl Bank { fn load_accounts( &self, txs: &[Transaction], - results: Vec>, + lock_results: Vec>, max_age: usize, error_counters: &mut ErrorCounters, - ) -> Vec<(Result>)> { + ) -> Vec> { let mut last_ids = self.last_ids.write().unwrap(); self.accounts - .load_accounts(txs, &mut last_ids, results, max_age, error_counters) + .load_accounts(txs, &mut last_ids, lock_results, max_age, error_counters) } /// Process a batch of transactions. @@ -482,14 +439,14 @@ impl Bank { pub fn execute_and_commit_transactions( &self, txs: &[Transaction], - locked_accounts: Vec>, + lock_results: Vec>, max_age: usize, ) -> Vec> { debug!("processing transactions: {}", txs.len()); let mut error_counters = ErrorCounters::default(); let now = Instant::now(); let mut loaded_accounts = - self.load_accounts(txs, locked_accounts, max_age, &mut error_counters); + self.load_accounts(txs, lock_results, max_age, &mut error_counters); let tick_height = self.tick_height(); let load_elapsed = now.elapsed(); @@ -497,11 +454,10 @@ impl Bank { let executed: Vec> = loaded_accounts .iter_mut() .zip(txs.iter()) - .map(|(acc, tx)| match acc { + .map(|(accs, tx)| match accs { Err(e) => Err(e.clone()), - Ok(ref mut accounts) => { - let mut loaders = self.load_loaders(tx)?; - runtime::execute_transaction(tx, &mut loaders, accounts, tick_height).map_err( + Ok((ref mut accounts, ref mut loaders)) => { + runtime::execute_transaction(tx, loaders, accounts, tick_height).map_err( |RuntimeError::ProgramError(index, err)| { BankError::ProgramError(index, err) }, @@ -581,8 +537,8 @@ impl Bank { #[must_use] pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { - let locked_accounts = self.lock_accounts(txs); - let results = self.execute_and_commit_transactions(txs, locked_accounts, MAX_ENTRY_IDS); + let lock_results = self.lock_accounts(txs); + let results = self.execute_and_commit_transactions(txs, lock_results, MAX_ENTRY_IDS); self.unlock_accounts(txs, &results); results } @@ -618,10 +574,10 @@ impl Bank { inc_new_counter_info!("bank-par_execute_entries-count", entries.len()); let results: Vec> = entries .into_par_iter() - .map(|(e, locks)| { + .map(|(e, lock_results)| { let results = self.execute_and_commit_transactions( &e.transactions, - locks.to_vec(), + lock_results.to_vec(), MAX_ENTRY_IDS, ); self.unlock_accounts(&e.transactions, &results); @@ -651,19 +607,19 @@ impl Bank { continue; } // try to lock the accounts - let locked = self.lock_accounts(&entry.transactions); + let lock_results = self.lock_accounts(&entry.transactions); // if any of the locks error out // execute the current group - if Self::first_err(&locked).is_err() { + if Self::first_err(&lock_results).is_err() { self.par_execute_entries(&mt_group)?; mt_group = vec![]; //reset the lock and push the entry - self.unlock_accounts(&entry.transactions, &locked); - let locked = self.lock_accounts(&entry.transactions); - mt_group.push((entry, locked)); + self.unlock_accounts(&entry.transactions, &lock_results); + let lock_results = self.lock_accounts(&entry.transactions); + mt_group.push((entry, lock_results)); } else { // push the entry to the mt_group - mt_group.push((entry, locked)); + mt_group.push((entry, lock_results)); } } self.par_execute_entries(&mt_group)?; @@ -889,16 +845,16 @@ impl Bank { &self, txs: &[Transaction], res: &[Result<()>], - loaded: &[Result>], + loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], ) { - for (i, racc) in loaded.iter().enumerate() { - if res[i].is_err() || racc.is_err() { + for (i, raccs) in loaded.iter().enumerate() { + if res[i].is_err() || raccs.is_err() { continue; } let tx = &txs[i]; - let acc = racc.as_ref().unwrap(); - for (key, account) in tx.account_keys.iter().zip(acc.iter()) { + let accs = raccs.as_ref().unwrap(); + for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) { self.check_account_subscriptions(&key, account); } } @@ -1426,9 +1382,9 @@ mod tests { let tx1 = Transaction::system_new(&mint.keypair(), alice.pubkey(), 1, mint.last_id()); let pay_alice = vec![tx1]; - let locked_alice = bank.lock_accounts(&pay_alice); + let lock_result = bank.lock_accounts(&pay_alice); let results_alice = - bank.execute_and_commit_transactions(&pay_alice, locked_alice, MAX_ENTRY_IDS); + bank.execute_and_commit_transactions(&pay_alice, lock_result, MAX_ENTRY_IDS); assert_eq!(results_alice[0], Ok(())); // try executing an interleaved transfer twice