diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 341359c485..f6bbddc87c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -14,12 +14,13 @@ use crate::result::{Error, Result}; use crate::service::Service; use crate::sigverify_stage::VerifiedPackets; use bincode::deserialize; +use itertools::Itertools; use solana_metrics::counter::Counter; use solana_runtime::bank::Bank; use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{self, duration_as_us, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}; -use solana_sdk::transaction::{self, Transaction}; +use solana_sdk::transaction::{self, Transaction, TransactionError}; use std::cmp; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -30,7 +31,7 @@ use std::time::Duration; use std::time::Instant; use sys_info; -pub type UnprocessedPackets = Vec<(Packets, usize, Vec)>; // `usize` is the index of the first unprocessed packet in `SharedPackets` +pub type UnprocessedPackets = Vec<(Packets, Vec)>; // number of threads is 1 until mt bank is ready pub const NUM_THREADS: u32 = 10; @@ -115,15 +116,13 @@ impl BankingStage { fn forward_buffered_packets( socket: &std::net::UdpSocket, tpu_via_blobs: &std::net::SocketAddr, - unprocessed_packets: &[(Packets, usize, Vec)], + unprocessed_packets: &[(Packets, Vec)], ) -> std::io::Result<()> { - let locked_packets: Vec<_> = unprocessed_packets + let packets: Vec<&Packet> = unprocessed_packets .iter() - .map(|(p, start_index, _)| (p, start_index)) - .collect(); - let packets: Vec<&Packet> = locked_packets - .iter() - .flat_map(|(p, start_index)| &p.packets[**start_index..]) + .flat_map(|(p, unprocessed_indexes)| { + unprocessed_indexes.iter().map(move |x| &p.packets[*x]) + }) .collect(); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); let blobs = packet::packets_to_blobs(&packets); @@ -137,42 +136,43 @@ impl BankingStage { fn consume_buffered_packets( poh_recorder: &Arc>, - buffered_packets: &[(Packets, usize, Vec)], + buffered_packets: &[(Packets, Vec)], ) -> Result { let mut unprocessed_packets = vec![]; let mut bank_shutdown = false; let mut rebuffered_packets = 0; let mut new_tx_count = 0; - for (msgs, offset, vers) in buffered_packets { + for (msgs, unprocessed_indexes) in buffered_packets { if bank_shutdown { - rebuffered_packets += vers.len() - *offset; - unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned())); + rebuffered_packets += unprocessed_indexes.len(); + unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes.to_owned())); continue; } let bank = poh_recorder.lock().unwrap().bank(); if bank.is_none() { - rebuffered_packets += vers.len() - *offset; - unprocessed_packets.push((msgs.to_owned(), *offset, vers.to_owned())); + rebuffered_packets += unprocessed_indexes.len(); + unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes.to_owned())); continue; } let bank = bank.unwrap(); - let (processed, verified_txs_len, verified_indexes) = - Self::process_received_packets(&bank, &poh_recorder, &msgs, &vers, *offset)?; + let (processed, verified_txs_len, new_unprocessed_indexes) = + Self::process_received_packets( + &bank, + &poh_recorder, + &msgs, + unprocessed_indexes.to_owned(), + )?; new_tx_count += processed; if processed < verified_txs_len { - rebuffered_packets += verified_txs_len - processed; bank_shutdown = true; - // Collect any unprocessed transactions in this batch for forwarding - unprocessed_packets.push(( - msgs.to_owned(), - verified_indexes[processed], - vers.to_owned(), - )); } + rebuffered_packets += new_unprocessed_indexes.len(); + // Collect any unprocessed transactions in this batch for forwarding + unprocessed_packets.push((msgs.to_owned(), new_unprocessed_indexes)); } inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets); @@ -214,7 +214,7 @@ impl BankingStage { socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &Arc>, - buffered_packets: &[(Packets, usize, Vec)], + buffered_packets: &[(Packets, Vec)], enable_forwarding: bool, ) -> Result { let rcluster_info = cluster_info.read().unwrap(); @@ -301,7 +301,7 @@ impl BankingStage { } let num = unprocessed_packets .iter() - .map(|(x, start, _)| x.packets.len().saturating_sub(*start)) + .map(|(_, unprocessed)| unprocessed.len()) .sum(); inc_new_counter_info!("banking_stage-buffered_packets", num); buffered_packets.extend_from_slice(&unprocessed_packets); @@ -398,13 +398,25 @@ impl BankingStage { bank: &Bank, txs: &[Transaction], poh: &Arc>, - ) -> Result<()> { + chunk_offset: usize, + ) -> (Result<()>, Vec) { let now = Instant::now(); // Once accounts are locked, other threads cannot encode transactions that will modify the // same account state let lock_results = bank.lock_accounts(txs); let lock_time = now.elapsed(); + let unprocessed_txs: Vec<_> = lock_results + .locked_accounts_results() + .iter() + .zip(chunk_offset..) + .filter_map(|(res, index)| match res { + Err(TransactionError::AccountInUse) => Some(index), + Ok(_) => None, + Err(_) => None, + }) + .collect(); + let results = Self::process_and_record_transactions_locked(bank, txs, poh, &lock_results); let now = Instant::now(); @@ -420,7 +432,7 @@ impl BankingStage { txs.len(), ); - results + (results, unprocessed_txs) } /// Sends transactions to the bank. @@ -431,8 +443,9 @@ impl BankingStage { bank: &Bank, transactions: &[Transaction], poh: &Arc>, - ) -> Result<(usize)> { + ) -> Result<(usize, Vec)> { let mut chunk_start = 0; + let mut unprocessed_txs = vec![]; while chunk_start != transactions.len() { let chunk_end = chunk_start + entry::num_will_fit( @@ -441,39 +454,43 @@ impl BankingStage { &Entry::serialized_to_blob_size, ); - let result = Self::process_and_record_transactions( + let (result, unprocessed_txs_in_chunk) = Self::process_and_record_transactions( bank, &transactions[chunk_start..chunk_end], poh, + chunk_start, ); trace!("process_transactions: {:?}", result); - chunk_start = chunk_end; + unprocessed_txs.extend_from_slice(&unprocessed_txs_in_chunk); if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result { info!( "process transactions: max height reached slot: {} height: {}", bank.slot(), bank.tick_height() ); + let range: Vec = (chunk_start..chunk_end).collect(); + unprocessed_txs.extend_from_slice(&range); + unprocessed_txs.sort_unstable(); + unprocessed_txs.dedup(); break; } result?; + chunk_start = chunk_end; } - Ok(chunk_start) + Ok((chunk_start, unprocessed_txs)) } fn process_received_packets_using_closure<'a, F>( bank: &'a Arc, poh: &'a Arc>, transactions: Vec>, - vers: &[u8], - offset: usize, + indexes: &[usize], f: F, ) -> Result<(usize, usize, Vec)> where - F: Fn(&'a Bank, &[Transaction], &'a Arc>) -> Result<(usize)>, + F: Fn(&'a Bank, &[Transaction], &'a Arc>) -> Result<(usize, Vec)>, { debug!("banking-stage-tx bank {}", bank.slot()); - let vers = vers[offset..].to_owned(); debug!( "bank: {} transactions received {}", @@ -482,17 +499,10 @@ impl BankingStage { ); let (verified_transactions, verified_indexes): (Vec<_>, Vec<_>) = transactions .into_iter() - .zip(vers) - .zip(offset..) - .filter_map(|((tx, ver), index)| match tx { + .zip(indexes) + .filter_map(|(tx, index)| match tx { None => None, - Some(tx) => { - if ver != 0 { - Some((tx, index)) - } else { - None - } - } + Some(tx) => Some((tx, index)), }) .unzip(); @@ -504,27 +514,36 @@ impl BankingStage { let tx_len = verified_transactions.len(); - let processed = f(bank, &verified_transactions, poh)?; + let (processed, unprocessed_txs) = f(bank, &verified_transactions, poh)?; - Ok((processed, tx_len, verified_indexes)) + let unprocessed_indexes: Vec<_> = unprocessed_txs + .iter() + .map(|x| verified_indexes[*x]) + .collect(); + + Ok((processed, tx_len, unprocessed_indexes)) } fn process_received_packets<'a>( bank: &'a Arc, poh: &'a Arc>, msgs: &Packets, - vers: &[u8], - offset: usize, + packet_indexes: Vec, ) -> Result<(usize, usize, Vec)> { - let transactions = - Self::deserialize_transactions(&Packets::new(msgs.packets[offset..].to_owned())); + let packets = Packets::new( + packet_indexes + .iter() + .map(|x| msgs.packets[*x].to_owned()) + .collect_vec(), + ); + + let transactions = Self::deserialize_transactions(&packets); Self::process_received_packets_using_closure( bank, poh, transactions, - vers, - offset, + &packet_indexes, |x: &'a Bank, y: &[Transaction], z: &'a Arc>| { Self::process_transactions(x, y, z) }, @@ -558,26 +577,32 @@ impl BankingStage { let mut unprocessed_packets = vec![]; let mut bank_shutdown = false; for (msgs, vers) in mms { + let packet_indexes: Vec = vers + .iter() + .enumerate() + .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None }) + .collect(); if bank_shutdown { - unprocessed_packets.push((msgs, 0, vers)); + unprocessed_packets.push((msgs, packet_indexes)); continue; } let bank = poh.lock().unwrap().bank(); if bank.is_none() { - unprocessed_packets.push((msgs, 0, vers)); + unprocessed_packets.push((msgs, packet_indexes)); continue; } let bank = bank.unwrap(); - let (processed, verified_txs_len, verified_indexes) = - Self::process_received_packets(&bank, &poh, &msgs, &vers, 0)?; + let (processed, verified_txs_len, unprocessed_indexes) = + Self::process_received_packets(&bank, &poh, &msgs, packet_indexes)?; if processed < verified_txs_len { bank_shutdown = true; - // Collect any unprocessed transactions in this batch for forwarding - unprocessed_packets.push((msgs, verified_indexes[processed], vers)); } + // Collect any unprocessed transactions in this batch for forwarding + unprocessed_packets.push((msgs, unprocessed_indexes)); + new_tx_count += processed; } @@ -1060,9 +1085,11 @@ mod tests { &bank, &poh_recorder, transactions.clone(), - &vec![1, 1, 1, 1, 1, 1], - 0, - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(y.len()), + &vec![0, 1, 2, 3, 4, 5], + |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( + y.len(), + vec![0, 1, 2, 3, 4, 5] + )), ) .ok(), Some((6, 6, vec![0, 1, 2, 3, 4, 5])) @@ -1073,12 +1100,14 @@ mod tests { &bank, &poh_recorder, transactions.clone(), - &vec![1, 1, 1, 0, 1, 1], - 0, - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(y.len()), + &vec![0, 1, 2, 3, 4, 5], + |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( + y.len() - 1, + vec![0, 1, 2, 4, 5] + )), ) .ok(), - Some((5, 5, vec![0, 1, 2, 4, 5])) + Some((5, 6, vec![0, 1, 2, 4, 5])) ); assert_eq!( @@ -1086,12 +1115,14 @@ mod tests { &bank, &poh_recorder, transactions.clone(), - &vec![1, 1, 1, 0, 1, 1], - 2, - |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(y.len()), + &vec![0, 1, 2, 3, 4, 5], + |_x: &Bank, y: &[Transaction], _z: &Arc>| Ok(( + y.len() - 3, + vec![2, 4, 5] + )), ) .ok(), - Some((3, 3, vec![2, 4, 5])) + Some((3, 6, vec![2, 4, 5])) ); } Blocktree::destroy(&ledger_path).unwrap(); @@ -1175,7 +1206,8 @@ mod tests { poh_recorder.lock().unwrap().set_working_bank(working_bank); - BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder) + BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder, 0) + .0 .unwrap(); poh_recorder.lock().unwrap().tick(); @@ -1207,7 +1239,13 @@ mod tests { )]; assert_matches!( - BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder), + BankingStage::process_and_record_transactions( + &bank, + &transactions, + &poh_recorder, + 0 + ) + .0, Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) ); @@ -1215,4 +1253,53 @@ mod tests { } Blocktree::destroy(&ledger_path).unwrap(); } + + #[test] + fn test_bank_process_and_record_transactions_account_in_use() { + solana_logger::setup(); + let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); + let bank = Arc::new(Bank::new(&genesis_block)); + let pubkey = Pubkey::new_rand(); + let pubkey1 = Pubkey::new_rand(); + + let transactions = vec![ + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), + system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_block.hash(), 0), + ]; + + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: bank.tick_height(), + max_tick_height: bank.tick_height() + 1, + }; + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (poh_recorder, _entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + Some(4), + bank.ticks_per_slot(), + &pubkey, + &Arc::new(blocktree), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + ); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + + poh_recorder.lock().unwrap().set_working_bank(working_bank); + + let (result, unprocessed) = BankingStage::process_and_record_transactions( + &bank, + &transactions, + &poh_recorder, + 0, + ); + + assert!(result.is_ok()); + assert_eq!(unprocessed.len(), 1); + } + Blocktree::destroy(&ledger_path).unwrap(); + } }