From 1a2b131ceb1e357b9ace7d73b4446442ce9e14a6 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Wed, 8 May 2019 10:32:25 -0700 Subject: [PATCH] Don't forward transactions that are expired or failed signature check (#4199) --- core/src/banking_stage.rs | 377 ++++++++++++++++++++++---------------- runtime/src/bank.rs | 26 ++- runtime/src/lib.rs | 2 +- 3 files changed, 245 insertions(+), 160 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index e724095b2f..bcbc4898ee 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -16,6 +16,7 @@ use crate::sigverify_stage::VerifiedPackets; use bincode::deserialize; use itertools::Itertools; use solana_metrics::counter::Counter; +use solana_runtime::accounts_db::ErrorCounters; use solana_runtime::bank::Bank; use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_sdk::pubkey::Pubkey; @@ -487,58 +488,59 @@ impl BankingStage { Ok((chunk_start, unprocessed_txs)) } - fn process_received_packets_using_closure<'a, F>( - bank: &'a Arc, - poh: &'a Arc>, + // This function returns a vector of transactions that are not None. It also returns a vector + // with position of the transaction in the input list + fn filter_transaction_indexes( transactions: Vec>, indexes: &[usize], - f: F, - ) -> Result<(usize, usize, Vec)> - where - F: Fn(&'a Bank, &[Transaction], &'a Arc>) -> Result<(usize, Vec)>, - { - debug!("banking-stage-tx bank {}", bank.slot()); - - debug!( - "bank: {} transactions received {}", - bank.slot(), - transactions.len() - ); - let (verified_transactions, verified_indexes): (Vec<_>, Vec<_>) = transactions + ) -> (Vec, Vec) { + transactions .into_iter() .zip(indexes) .filter_map(|(tx, index)| match tx { None => None, Some(tx) => Some((tx, index)), }) - .unzip(); - - debug!( - "bank: {} verified transactions {}", - bank.slot(), - verified_transactions.len() - ); - - let tx_len = verified_transactions.len(); - - let (processed, unprocessed_txs) = f(bank, &verified_transactions, poh)?; - - let unprocessed_indexes: Vec<_> = unprocessed_txs - .iter() - .map(|x| verified_indexes[*x]) - .collect(); - - Ok((processed, tx_len, unprocessed_indexes)) + .unzip() } - fn process_received_packets<'a>( - bank: &'a Arc, - poh: &'a Arc>, + // This function creates a filter of transaction results with Ok() for every pending + // transaction. The non-pending transactions are marked with TransactionError + fn prepare_filter_for_pending_transactions( + transactions: &[Transaction], + pending_tx_indexes: &[usize], + ) -> Vec> { + let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions.len()]; + pending_tx_indexes.iter().for_each(|x| mask[*x] = Ok(())); + mask + } + + // This function returns a vector containing index of all valid transactions. A valid + // transaction has result Ok() as the value + fn filter_valid_transaction_indexes( + valid_txs: &[transaction::Result<()>], + transaction_indexes: &[usize], + ) -> Vec { + let valid_transactions = valid_txs + .iter() + .enumerate() + .filter_map(|(index, x)| if x.is_ok() { Some(index) } else { None }) + .collect_vec(); + + valid_transactions + .iter() + .map(|x| transaction_indexes[*x]) + .collect() + } + + fn process_received_packets( + bank: &Arc, + poh: &Arc>, msgs: &Packets, - packet_indexes: Vec, + transaction_indexes: Vec, ) -> Result<(usize, usize, Vec)> { let packets = Packets::new( - packet_indexes + transaction_indexes .iter() .map(|x| msgs.packets[*x].to_owned()) .collect_vec(), @@ -546,15 +548,42 @@ impl BankingStage { let transactions = Self::deserialize_transactions(&packets); - Self::process_received_packets_using_closure( - bank, - poh, - transactions, - &packet_indexes, - |x: &'a Bank, y: &[Transaction], z: &'a Arc>| { - Self::process_transactions(x, y, z) - }, - ) + debug!( + "banking-stage-tx bank: {} transactions received {}", + bank.slot(), + transactions.len() + ); + + let (transactions, transaction_indexes) = + Self::filter_transaction_indexes(transactions, &transaction_indexes); + + debug!( + "bank: {} filtered transactions {}", + bank.slot(), + transactions.len() + ); + + let tx_len = transactions.len(); + + let (processed, unprocessed_tx_indexes) = + Self::process_transactions(bank, &transactions, poh)?; + + let filter = + Self::prepare_filter_for_pending_transactions(&transactions, &unprocessed_tx_indexes); + + let mut error_counters = ErrorCounters::default(); + let result = bank.check_transactions( + &transactions, + &filter, + MAX_RECENT_BLOCKHASHES / 2, + &mut error_counters, + ); + + Ok(( + processed, + tx_len, + Self::filter_valid_transaction_indexes(&result, &transaction_indexes), + )) } /// Process the incoming packets @@ -1041,118 +1070,158 @@ mod tests { } #[test] - fn test_bank_process_received_transactions() { + fn test_bank_filter_transaction_indexes() { let (genesis_block, mint_keypair) = create_genesis_block(10_000); - let bank = Arc::new(Bank::new(&genesis_block)); - let ledger_path = get_tmp_ledger_path!(); - { - let blocktree = - Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); - let (poh_recorder, _entry_receiver) = PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - bank.slot(), - None, - bank.ticks_per_slot(), - &Pubkey::default(), - &Arc::new(blocktree), - &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), - ); - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let pubkey = Pubkey::new_rand(); - let pubkey = Pubkey::new_rand(); + let transactions = vec![ + None, + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + None, + None, + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + None, + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + None, + Some(system_transaction::transfer( + &mint_keypair, + &pubkey, + 1, + genesis_block.hash(), + 0, + )), + None, + None, + ]; - let transactions = vec![ - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_block.hash(), - 0, - )), - ]; + let filtered_transactions = vec![ + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + ]; - assert_eq!( - BankingStage::process_received_packets_using_closure( - &bank, - &poh_recorder, - transactions.clone(), - &vec![0, 1, 2, 3, 4, 5], - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( - y.len(), - vec![0, 1, 2, 3, 4, 5] - )), - ) - .ok(), - Some((6, 6, vec![0, 1, 2, 3, 4, 5])) - ); + assert_eq!( + BankingStage::filter_transaction_indexes( + transactions.clone(), + &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], + ), + (filtered_transactions.clone(), vec![1, 2, 3, 6, 8, 10]) + ); - assert_eq!( - BankingStage::process_received_packets_using_closure( - &bank, - &poh_recorder, - transactions.clone(), - &vec![0, 1, 2, 3, 4, 5], - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( - y.len() - 1, - vec![0, 1, 2, 4, 5] - )), - ) - .ok(), - Some((5, 6, vec![0, 1, 2, 4, 5])) - ); + assert_eq!( + BankingStage::filter_transaction_indexes( + transactions, + &vec![1, 2, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15], + ), + (filtered_transactions, vec![2, 4, 5, 9, 11, 13]) + ); + } - assert_eq!( - BankingStage::process_received_packets_using_closure( - &bank, - &poh_recorder, - transactions.clone(), - &vec![0, 1, 2, 3, 4, 5], - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( - y.len() - 3, - vec![2, 4, 5] - )), - ) - .ok(), - Some((3, 6, vec![2, 4, 5])) - ); - } - Blocktree::destroy(&ledger_path).unwrap(); + #[test] + fn test_bank_prepare_filter_for_pending_transaction() { + let (genesis_block, mint_keypair) = create_genesis_block(10_000); + let pubkey = Pubkey::new_rand(); + + let transactions = vec![ + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + ]; + + assert_eq!( + BankingStage::prepare_filter_for_pending_transactions(&transactions, &vec![2, 4, 5],), + vec![ + Err(TransactionError::BlockhashNotFound), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Ok(()) + ] + ); + + assert_eq!( + BankingStage::prepare_filter_for_pending_transactions(&transactions, &vec![0, 2, 3],), + vec![ + Ok(()), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Ok(()), + Err(TransactionError::BlockhashNotFound), + Err(TransactionError::BlockhashNotFound), + ] + ); + } + + #[test] + fn test_bank_filter_valid_transaction_indexes() { + assert_eq!( + BankingStage::filter_valid_transaction_indexes( + &vec![ + Err(TransactionError::BlockhashNotFound), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Ok(()) + ], + &vec![2, 4, 5, 9, 11, 13] + ), + vec![5, 11, 13] + ); + + assert_eq!( + BankingStage::filter_valid_transaction_indexes( + &vec![ + Ok(()), + Err(TransactionError::BlockhashNotFound), + Err(TransactionError::BlockhashNotFound), + Ok(()), + Ok(()), + Ok(()) + ], + &vec![1, 6, 7, 9, 31, 43] + ), + vec![1, 9, 31, 43] + ); } #[test] diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 42c91e76e8..76491555a8 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -558,11 +558,11 @@ impl Bank { fn check_refs( &self, txs: &[Transaction], - lock_results: &LockedAccountsResults, + lock_results: &[Result<()>], error_counters: &mut ErrorCounters, ) -> Vec> { txs.iter() - .zip(lock_results.locked_accounts_results()) + .zip(lock_results) .map(|(tx, lock_res)| { if lock_res.is_ok() && !tx.verify_refs() { error_counters.invalid_account_index += 1; @@ -625,6 +625,19 @@ impl Bank { }) .collect() } + + pub fn check_transactions( + &self, + txs: &[Transaction], + lock_results: &[Result<()>], + max_age: usize, + mut error_counters: &mut ErrorCounters, + ) -> Vec> { + let refs_results = self.check_refs(txs, lock_results, &mut error_counters); + let age_results = self.check_age(txs, refs_results, max_age, &mut error_counters); + self.check_signatures(txs, age_results, &mut error_counters) + } + #[allow(clippy::type_complexity)] pub fn load_and_execute_transactions( &self, @@ -638,9 +651,12 @@ impl Bank { debug!("processing transactions: {}", txs.len()); let mut error_counters = ErrorCounters::default(); let now = Instant::now(); - let refs_results = self.check_refs(txs, lock_results, &mut error_counters); - let age_results = self.check_age(txs, refs_results, max_age, &mut error_counters); - let sig_results = self.check_signatures(txs, age_results, &mut error_counters); + let sig_results = self.check_transactions( + txs, + lock_results.locked_accounts_results(), + max_age, + &mut error_counters, + ); let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters); let tick_height = self.tick_height(); diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 4a98a3c4a4..d823d3e644 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -1,5 +1,5 @@ mod accounts; -mod accounts_db; +pub mod accounts_db; mod accounts_index; pub mod append_vec; pub mod bank;