From 357f28ae4e87f51a9ffc18151abe27423b4733d6 Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 2 Apr 2019 13:49:09 -0700 Subject: [PATCH] Implement finalizer so that all locked accounts are dropped (#3585) (#3598) * Implement finalizer so that all locked accounts are dropped when finalizer goes out of scope * Add test for tx error with lock conflict * Fix double unlock from destructor running after a call to unlock --- core/src/banking_stage.rs | 12 ++- core/src/blocktree_processor.rs | 108 +++++++++++++++++++++++-- runtime/src/bank.rs | 41 ++++++---- runtime/src/lib.rs | 1 + runtime/src/locked_accounts_results.rs | 42 ++++++++++ 5 files changed, 173 insertions(+), 31 deletions(-) create mode 100644 runtime/src/locked_accounts_results.rs diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 94f666344b..bf3fe79c86 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -16,6 +16,7 @@ use crate::sigverify_stage::VerifiedPackets; use bincode::deserialize; use solana_metrics::counter::Counter; use solana_runtime::bank::{self, Bank, BankError}; +use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{self, duration_as_us, MAX_RECENT_BLOCKHASHES}; use solana_sdk::transaction::Transaction; @@ -223,18 +224,15 @@ impl BankingStage { bank: &Bank, txs: &[Transaction], poh: &Arc>, - lock_results: &[bank::Result<()>], + lock_results: &LockedAccountsResults, ) -> Result<()> { let now = Instant::now(); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce // the likelihood of any single thread getting starved and processing old ids. // TODO: Banking stage threads should be prioritized to complete faster then this queue // expires. - let (loaded_accounts, results) = bank.load_and_execute_transactions( - txs, - lock_results.to_vec(), - MAX_RECENT_BLOCKHASHES / 2, - ); + let (loaded_accounts, results) = + bank.load_and_execute_transactions(txs, lock_results, MAX_RECENT_BLOCKHASHES / 2); let load_execute_time = now.elapsed(); let record_time = { @@ -276,7 +274,7 @@ impl BankingStage { let now = Instant::now(); // Once the accounts are new transactions can enter the pipeline to process them - bank.unlock_accounts(&txs, &lock_results); + drop(lock_results); let unlock_time = now.elapsed(); debug!( diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index 18b222bd5f..f2efc737d5 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -5,6 +5,7 @@ use crate::leader_schedule_utils; use rayon::prelude::*; use solana_metrics::counter::Counter; use solana_runtime::bank::{Bank, BankError, Result}; +use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::timing::duration_as_ms; use solana_sdk::timing::MAX_RECENT_BLOCKHASHES; @@ -27,17 +28,16 @@ fn first_err(results: &[Result<()>]) -> Result<()> { Ok(()) } -fn par_execute_entries(bank: &Bank, entries: &[(&Entry, Vec>)]) -> Result<()> { +fn par_execute_entries(bank: &Bank, entries: &[(&Entry, LockedAccountsResults)]) -> Result<()> { inc_new_counter_info!("bank-par_execute_entries-count", entries.len()); let results: Vec> = entries .into_par_iter() - .map(|(e, lock_results)| { + .map(|(e, locked_accounts)| { let results = bank.load_execute_and_commit_transactions( &e.transactions, - lock_results.to_vec(), + locked_accounts, MAX_RECENT_BLOCKHASHES, ); - bank.unlock_accounts(&e.transactions, &results); first_err(&results) }) .collect(); @@ -65,11 +65,12 @@ fn par_process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> { let lock_results = bank.lock_accounts(&entry.transactions); // if any of the locks error out // execute the current group - if first_err(&lock_results).is_err() { + if first_err(lock_results.locked_accounts_results()).is_err() { par_execute_entries(bank, &mt_group)?; + // Drop all the locks on accounts by clearing the LockedAccountsFinalizer's in the + // mt_group mt_group = vec![]; - //reset the lock and push the entry - bank.unlock_accounts(&entry.transactions, &lock_results); + drop(lock_results); let lock_results = bank.lock_accounts(&entry.transactions); mt_group.push((entry, lock_results)); } else { @@ -627,7 +628,98 @@ mod tests { } #[test] - fn test_par_process_entries_2_entries_par() { + fn test_process_entries_2_txes_collision_and_error() { + let (genesis_block, mint_keypair) = GenesisBlock::new(1000); + let bank = Bank::new(&genesis_block); + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + let keypair3 = Keypair::new(); + let keypair4 = Keypair::new(); + + // fund: put 4 in each of 1 and 2 + assert_matches!( + bank.transfer(4, &mint_keypair, &keypair1.pubkey(), bank.last_blockhash()), + Ok(_) + ); + assert_matches!( + bank.transfer(4, &mint_keypair, &keypair2.pubkey(), bank.last_blockhash()), + Ok(_) + ); + assert_matches!( + bank.transfer(4, &mint_keypair, &keypair4.pubkey(), bank.last_blockhash()), + Ok(_) + ); + + // construct an Entry whose 2nd transaction would cause a lock conflict with previous entry + let entry_1_to_mint = next_entry( + &bank.last_blockhash(), + 1, + vec![ + SystemTransaction::new_account( + &keypair1, + &mint_keypair.pubkey(), + 1, + bank.last_blockhash(), + 0, + ), + SystemTransaction::new_move( + &keypair4, + &keypair4.pubkey(), + 1, + Hash::default(), // Should cause a transaction failure with BlockhashNotFound + 0, + ), + ], + ); + + let entry_2_to_3_mint_to_1 = next_entry( + &entry_1_to_mint.hash, + 1, + vec![ + SystemTransaction::new_account( + &keypair2, + &keypair3.pubkey(), + 2, + bank.last_blockhash(), + 0, + ), // should be fine + SystemTransaction::new_account( + &keypair1, + &mint_keypair.pubkey(), + 2, + bank.last_blockhash(), + 0, + ), // will collide + ], + ); + + assert!(process_entries( + &bank, + &[entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()] + ) + .is_err()); + + // First transaction in first entry succeeded, so keypair1 lost 1 lamport + assert_eq!(bank.get_balance(&keypair1.pubkey()), 3); + assert_eq!(bank.get_balance(&keypair2.pubkey()), 4); + + // Check all accounts are unlocked + let txs1 = &entry_1_to_mint.transactions[..]; + let txs2 = &entry_2_to_3_mint_to_1.transactions[..]; + let locked_accounts1 = bank.lock_accounts(txs1); + for result in locked_accounts1.locked_accounts_results() { + assert!(result.is_ok()); + } + // txs1 and txs2 have accounts that conflict, so we must drop txs1 first + drop(locked_accounts1); + let locked_accounts2 = bank.lock_accounts(txs2); + for result in locked_accounts2.locked_accounts_results() { + assert!(result.is_ok()); + } + } + + #[test] + fn test_process_entries_2_entries_par() { let (genesis_block, mint_keypair) = GenesisBlock::new(1000); let bank = Bank::new(&genesis_block); let keypair1 = Keypair::new(); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 4e369d7d95..648c37668f 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -5,6 +5,7 @@ use crate::accounts::{Accounts, ErrorCounters, InstructionAccounts, InstructionLoaders}; use crate::hash_queue::HashQueue; +use crate::locked_accounts_results::LockedAccountsResults; use crate::runtime::{self, RuntimeError}; use crate::status_cache::StatusCache; use bincode::serialize; @@ -489,18 +490,28 @@ impl Bank { .map_or(Ok(()), |sig| self.get_signature_status(sig).unwrap()) } - pub fn lock_accounts(&self, txs: &[Transaction]) -> Vec> { + pub fn lock_accounts<'a, 'b>( + &'a self, + txs: &'b [Transaction], + ) -> LockedAccountsResults<'a, 'b> { if self.is_frozen() { warn!("=========== FIXME: lock_accounts() working on a frozen bank! ================"); } // TODO: put this assert back in // assert!(!self.is_frozen()); - self.accounts().lock_accounts(self.accounts_id, txs) + let results = self.accounts().lock_accounts(self.accounts_id, txs); + LockedAccountsResults::new(results, &self, txs) } - pub fn unlock_accounts(&self, txs: &[Transaction], results: &[Result<()>]) { - self.accounts() - .unlock_accounts(self.accounts_id, txs, results) + pub fn unlock_accounts(&self, locked_accounts_results: &mut LockedAccountsResults) { + if locked_accounts_results.needs_unlock { + locked_accounts_results.needs_unlock = false; + self.accounts().unlock_accounts( + self.accounts_id, + locked_accounts_results.transactions(), + locked_accounts_results.locked_accounts_results(), + ) + } } fn load_accounts( @@ -512,22 +523,23 @@ impl Bank { self.accounts() .load_accounts(self.accounts_id, txs, results, error_counters) } + fn check_age( &self, txs: &[Transaction], - lock_results: Vec>, + lock_results: &LockedAccountsResults, max_age: usize, error_counters: &mut ErrorCounters, ) -> Vec> { let hash_queue = self.blockhash_queue.read().unwrap(); txs.iter() - .zip(lock_results.into_iter()) + .zip(lock_results.locked_accounts_results()) .map(|(tx, lock_res)| { if lock_res.is_ok() && !hash_queue.check_entry_age(tx.recent_blockhash, max_age) { error_counters.reserve_blockhash += 1; Err(BankError::BlockhashNotFound) } else { - lock_res + lock_res.clone() } }) .collect() @@ -560,7 +572,7 @@ impl Bank { pub fn load_and_execute_transactions( &self, txs: &[Transaction], - lock_results: Vec>, + lock_results: &LockedAccountsResults, max_age: usize, ) -> ( Vec>, @@ -718,7 +730,7 @@ impl Bank { pub fn load_execute_and_commit_transactions( &self, txs: &[Transaction], - lock_results: Vec>, + lock_results: &LockedAccountsResults, max_age: usize, ) -> Vec> { let (loaded_accounts, executed) = @@ -730,10 +742,7 @@ impl Bank { #[must_use] pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { let lock_results = self.lock_accounts(txs); - let results = - self.load_execute_and_commit_transactions(txs, lock_results, MAX_RECENT_BLOCKHASHES); - self.unlock_accounts(txs, &results); - results + self.load_execute_and_commit_transactions(txs, &lock_results, MAX_RECENT_BLOCKHASHES) } /// Create, sign, and process a Transaction from `keypair` to `to` of @@ -1291,7 +1300,7 @@ mod tests { let lock_result = bank.lock_accounts(&pay_alice); let results_alice = bank.load_execute_and_commit_transactions( &pay_alice, - lock_result, + &lock_result, MAX_RECENT_BLOCKHASHES, ); assert_eq!(results_alice[0], Ok(())); @@ -1308,7 +1317,7 @@ mod tests { Err(BankError::AccountInUse) ); - bank.unlock_accounts(&pay_alice, &results_alice); + drop(lock_result); assert!(bank .transfer(2, &mint_keypair, &bob.pubkey(), genesis_block.hash()) diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 291dc2fb87..e9f07883a2 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -4,6 +4,7 @@ pub mod bank; pub mod bloom; mod hash_queue; pub mod loader_utils; +pub mod locked_accounts_results; mod native_loader; pub mod runtime; mod status_cache; diff --git a/runtime/src/locked_accounts_results.rs b/runtime/src/locked_accounts_results.rs new file mode 100644 index 0000000000..98d8845698 --- /dev/null +++ b/runtime/src/locked_accounts_results.rs @@ -0,0 +1,42 @@ +use crate::bank::{Bank, Result}; +use solana_sdk::transaction::Transaction; + +// Represents the results of trying to lock a set of accounts +pub struct LockedAccountsResults<'a, 'b> { + locked_accounts_results: Vec>, + bank: &'a Bank, + transactions: &'b [Transaction], + pub(crate) needs_unlock: bool, +} + +impl<'a, 'b> LockedAccountsResults<'a, 'b> { + pub fn new( + locked_accounts_results: Vec>, + bank: &'a Bank, + transactions: &'b [Transaction], + ) -> Self { + Self { + locked_accounts_results, + bank, + transactions, + needs_unlock: true, + } + } + + pub fn locked_accounts_results(&self) -> &Vec> { + &self.locked_accounts_results + } + + pub fn transactions(&self) -> &[Transaction] { + self.transactions + } +} + +// Unlock all locked accounts in destructor. +impl<'a, 'b> Drop for LockedAccountsResults<'a, 'b> { + fn drop(&mut self) { + if self.needs_unlock { + self.bank.unlock_accounts(self) + } + } +}