diff --git a/Cargo.lock b/Cargo.lock index 5a202408dc..ed0f28791e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4233,6 +4233,7 @@ dependencies = [ "ahash 0.6.1", "base64 0.12.3", "bincode", + "blake3", "bs58", "bv", "byteorder", @@ -4900,6 +4901,7 @@ version = "1.7.0" dependencies = [ "assert_matches", "bincode", + "blake3", "borsh", "borsh-derive", "bs58", @@ -5003,6 +5005,7 @@ dependencies = [ name = "solana-runtime" version = "1.7.0" dependencies = [ + "arrayref", "assert_matches", "bincode", "blake3", diff --git a/core/Cargo.toml b/core/Cargo.toml index 619080233e..3b761977ec 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -17,6 +17,7 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git ahash = "0.6.1" base64 = "0.12.3" bincode = "1.3.1" +blake3 = "0.3.6" bv = { version = "0.11.1", features = ["serde"] } bs58 = "0.3.1" byteorder = "1.3.4" diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index f4f122e97d..1bfc27aef9 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -7,7 +7,7 @@ use crossbeam_channel::unbounded; use log::*; use rand::{thread_rng, Rng}; use rayon::prelude::*; -use solana_core::banking_stage::{create_test_recorder, BankingStage}; +use solana_core::banking_stage::{create_test_recorder, BankingStage, BankingStageStats}; use solana_core::cluster_info::ClusterInfo; use solana_core::cluster_info::Node; use solana_core::poh_recorder::WorkingBankEntry; @@ -89,7 +89,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { None, &s, None::>, - None, + &BankingStageStats::default(), &recorder, ); }); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 018998f888..347ac5569b 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -29,6 +29,7 @@ use solana_runtime::{ TransactionExecutionResult, }, bank_utils, + hashed_transaction::HashedTransaction, transaction_batch::TransactionBatch, vote_sender_types::ReplayVoteSender, }; @@ -37,8 +38,11 @@ use solana_sdk::{ Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE, MAX_TRANSACTION_FORWARDING_DELAY, MAX_TRANSACTION_FORWARDING_DELAY_GPU, }, + message::Message, poh_config::PohConfig, pubkey::Pubkey, + short_vec::decode_shortu16_len, + signature::Signature, timing::{duration_as_ms, timestamp}, transaction::{self, Transaction, TransactionError}, }; @@ -46,9 +50,11 @@ use solana_transaction_status::token_balances::{ collect_token_balances, TransactionTokenBalancesSet, }; use std::{ + borrow::Cow, cmp, collections::{HashMap, VecDeque}, env, + mem::size_of, net::UdpSocket, ops::DerefMut, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, @@ -89,6 +95,15 @@ pub struct BankingStageStats { current_buffered_packets_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, + + // Timing + consume_buffered_packets_elapsed: AtomicU64, + process_packets_elapsed: AtomicU64, + handle_retryable_packets_elapsed: AtomicU64, + filter_pending_packets_elapsed: AtomicU64, + packet_duplicate_check_elapsed: AtomicU64, + packet_conversion_elapsed: AtomicU64, + transaction_processing_elapsed: AtomicU64, } impl BankingStageStats { @@ -147,6 +162,46 @@ impl BankingStageStats { self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "consume_buffered_packets_elapsed", + self.consume_buffered_packets_elapsed + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "process_packets_elapsed", + self.process_packets_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), 
+ ( + "handle_retryable_packets_elapsed", + self.handle_retryable_packets_elapsed + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "filter_pending_packets_elapsed", + self.filter_pending_packets_elapsed + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "packet_duplicate_check_elapsed", + self.packet_duplicate_check_elapsed + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "packet_conversion_elapsed", + self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "transaction_processing_elapsed", + self.transaction_processing_elapsed + .swap(0, Ordering::Relaxed) as i64, + i64 + ), ); } } @@ -291,7 +346,7 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, test_fn: Option, - banking_stage_stats: Option<&BankingStageStats>, + banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, ) { let mut rebuffered_packets_len = 0; @@ -299,6 +354,7 @@ impl BankingStage { let buffered_len = buffered_packets.len(); let mut proc_start = Measure::start("consume_buffered_process"); let mut reached_end_of_slot = None; + buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes, _forwarded)| { if let Some((next_leader, bank)) = &reached_end_of_slot { // We've hit the end of this slot, no need to perform more processing, @@ -326,6 +382,7 @@ impl BankingStage { original_unprocessed_indexes.to_owned(), transaction_status_sender.clone(), gossip_vote_sender, + banking_stage_stats, ); if processed < verified_txs_len || !Bank::should_bank_still_be_processing_txs( @@ -372,14 +429,15 @@ impl BankingStage { (new_tx_count as f32) / (proc_start.as_s()) ); - if let Some(stats) = banking_stage_stats { - stats - .rebuffered_packets_count - .fetch_add(rebuffered_packets_len, Ordering::Relaxed); - stats - .consumed_buffered_packets_count - .fetch_add(new_tx_count, Ordering::Relaxed); - } + banking_stage_stats + .consume_buffered_packets_elapsed + .fetch_add(proc_start.as_us(), Ordering::Relaxed); + banking_stage_stats + .rebuffered_packets_count + .fetch_add(rebuffered_packets_len, Ordering::Relaxed); + banking_stage_stats + .consumed_buffered_packets_count + .fetch_add(new_tx_count, Ordering::Relaxed); } fn consume_or_forward_packets( @@ -465,7 +523,7 @@ impl BankingStage { transaction_status_sender, gossip_vote_sender, None::>, - Some(banking_stage_stats), + banking_stage_stats, recorder, ); } @@ -616,27 +674,19 @@ impl BankingStage { ) } - /// Convert the transactions from a blob of binary data to a vector of transactions - fn deserialize_transactions(p: &Packets) -> Vec> { - p.packets - .iter() - .map(|x| limited_deserialize(&x.data[0..x.meta.size]).ok()) - .collect() - } - #[allow(clippy::match_wild_err_arm)] - fn record_transactions( + fn record_transactions<'a>( bank_slot: Slot, - txs: &[Transaction], + txs: impl Iterator, results: &[TransactionExecutionResult], recorder: &TransactionRecorder, ) -> (Result, Vec) { let mut processed_generation = Measure::start("record::process_generation"); let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results .iter() - .zip(txs.iter()) + .zip(txs) .enumerate() - .filter_map(|(i, ((r, _h), x))| { + .filter_map(|(i, ((r, _n), x))| { if Bank::can_commit(r) { Some((x.clone(), i)) } else { @@ -694,7 +744,6 @@ impl BankingStage { // the likelihood of any single thread getting starved and processing old ids. // TODO: Banking stage threads should be prioritized to complete faster then this queue // expires. 
- let txs = batch.transactions(); let pre_balances = if transaction_status_sender.is_some() { bank.collect_balances(batch) } else { @@ -732,7 +781,7 @@ impl BankingStage { let mut record_time = Measure::start("record_time"); let (num_to_commit, retryable_record_txs) = - Self::record_transactions(bank.slot(), txs, &results, poh); + Self::record_transactions(bank.slot(), batch.transactions_iter(), &results, poh); inc_new_counter_info!( "banking_stage-record_transactions_num_to_commit", *num_to_commit.as_ref().unwrap_or(&0) @@ -748,12 +797,11 @@ impl BankingStage { record_time.stop(); let mut commit_time = Measure::start("commit_time"); - + let hashed_txs = batch.hashed_transactions(); let num_to_commit = num_to_commit.unwrap(); - if num_to_commit != 0 { let tx_results = bank.commit_transactions( - txs, + hashed_txs, &mut loaded_accounts, &results, tx_count, @@ -761,13 +809,14 @@ impl BankingStage { &mut execute_timings, ); - bank_utils::find_and_send_votes(txs, &tx_results, Some(gossip_vote_sender)); + bank_utils::find_and_send_votes(hashed_txs, &tx_results, Some(gossip_vote_sender)); if let Some(transaction_status_sender) = transaction_status_sender { + let txs = batch.transactions_iter().cloned().collect(); let post_balances = bank.collect_balances(batch); let post_token_balances = collect_token_balances(&bank, &batch, &mut mint_decimals); transaction_status_sender.send_transaction_status_batch( bank.clone(), - batch.transactions(), + txs, tx_results.execution_results, TransactionBalancesSet::new(pre_balances, post_balances), TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances), @@ -786,7 +835,7 @@ impl BankingStage { load_execute_time.as_us(), record_time.as_us(), commit_time.as_us(), - txs.len(), + hashed_txs.len(), ); debug!( @@ -799,7 +848,7 @@ impl BankingStage { pub fn process_and_record_transactions( bank: &Arc, - txs: &[Transaction], + txs: &[HashedTransaction], poh: &TransactionRecorder, chunk_offset: usize, transaction_status_sender: Option, @@ -808,7 +857,7 @@ impl BankingStage { let mut lock_time = Measure::start("lock_time"); // Once accounts are locked, other threads cannot encode transactions that will modify the // same account state - let batch = bank.prepare_batch(txs); + let batch = bank.prepare_hashed_batch(txs); lock_time.stop(); let (result, mut retryable_txs) = Self::process_and_record_transactions_locked( @@ -843,7 +892,7 @@ impl BankingStage { fn process_transactions( bank: &Arc, bank_creation_time: &Instant, - transactions: &[Transaction], + transactions: &[HashedTransaction], poh: &TransactionRecorder, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, @@ -896,26 +945,13 @@ impl BankingStage { (chunk_start, unprocessed_txs) } - // This function returns a vector of transactions that are not None. It also returns a vector - // with position of the transaction in the input list - fn filter_transaction_indexes( - transactions: Vec>, - indexes: &[usize], - ) -> (Vec, Vec) { - transactions - .into_iter() - .zip(indexes) - .filter_map(|(tx, index)| tx.map(|tx| (tx, index))) - .unzip() - } - // This function creates a filter of transaction results with Ok() for every pending // transaction. 
The non-pending transactions are marked with TransactionError fn prepare_filter_for_pending_transactions( - transactions: &[Transaction], + transactions_len: usize, pending_tx_indexes: &[usize], ) -> Vec> { - let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions.len()]; + let mut mask = vec![Err(TransactionError::BlockhashNotFound); transactions_len]; pending_tx_indexes.iter().for_each(|x| mask[*x] = Ok(())); mask } @@ -938,40 +974,40 @@ impl BankingStage { .collect() } - // This function deserializes packets into transactions and returns non-None transactions + /// Read the transaction message from packet data + fn packet_message(packet: &Packet) -> Option<&[u8]> { + let (sig_len, sig_size) = decode_shortu16_len(&packet.data).ok()?; + let msg_start = sig_len + .checked_mul(size_of::()) + .and_then(|v| v.checked_add(sig_size))?; + let msg_end = packet.meta.size; + Some(&packet.data[msg_start..msg_end]) + } + + // This function deserializes packets into transactions, computes the blake3 hash of transaction messages, + // and verifies secp256k1 instructions. A list of valid transactions are returned with their message hashes + // and packet indexes. fn transactions_from_packets( msgs: &Packets, transaction_indexes: &[usize], secp256k1_program_enabled: bool, - ) -> (Vec, Vec) { - let packets = Packets::new( - transaction_indexes - .iter() - .map(|x| msgs.packets[*x].to_owned()) - .collect_vec(), - ); - - let transactions = Self::deserialize_transactions(&packets); - let maybe_secp_verified_transactions: Vec<_> = if secp256k1_program_enabled { - transactions - .into_iter() - .map(|tx| { - if let Some(tx) = tx { - if tx.verify_precompiles().is_ok() { - Some(tx) - } else { - None - } - } else { - None - } - }) - .collect() - } else { - transactions - }; - - Self::filter_transaction_indexes(maybe_secp_verified_transactions, &transaction_indexes) + ) -> (Vec>, Vec) { + transaction_indexes + .iter() + .filter_map(|tx_index| { + let p = &msgs.packets[*tx_index]; + let tx: Transaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?; + if secp256k1_program_enabled { + tx.verify_precompiles().ok()?; + } + let message_bytes = Self::packet_message(p)?; + let message_hash = Message::hash_raw_message(message_bytes); + Some(( + HashedTransaction::new(Cow::Owned(tx), message_hash), + tx_index, + )) + }) + .unzip() } /// This function filters pending packets that are still valid @@ -981,11 +1017,12 @@ impl BankingStage { /// * `pending_indexes` - identifies which indexes in the `transactions` list are still pending fn filter_pending_packets_from_pending_txs( bank: &Arc, - transactions: &[Transaction], + transactions: &[HashedTransaction], transaction_to_packet_indexes: &[usize], pending_indexes: &[usize], ) -> Vec { - let filter = Self::prepare_filter_for_pending_transactions(transactions, pending_indexes); + let filter = + Self::prepare_filter_for_pending_transactions(transactions.len(), pending_indexes); let mut error_counters = ErrorCounters::default(); // The following code also checks if the blockhash for a transaction is too old @@ -999,7 +1036,8 @@ impl BankingStage { } else { MAX_TRANSACTION_FORWARDING_DELAY_GPU }; - let result = bank.check_transactions( + + let results = bank.check_transactions( transactions, &filter, (MAX_PROCESSING_AGE) @@ -1008,7 +1046,7 @@ impl BankingStage { &mut error_counters, ); - Self::filter_valid_transaction_indexes(&result, transaction_to_packet_indexes) + Self::filter_valid_transaction_indexes(&results, transaction_to_packet_indexes) } fn 
process_packets_transactions( @@ -1019,12 +1057,16 @@ impl BankingStage { packet_indexes: Vec, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, + banking_stage_stats: &BankingStageStats, ) -> (usize, usize, Vec) { + let mut packet_conversion_time = Measure::start("packet_conversion"); let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( msgs, &packet_indexes, bank.secp256k1_program_enabled(), ); + packet_conversion_time.stop(); + debug!( "bank: {} filtered transactions {}", bank.slot(), @@ -1033,6 +1075,7 @@ impl BankingStage { let tx_len = transactions.len(); + let mut process_tx_time = Measure::start("process_tx_time"); let (processed, unprocessed_tx_indexes) = Self::process_transactions( bank, bank_creation_time, @@ -1041,20 +1084,34 @@ impl BankingStage { transaction_status_sender, gossip_vote_sender, ); + process_tx_time.stop(); let unprocessed_tx_count = unprocessed_tx_indexes.len(); + let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( bank, &transactions, &transaction_to_packet_indexes, &unprocessed_tx_indexes, ); + filter_pending_packets_time.stop(); + inc_new_counter_info!( "banking_stage-dropped_tx_before_forwarding", unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) ); + banking_stage_stats + .packet_conversion_elapsed + .fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed); + banking_stage_stats + .transaction_processing_elapsed + .fetch_add(process_tx_time.as_us(), Ordering::Relaxed); + banking_stage_stats + .filter_pending_packets_elapsed + .fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed); + (processed, tx_len, filtered_unprocessed_packet_indexes) } @@ -1162,6 +1219,7 @@ impl BankingStage { &mut newly_buffered_packets_count, batch_limit, duplicates, + banking_stage_stats, ); continue; } @@ -1176,6 +1234,7 @@ impl BankingStage { packet_indexes, transaction_status_sender.clone(), gossip_vote_sender, + banking_stage_stats, ); new_tx_count += processed; @@ -1189,10 +1248,12 @@ impl BankingStage { &mut newly_buffered_packets_count, batch_limit, duplicates, + banking_stage_stats, ); // If there were retryable transactions, add the unexpired ones to the buffered queue if processed < verified_txs_len { + let mut handle_retryable_packets_time = Measure::start("handle_retryable_packets"); let next_leader = poh.lock().unwrap().next_slot_leader(); // Walk thru rest of the transactions and filter out the invalid (e.g. 
too old) ones #[allow(clippy::while_let_on_iterator)] @@ -1213,14 +1274,17 @@ impl BankingStage { &mut newly_buffered_packets_count, batch_limit, duplicates, + banking_stage_stats, ); } + handle_retryable_packets_time.stop(); + banking_stage_stats + .handle_retryable_packets_elapsed + .fetch_add(handle_retryable_packets_time.as_us(), Ordering::Relaxed); } } - proc_start.stop(); - inc_new_counter_debug!("banking_stage-time_ms", proc_start.as_ms() as usize); debug!( "@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}", timestamp(), @@ -1231,6 +1295,9 @@ impl BankingStage { count, id, ); + banking_stage_stats + .process_packets_elapsed + .fetch_add(proc_start.as_us(), Ordering::Relaxed); banking_stage_stats .process_packets_count .fetch_add(count, Ordering::Relaxed); @@ -1258,8 +1325,10 @@ impl BankingStage { newly_buffered_packets_count: &mut usize, batch_limit: usize, duplicates: &Arc, PacketHasher)>>, + banking_stage_stats: &BankingStageStats, ) { { + let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check"); let mut duplicates = duplicates.lock().unwrap(); let (cache, hasher) = duplicates.deref_mut(); packet_indexes.retain(|i| { @@ -1272,6 +1341,10 @@ impl BankingStage { } } }); + packet_duplicate_check_time.stop(); + banking_stage_stats + .packet_duplicate_check_elapsed + .fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed); } if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if unprocessed_packets.len() >= batch_limit { @@ -1351,6 +1424,7 @@ mod tests { }; use solana_perf::packet::to_packets_chunked; use solana_sdk::{ + hash::Hash, instruction::InstructionError, signature::{Keypair, Signer}, system_instruction::SystemError, @@ -1738,8 +1812,12 @@ mod tests { ]; let mut results = vec![(Ok(()), None), (Ok(()), None)]; - let _ = - BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder); + let _ = BankingStage::record_transactions( + bank.slot(), + transactions.iter(), + &results, + &recorder, + ); let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); assert_eq!(entry.transactions.len(), transactions.len()); @@ -1751,8 +1829,12 @@ mod tests { )), None, ); - let (res, retryable) = - BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder); + let (res, retryable) = BankingStage::record_transactions( + bank.slot(), + transactions.iter(), + &results, + &recorder, + ); res.unwrap(); assert!(retryable.is_empty()); let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); @@ -1760,8 +1842,12 @@ mod tests { // Other TransactionErrors should not be recorded results[0] = (Err(TransactionError::AccountNotFound), None); - let (res, retryable) = - BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder); + let (res, retryable) = BankingStage::record_transactions( + bank.slot(), + transactions.iter(), + &results, + &recorder, + ); res.unwrap(); assert!(retryable.is_empty()); let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); @@ -1772,7 +1858,7 @@ mod tests { // txs let (res, retryable) = BankingStage::record_transactions( bank.slot() + 1, - &transactions, + transactions.iter(), &results, &recorder, ); @@ -1789,107 +1875,10 @@ mod tests { Blockstore::destroy(&ledger_path).unwrap(); } - #[test] - fn test_bank_filter_transaction_indexes() { - let GenesisConfigInfo { - genesis_config, - mint_keypair, - .. 
- } = create_genesis_config(10_000); - let pubkey = solana_sdk::pubkey::new_rand(); - - let transactions = vec![ - None, - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_config.hash(), - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_config.hash(), - )), - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_config.hash(), - )), - None, - None, - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_config.hash(), - )), - None, - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_config.hash(), - )), - None, - Some(system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_config.hash(), - )), - None, - None, - ]; - - let filtered_transactions = vec![ - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - ]; - - assert_eq!( - BankingStage::filter_transaction_indexes( - transactions.clone(), - &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], - ), - (filtered_transactions.clone(), vec![1, 2, 3, 6, 8, 10]) - ); - - assert_eq!( - BankingStage::filter_transaction_indexes( - transactions, - &[1, 2, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15], - ), - (filtered_transactions, vec![2, 4, 5, 9, 11, 13]) - ); - } - #[test] fn test_bank_prepare_filter_for_pending_transaction() { - let GenesisConfigInfo { - genesis_config, - mint_keypair, - .. 
- } = create_genesis_config(10_000); - let pubkey = solana_sdk::pubkey::new_rand(); - - let transactions = vec![ - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - ]; - assert_eq!( - BankingStage::prepare_filter_for_pending_transactions(&transactions, &[2, 4, 5],), + BankingStage::prepare_filter_for_pending_transactions(6, &[2, 4, 5]), vec![ Err(TransactionError::BlockhashNotFound), Err(TransactionError::BlockhashNotFound), @@ -1901,7 +1890,7 @@ mod tests { ); assert_eq!( - BankingStage::prepare_filter_for_pending_transactions(&transactions, &[0, 2, 3],), + BankingStage::prepare_filter_for_pending_transactions(6, &[0, 2, 3]), vec![ Ok(()), Err(TransactionError::BlockhashNotFound), @@ -2045,12 +2034,11 @@ mod tests { let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config)); let pubkey = solana_sdk::pubkey::new_rand(); - let transactions = vec![system_transaction::transfer( - &mint_keypair, - &pubkey, - 1, - genesis_config.hash(), - )]; + let transactions = + vec![ + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()) + .into(), + ]; let start = Arc::new(Instant::now()); let working_bank = WorkingBank { @@ -2116,7 +2104,8 @@ mod tests { &pubkey, 2, genesis_config.hash(), - )]; + ) + .into()]; assert_matches!( BankingStage::process_and_record_transactions( @@ -2174,8 +2163,8 @@ mod tests { let pubkey1 = solana_sdk::pubkey::new_rand(); let transactions = vec![ - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()), - system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()), + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()).into(), + system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()).into(), ]; let start = Arc::new(Instant::now()); @@ -2282,8 +2271,8 @@ mod tests { let transactions = vec![ - system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash(),); - 3 + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()) + .into(), ]; let ledger_path = get_tmp_ledger_path!(); @@ -2360,7 +2349,7 @@ mod tests { let entry_3 = next_entry(&entry_2.hash, 1, vec![fail_tx.clone()]); let entries = vec![entry_1, entry_2, entry_3]; - let transactions = vec![success_tx, ix_error_tx, fail_tx]; + let transactions = vec![success_tx.into(), ix_error_tx.into(), fail_tx.into()]; bank.transfer(4, &mint_keypair, &keypair1.pubkey()).unwrap(); let start = Arc::new(Instant::now()); @@ -2538,7 +2527,7 @@ mod tests { None, &gossip_vote_sender, None::>, - None, + &BankingStageStats::default(), &recorder, ); assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions); @@ -2554,7 +2543,7 @@ mod tests { None, &gossip_vote_sender, None::>, - None, + &BankingStageStats::default(), &recorder, ); if num_expected_unprocessed == 0 { @@ -2615,7 +2604,7 @@ mod tests { None, &gossip_vote_sender, test_fn, - None, + &BankingStageStats::default(), &recorder, ); @@ -2677,6 +2666,7 @@ mod tests { ))); let mut dropped_batches_count = 0; let mut newly_buffered_packets_count = 0; + let banking_stage_stats = 
BankingStageStats::default(); // Because the set of unprocessed `packet_indexes` is empty, the // packets are not added to the unprocessed queue BankingStage::push_unprocessed( @@ -2687,6 +2677,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &duplicates, + &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 1); assert_eq!(dropped_batches_count, 0); @@ -2703,6 +2694,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &duplicates, + &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(dropped_batches_count, 0); @@ -2711,7 +2703,7 @@ mod tests { // Because we've reached the batch limit, old unprocessed packets are // dropped and the new one is appended to the end let new_packets = Packets::new(vec![Packet::from_data( - &SocketAddr::from(([127, 0, 0, 1], 8001)), + Some(&SocketAddr::from(([127, 0, 0, 1], 8001))), 42, ) .unwrap()]); @@ -2724,6 +2716,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &duplicates, + &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); @@ -2739,10 +2732,24 @@ mod tests { &mut newly_buffered_packets_count, 3, &duplicates, + &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); assert_eq!(dropped_batches_count, 1); assert_eq!(newly_buffered_packets_count, 2); } + + #[test] + fn test_packet_message() { + let keypair = Keypair::new(); + let pubkey = solana_sdk::pubkey::new_rand(); + let blockhash = Hash::new_unique(); + let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash); + let packet = Packet::from_data(None, &transaction).unwrap(); + assert_eq!( + BankingStage::packet_message(&packet).unwrap().to_vec(), + transaction.message_data() + ); + } } diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index a0b98c31eb..c13115f55b 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2150,7 +2150,7 @@ impl ClusterInfo { let (check, ping) = ping_cache.check(now, node, &mut pingf); if let Some(ping) = ping { let ping = Protocol::PingMessage(ping); - match Packet::from_data(&node.1, ping) { + match Packet::from_data(Some(&node.1), ping) { Ok(packet) => packets.packets.push(packet), Err(err) => error!("failed to write ping packet: {:?}", err), }; @@ -2264,7 +2264,7 @@ impl ClusterInfo { let from_addr = pull_responses[stat.to].1; let response = pull_responses[stat.to].0[stat.responses_index].clone(); let protocol = Protocol::PullResponse(self_id, vec![response]); - match Packet::from_data(&from_addr, protocol) { + match Packet::from_data(Some(&from_addr), protocol) { Err(err) => error!("failed to write pull-response packet: {:?}", err), Ok(packet) => { if self.outbound_budget.take(packet.meta.size) { @@ -2466,7 +2466,7 @@ impl ClusterInfo { .filter_map(|(addr, ping)| { let pong = Pong::new(&ping, &self.keypair).ok()?; let pong = Protocol::PongMessage(pong); - match Packet::from_data(&addr, pong) { + match Packet::from_data(Some(&addr), pong) { Ok(packet) => Some(packet), Err(err) => { error!("failed to write pong packet: {:?}", err); @@ -2618,7 +2618,7 @@ impl ClusterInfo { inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len()); for (address, request) in new_push_requests { if ContactInfo::is_valid_address(&address) { - match Packet::from_data(&address, &request) { + match Packet::from_data(Some(&address), &request) { Ok(packet) => 
packets.packets.push(packet), Err(err) => error!("failed to write push-request packet: {:?}", err), } @@ -3710,7 +3710,7 @@ mod tests { CrdsValue::new_signed(CrdsData::SnapshotHashes(snapshot_hash), &Keypair::new()); let message = Protocol::PushMessage(Pubkey::new_unique(), vec![crds_value]); let socket = new_rand_socket_addr(&mut rng); - assert!(Packet::from_data(&socket, message).is_ok()); + assert!(Packet::from_data(Some(&socket), message).is_ok()); } } @@ -3723,7 +3723,7 @@ mod tests { CrdsValue::new_signed(CrdsData::AccountsHashes(snapshot_hash), &Keypair::new()); let response = Protocol::PullResponse(Pubkey::new_unique(), vec![crds_value]); let socket = new_rand_socket_addr(&mut rng); - assert!(Packet::from_data(&socket, response).is_ok()); + assert!(Packet::from_data(Some(&socket), response).is_ok()); } } @@ -3736,7 +3736,7 @@ mod tests { PruneData::new_rand(&mut rng, &self_keypair, Some(MAX_PRUNE_DATA_NODES)); let prune_message = Protocol::PruneMessage(self_keypair.pubkey(), prune_data); let socket = new_rand_socket_addr(&mut rng); - assert!(Packet::from_data(&socket, prune_message).is_ok()); + assert!(Packet::from_data(Some(&socket), prune_message).is_ok()); } // Assert that MAX_PRUNE_DATA_NODES is highest possible. let self_keypair = Keypair::new(); @@ -3744,7 +3744,7 @@ mod tests { PruneData::new_rand(&mut rng, &self_keypair, Some(MAX_PRUNE_DATA_NODES + 1)); let prune_message = Protocol::PruneMessage(self_keypair.pubkey(), prune_data); let socket = new_rand_socket_addr(&mut rng); - assert!(Packet::from_data(&socket, prune_message).is_err()); + assert!(Packet::from_data(Some(&socket), prune_message).is_err()); } #[test] @@ -4216,7 +4216,7 @@ mod tests { let message = Protocol::PushMessage(self_pubkey, values); assert_eq!(serialized_size(&message).unwrap(), size); // Assert that the message fits into a packet. 
- assert!(Packet::from_data(&socket, message).is_ok()); + assert!(Packet::from_data(Some(&socket), message).is_ok()); } } diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs index 98c1aa3e13..5eddcc8df7 100644 --- a/core/src/progress_map.rs +++ b/core/src/progress_map.rs @@ -59,6 +59,7 @@ impl ReplaySlotStats { ), ("total_entries", num_entries as i64, i64), ("total_shreds", num_shreds as i64, i64), + ("check_us", self.execute_timings.check_us, i64), ("load_us", self.execute_timings.load_us, i64), ("execute_us", self.execute_timings.execute_us, i64), ("store_us", self.execute_timings.store_us, i64), diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index ab2653b64e..7355440975 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -3,7 +3,7 @@ use crate::{ blockstore::Blockstore, blockstore_db::BlockstoreError, blockstore_meta::SlotMeta, - entry::{create_ticks, Entry, EntrySlice, EntryVerificationStatus, VerifyRecyclers}, + entry::{create_ticks, Entry, EntrySlice, EntryType, EntryVerificationStatus, VerifyRecyclers}, leader_schedule_cache::LeaderScheduleCache, }; use chrono_humanize::{Accuracy, HumanTime, Tense}; @@ -34,6 +34,7 @@ use solana_sdk::{ hash::Hash, pubkey::Pubkey, signature::{Keypair, Signature}, + timing, transaction::{Result, Transaction, TransactionError}, }; use solana_transaction_status::token_balances::{ @@ -75,7 +76,7 @@ fn get_first_error( fee_collection_results: Vec>, ) -> Option<(Result<()>, Signature)> { let mut first_err = None; - for (result, transaction) in fee_collection_results.iter().zip(batch.transactions()) { + for (result, transaction) in fee_collection_results.iter().zip(batch.transactions_iter()) { if let Err(ref err) = result { if first_err.is_none() { first_err = Some((result.clone(), transaction.signatures[0])); @@ -124,7 +125,7 @@ fn execute_batch( timings, ); - bank_utils::find_and_send_votes(batch.transactions(), &tx_results, replay_vote_sender); + bank_utils::find_and_send_votes(batch.hashed_transactions(), &tx_results, replay_vote_sender); let TransactionResults { fee_collection_results, @@ -133,6 +134,7 @@ fn execute_batch( } = tx_results; if let Some(transaction_status_sender) = transaction_status_sender { + let txs = batch.transactions_iter().cloned().collect(); let post_token_balances = if record_token_balances { collect_token_balances(&bank, &batch, &mut mint_decimals) } else { @@ -144,7 +146,7 @@ fn execute_batch( transaction_status_sender.send_transaction_status_batch( bank.clone(), - batch.transactions(), + txs, execution_results, balances, token_balances, @@ -209,9 +211,10 @@ pub fn process_entries( replay_vote_sender: Option<&ReplayVoteSender>, ) -> Result<()> { let mut timings = ExecuteTimings::default(); + let mut entry_types: Vec<_> = entries.iter().map(EntryType::from).collect(); let result = process_entries_with_callback( bank, - entries, + &mut entry_types, randomize, None, transaction_status_sender, @@ -226,7 +229,7 @@ pub fn process_entries( // Note: If randomize is true this will shuffle entries' transactions in-place. 
fn process_entries_with_callback( bank: &Arc, - entries: &mut [Entry], + entries: &mut [EntryType], randomize: bool, entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option, @@ -236,76 +239,78 @@ fn process_entries_with_callback( // accumulator for entries that can be processed in parallel let mut batches = vec![]; let mut tick_hashes = vec![]; - if randomize { - let mut rng = thread_rng(); - for entry in entries.iter_mut() { - entry.transactions.shuffle(&mut rng); - } - } - for entry in entries { - if entry.is_tick() { - // If it's a tick, save it for later - tick_hashes.push(entry.hash); - if bank.is_block_boundary(bank.tick_height() + tick_hashes.len() as u64) { - // If it's a tick that will cause a new blockhash to be created, - // execute the group and register the tick - execute_batches( - bank, - &batches, - entry_callback, - transaction_status_sender.clone(), - replay_vote_sender, - timings, - )?; - batches.clear(); - for hash in &tick_hashes { - bank.register_tick(hash); - } - tick_hashes.clear(); - } - continue; - } - // else loop on processing the entry - loop { - // try to lock the accounts - let batch = bank.prepare_batch(&entry.transactions); - let first_lock_err = first_err(batch.lock_results()); + let mut rng = thread_rng(); - // if locking worked - if first_lock_err.is_ok() { - batches.push(batch); - // done with this entry - break; + for entry in entries { + match entry { + EntryType::Tick(hash) => { + // If it's a tick, save it for later + tick_hashes.push(hash); + if bank.is_block_boundary(bank.tick_height() + tick_hashes.len() as u64) { + // If it's a tick that will cause a new blockhash to be created, + // execute the group and register the tick + execute_batches( + bank, + &batches, + entry_callback, + transaction_status_sender.clone(), + replay_vote_sender, + timings, + )?; + batches.clear(); + for hash in &tick_hashes { + bank.register_tick(hash); + } + tick_hashes.clear(); + } } - // else we failed to lock, 2 possible reasons - if batches.is_empty() { - // An entry has account lock conflicts with *itself*, which should not happen - // if generated by a properly functioning leader - datapoint_error!( - "validator_process_entry_error", - ( - "error", - format!( - "Lock accounts error, entry conflicts with itself, txs: {:?}", - entry.transactions - ), - String - ) - ); - // bail - first_lock_err?; - } else { - // else we have an entry that conflicts with a prior entry - // execute the current queue and try to process this entry again - execute_batches( - bank, - &batches, - entry_callback, - transaction_status_sender.clone(), - replay_vote_sender, - timings, - )?; - batches.clear(); + EntryType::Transactions(transactions) => { + if randomize { + transactions.shuffle(&mut rng); + } + + loop { + // try to lock the accounts + let batch = bank.prepare_hashed_batch(transactions); + let first_lock_err = first_err(batch.lock_results()); + + // if locking worked + if first_lock_err.is_ok() { + batches.push(batch); + // done with this entry + break; + } + // else we failed to lock, 2 possible reasons + if batches.is_empty() { + // An entry has account lock conflicts with *itself*, which should not happen + // if generated by a properly functioning leader + datapoint_error!( + "validator_process_entry_error", + ( + "error", + format!( + "Lock accounts error, entry conflicts with itself, txs: {:?}", + transactions + ), + String + ) + ); + // bail + first_lock_err?; + } else { + // else we have an entry that conflicts with a prior entry + // execute the 
current queue and try to process this entry again + execute_batches( + bank, + &batches, + entry_callback, + transaction_status_sender.clone(), + replay_vote_sender, + timings, + )?; + batches.clear(); + } + } } } } @@ -668,7 +673,7 @@ pub fn confirm_slot( ) -> result::Result<(), BlockstoreProcessorError> { let slot = bank.slot(); - let (mut entries, num_shreds, slot_full) = { + let (entries, num_shreds, slot_full) = { let mut load_elapsed = Measure::start("load_elapsed"); let load_result = blockstore .get_slot_entries_with_shred_info(slot, progress.num_shreds, allow_dead_slots) @@ -711,13 +716,10 @@ pub fn confirm_slot( })?; } + let last_entry_hash = entries.last().map(|e| e.hash); let verifier = if !skip_verification { datapoint_debug!("verify-batch-size", ("size", num_entries as i64, i64)); - let entry_state = entries.start_verify( - &progress.last_entry, - recyclers.clone(), - bank.secp256k1_program_enabled(), - ); + let entry_state = entries.start_verify(&progress.last_entry, recyclers.clone()); if entry_state.status() == EntryVerificationStatus::Failure { warn!("Ledger proof of history failed at slot: {}", slot); return Err(BlockError::InvalidEntryHash.into()); @@ -727,6 +729,16 @@ pub fn confirm_slot( None }; + let check_start = Instant::now(); + let check_result = + entries.verify_and_hash_transactions(skip_verification, bank.secp256k1_program_enabled()); + if check_result.is_none() { + warn!("Ledger proof of history failed at slot: {}", slot); + return Err(BlockError::InvalidEntryHash.into()); + } + let transaction_duration_us = timing::duration_as_us(&check_start.elapsed()); + + let mut entries = check_result.unwrap(); let mut replay_elapsed = Measure::start("replay_elapsed"); let mut execute_timings = ExecuteTimings::default(); // Note: This will shuffle entries' transactions in-place. 
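The rewritten loop in process_entries_with_callback above is the heart of replay parallelism: transaction entries accumulate into batches for as long as all of their account locks can be taken, and the first lock conflict flushes every batch queued so far before the conflicting entry is retried against an empty lock table. Below is a minimal standalone sketch of that strategy, with invented Tx and LockTable types standing in for the real Bank/TransactionBatch machinery — an illustration under those assumptions, not the solana-runtime API:

use std::collections::HashSet;

type AccountId = u64;

// Invented stand-in for a transaction: just the accounts it write-locks.
#[derive(Clone, Debug)]
struct Tx {
    writable: Vec<AccountId>,
}

// Invented stand-in for the bank's per-slot account lock table.
#[derive(Default)]
struct LockTable {
    locked: HashSet<AccountId>,
}

impl LockTable {
    // All-or-nothing lock acquisition for one entry's transactions.
    fn try_lock(&mut self, txs: &[Tx]) -> bool {
        let mut wanted = HashSet::new();
        for tx in txs {
            for account in &tx.writable {
                // An intra-entry duplicate or a lock held by an earlier
                // batch both count as conflicts.
                if !wanted.insert(*account) || self.locked.contains(account) {
                    return false;
                }
            }
        }
        self.locked.extend(wanted);
        true
    }

    fn clear(&mut self) {
        self.locked.clear();
    }
}

// Accumulate entries into conflict-free batch sets; a lock conflict flushes
// everything queued so far, then the conflicting entry is retried.
fn process_entries(entries: &[Vec<Tx>]) -> usize {
    let mut locks = LockTable::default();
    let mut queued = 0usize; // entries in the current conflict-free set
    let mut flushes = 0usize;
    for entry in entries {
        loop {
            if locks.try_lock(entry) {
                queued += 1; // no conflict: defer for parallel execution
                break;
            }
            // A conflict with nothing queued means the entry conflicts with
            // itself; the real loop reports validator_process_entry_error
            // and bails with the lock error.
            assert!(queued > 0, "entry conflicts with itself");
            flushes += 1; // execute_batches would run here
            queued = 0;
            locks.clear(); // executing a batch releases its locks
        }
    }
    if queued > 0 { flushes + 1 } else { flushes }
}

fn main() {
    let entries = vec![
        vec![Tx { writable: vec![1] }],
        vec![Tx { writable: vec![2] }], // parallel with the first entry
        vec![Tx { writable: vec![1] }], // conflicts: forces a flush
    ];
    println!("executed in {} batch flush(es)", process_entries(&entries));
}

The assert mirrors the datapoint_error!("validator_process_entry_error", ...) branch above: a lock failure while no batches are queued can only mean the entry conflicts with itself, which a correctly functioning leader never produces.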
@@ -746,9 +758,9 @@ pub fn confirm_slot( timing.execute_timings.accumulate(&execute_timings); if let Some(mut verifier) = verifier { - let verified = verifier.finish_verify(&entries); + let verified = verifier.finish_verify(); timing.poh_verify_elapsed += verifier.poh_duration_us(); - timing.transaction_verify_elapsed += verifier.transaction_duration_us(); + timing.transaction_verify_elapsed += transaction_duration_us; if !verified { warn!("Ledger proof of history failed at slot: {}", bank.slot()); return Err(BlockError::InvalidEntryHash.into()); @@ -760,8 +772,8 @@ pub fn confirm_slot( progress.num_shreds += num_shreds; progress.num_entries += num_entries; progress.num_txs += num_txs; - if let Some(last_entry) = entries.last() { - progress.last_entry = last_entry.hash; + if let Some(last_entry_hash) = last_entry_hash { + progress.last_entry = last_entry_hash; } Ok(()) @@ -1070,7 +1082,7 @@ fn process_single_slot( timing: &mut ExecuteTimings, ) -> result::Result<(), BlockstoreProcessorError> { // Mark corrupt slots as dead so validators don't replay this slot and - // see DuplicateSignature errors later in ReplayStage + // see AlreadyProcessed errors later in ReplayStage confirm_full_slot(blockstore, bank, opts, recyclers, progress, transaction_status_sender, replay_vote_sender, timing).map_err(|err| { let slot = bank.slot(); warn!("slot {} failed to verify: {}", slot, err); @@ -1114,7 +1126,7 @@ impl TransactionStatusSender { pub fn send_transaction_status_batch( &self, bank: Arc, - transactions: &[Transaction], + transactions: Vec, statuses: Vec, balances: TransactionBalancesSet, token_balances: TransactionTokenBalancesSet, @@ -1131,7 +1143,7 @@ impl TransactionStatusSender { .sender .send(TransactionStatusMessage::Batch(TransactionStatusBatch { bank, - transactions: transactions.to_vec(), + transactions, statuses, balances, token_balances, @@ -1838,22 +1850,22 @@ pub mod tests { fn test_first_err() { assert_eq!(first_err(&[Ok(())]), Ok(())); assert_eq!( - first_err(&[Ok(()), Err(TransactionError::DuplicateSignature)]), - Err(TransactionError::DuplicateSignature) + first_err(&[Ok(()), Err(TransactionError::AlreadyProcessed)]), + Err(TransactionError::AlreadyProcessed) ); assert_eq!( first_err(&[ Ok(()), - Err(TransactionError::DuplicateSignature), + Err(TransactionError::AlreadyProcessed), Err(TransactionError::AccountInUse) ]), - Err(TransactionError::DuplicateSignature) + Err(TransactionError::AlreadyProcessed) ); assert_eq!( first_err(&[ Ok(()), Err(TransactionError::AccountInUse), - Err(TransactionError::DuplicateSignature) + Err(TransactionError::AlreadyProcessed) ]), Err(TransactionError::AccountInUse) ); @@ -1861,7 +1873,7 @@ pub mod tests { first_err(&[ Err(TransactionError::AccountInUse), Ok(()), - Err(TransactionError::DuplicateSignature) + Err(TransactionError::AlreadyProcessed) ]), Err(TransactionError::AccountInUse) ); @@ -2279,13 +2291,13 @@ pub mod tests { // Check all accounts are unlocked let txs1 = &entry_1_to_mint.transactions[..]; let txs2 = &entry_2_to_3_mint_to_1.transactions[..]; - let batch1 = bank.prepare_batch(txs1); + let batch1 = bank.prepare_batch(txs1.iter()); for result in batch1.lock_results() { assert!(result.is_ok()); } // txs1 and txs2 have accounts that conflict, so we must drop txs1 first drop(batch1); - let batch2 = bank.prepare_batch(txs2); + let batch2 = bank.prepare_batch(txs2.iter()); for result in batch2.lock_results() { assert!(result.is_ok()); } @@ -2656,7 +2668,7 @@ pub mod tests { ); assert_eq!( bank.transfer(10_001, &mint_keypair, &pubkey), 
- Err(TransactionError::DuplicateSignature) + Err(TransactionError::AlreadyProcessed) ); // Make sure other errors don't update the signature cache @@ -2964,16 +2976,7 @@ pub mod tests { let entry = next_entry(&new_blockhash, 1, vec![tx]); entries.push(entry); - process_entries_with_callback( - &bank0, - &mut entries, - true, - None, - None, - None, - &mut ExecuteTimings::default(), - ) - .unwrap(); + process_entries(&bank0, &mut entries, true, None, None).unwrap(); assert_eq!(bank0.get_balance(&keypair.pubkey()), 1) } @@ -3047,7 +3050,7 @@ pub mod tests { ); account_loaded_twice.message.account_keys[1] = mint_keypair.pubkey(); let transactions = [account_not_found_tx, account_loaded_twice]; - let batch = bank.prepare_batch(&transactions); + let batch = bank.prepare_batch(transactions.iter()); let ( TransactionResults { fee_collection_results, diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs index c52e6f4db3..4df5257386 100644 --- a/ledger/src/entry.rs +++ b/ledger/src/entry.rs @@ -17,10 +17,12 @@ use solana_perf::cuda_runtime::PinnedVec; use solana_perf::perf_libs; use solana_perf::recycler::Recycler; use solana_rayon_threadlimit::get_thread_count; +use solana_runtime::hashed_transaction::HashedTransaction; use solana_sdk::hash::Hash; use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::timing; use solana_sdk::transaction::Transaction; +use std::borrow::Cow; use std::cell::RefCell; use std::ffi::OsStr; use std::sync::mpsc::{Receiver, Sender}; @@ -118,6 +120,28 @@ pub struct Entry { pub transactions: Vec, } +/// Typed entry to distinguish between transaction and tick entries +pub enum EntryType<'a> { + Transactions(Vec>), + Tick(Hash), +} + +impl<'a> From<&'a Entry> for EntryType<'a> { + fn from(entry: &'a Entry) -> Self { + if entry.transactions.is_empty() { + EntryType::Tick(entry.hash) + } else { + EntryType::Transactions( + entry + .transactions + .iter() + .map(HashedTransaction::from) + .collect(), + ) + } + } +} + impl Entry { /// Creates the next Entry `num_hashes` after `start_hash`. 
pub fn new(prev_hash: &Hash, mut num_hashes: u64, transactions: Vec) -> Self { @@ -207,10 +231,20 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction } } +/// Last action required to verify an entry +enum VerifyAction { + /// Mixin a hash before computing the last hash for a transaction entry + Mixin(Hash), + /// Compute one last hash for a tick entry + Tick, + /// No action needed (tick entry with no hashes) + None, +} + pub struct GpuVerificationData { thread_h: Option>, hashes: Option>>>, - tx_hashes: Vec>, + verifications: Option>, } pub enum DeviceVerificationData { @@ -221,7 +255,6 @@ pub enum DeviceVerificationData { pub struct EntryVerificationState { verification_status: EntryVerificationStatus, poh_duration_us: u64, - transaction_duration_us: u64, device_verification_data: DeviceVerificationData, } @@ -256,15 +289,7 @@ impl EntryVerificationState { self.poh_duration_us } - pub fn set_transaction_duration_us(&mut self, new: u64) { - self.transaction_duration_us = new; - } - - pub fn transaction_duration_us(&self) -> u64 { - self.transaction_duration_us - } - - pub fn finish_verify(&mut self, entries: &[Entry]) -> bool { + pub fn finish_verify(&mut self) -> bool { match &mut self.device_verification_data { DeviceVerificationData::Gpu(verification_state) => { let gpu_time_us = verification_state.thread_h.take().unwrap().join().unwrap(); @@ -279,19 +304,17 @@ impl EntryVerificationState { thread_pool.borrow().install(|| { hashes .into_par_iter() - .zip(&verification_state.tx_hashes) - .zip(entries) - .all(|((hash, tx_hash), answer)| { - if answer.num_hashes == 0 { - *hash == answer.hash - } else { - let mut poh = Poh::new(*hash, None); - if let Some(mixin) = tx_hash { - poh.record(*mixin).unwrap().hash == answer.hash - } else { - poh.tick().unwrap().hash == answer.hash + .cloned() + .zip(verification_state.verifications.take().unwrap()) + .all(|(hash, (action, expected))| { + let actual = match action { + VerifyAction::Mixin(mixin) => { + Poh::new(hash, None).record(mixin).unwrap().hash } - } + VerifyAction::Tick => Poh::new(hash, None).tick().unwrap().hash, + VerifyAction::None => hash, + }; + actual == expected }) }) }); @@ -314,17 +337,17 @@ impl EntryVerificationState { } fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool { - if ref_entry.num_hashes == 0 { - computed_hash == ref_entry.hash - } else { + let actual = if !ref_entry.transactions.is_empty() { + let tx_hash = hash_transactions(&ref_entry.transactions); let mut poh = Poh::new(computed_hash, None); - if ref_entry.transactions.is_empty() { - poh.tick().unwrap().hash == ref_entry.hash - } else { - let tx_hash = hash_transactions(&ref_entry.transactions); - poh.record(tx_hash).unwrap().hash == ref_entry.hash - } - } + poh.record(tx_hash).unwrap().hash + } else if ref_entry.num_hashes > 0 { + let mut poh = Poh::new(computed_hash, None); + poh.tick().unwrap().hash + } else { + computed_hash + }; + actual == ref_entry.hash } // an EntrySlice is a slice of Entries @@ -333,12 +356,8 @@ pub trait EntrySlice { fn verify_cpu(&self, start_hash: &Hash) -> EntryVerificationState; fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState; fn verify_cpu_x86_simd(&self, start_hash: &Hash, simd_len: usize) -> EntryVerificationState; - fn start_verify( - &self, - start_hash: &Hash, - recyclers: VerifyRecyclers, - secp256k1_program_enabled: bool, - ) -> EntryVerificationState; + fn start_verify(&self, start_hash: &Hash, recyclers: VerifyRecyclers) + -> EntryVerificationState; fn 
verify(&self, start_hash: &Hash) -> bool; /// Checks that each entry tick has the correct number of hashes. Entry slices do not /// necessarily end in a tick, so `tick_hash_count` is used to carry over the hash count @@ -346,13 +365,17 @@ pub trait EntrySlice { fn verify_tick_hash_count(&self, tick_hash_count: &mut u64, hashes_per_tick: u64) -> bool; /// Counts tick entries fn tick_count(&self) -> u64; - fn verify_transaction_signatures(&self, secp256k1_program_enabled: bool) -> bool; + fn verify_and_hash_transactions( + &self, + skip_verification: bool, + secp256k1_program_enabled: bool, + ) -> Option>>; } impl EntrySlice for [Entry] { fn verify(&self, start_hash: &Hash) -> bool { - self.start_verify(start_hash, VerifyRecyclers::default(), true) - .finish_verify(self) + self.start_verify(start_hash, VerifyRecyclers::default()) + .finish_verify() } fn verify_cpu_generic(&self, start_hash: &Hash) -> EntryVerificationState { @@ -388,7 +411,6 @@ impl EntrySlice for [Entry] { EntryVerificationStatus::Failure }, poh_duration_us, - transaction_duration_us: 0, device_verification_data: DeviceVerificationData::Cpu(), } } @@ -472,7 +494,6 @@ impl EntrySlice for [Entry] { EntryVerificationStatus::Failure }, poh_duration_us, - transaction_duration_us: 0, device_verification_data: DeviceVerificationData::Cpu(), } } @@ -499,25 +520,46 @@ impl EntrySlice for [Entry] { } } - fn verify_transaction_signatures(&self, secp256k1_program_enabled: bool) -> bool { - let verify = |tx: &Transaction| { - tx.verify().is_ok() - && { - match bincode::serialized_size(tx) { - Ok(size) => size <= PACKET_DATA_SIZE as u64, - Err(_) => false, - } + fn verify_and_hash_transactions<'a>( + &'a self, + skip_verification: bool, + secp256k1_program_enabled: bool, + ) -> Option>> { + let verify_and_hash = |tx: &'a Transaction| -> Option> { + let message_hash = if !skip_verification { + let size = bincode::serialized_size(tx).ok()?; + if size > PACKET_DATA_SIZE as u64 { + return None; } - && ( + if secp256k1_program_enabled { // Verify tx precompiles if secp256k1 program is enabled. - !secp256k1_program_enabled || tx.verify_precompiles().is_ok() - ) + tx.verify_precompiles().ok()?; + } + tx.verify_and_hash_message().ok()? 
+ } else { + tx.message().hash() + }; + + Some(HashedTransaction::new(Cow::Borrowed(tx), message_hash)) }; + PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { self.par_iter() - .flat_map(|entry| &entry.transactions) - .all(verify) + .map(|entry| { + if entry.transactions.is_empty() { + Some(EntryType::Tick(entry.hash)) + } else { + Some(EntryType::Transactions( + entry + .transactions + .par_iter() + .map(verify_and_hash) + .collect::>>()?, + )) + } + }) + .collect() }) }) } @@ -526,26 +568,11 @@ impl EntrySlice for [Entry] { &self, start_hash: &Hash, recyclers: VerifyRecyclers, - secp256k1_program_enabled: bool, ) -> EntryVerificationState { - let start = Instant::now(); - let res = self.verify_transaction_signatures(secp256k1_program_enabled); - let transaction_duration_us = timing::duration_as_us(&start.elapsed()); - if !res { - return EntryVerificationState { - verification_status: EntryVerificationStatus::Failure, - transaction_duration_us, - poh_duration_us: 0, - device_verification_data: DeviceVerificationData::Cpu(), - }; - } - let start = Instant::now(); let api = perf_libs::api(); if api.is_none() { - let mut res: EntryVerificationState = self.verify_cpu(start_hash); - res.set_transaction_duration_us(transaction_duration_us); - return res; + return self.verify_cpu(start_hash); } let api = api.unwrap(); inc_new_counter_info!("entry_verify-num_entries", self.len() as usize); @@ -600,15 +627,21 @@ impl EntrySlice for [Entry] { timing::duration_as_us(&gpu_wait.elapsed()) }); - let tx_hashes = PAR_THREAD_POOL.with(|thread_pool| { + let verifications = PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { self.into_par_iter() .map(|entry| { - if entry.transactions.is_empty() { - None + let answer = entry.hash; + let action = if entry.transactions.is_empty() { + if entry.num_hashes == 0 { + VerifyAction::None + } else { + VerifyAction::Tick + } } else { - Some(hash_transactions(&entry.transactions)) - } + VerifyAction::Mixin(hash_transactions(&entry.transactions)) + }; + (action, answer) }) .collect() }) @@ -616,13 +649,12 @@ impl EntrySlice for [Entry] { let device_verification_data = DeviceVerificationData::Gpu(GpuVerificationData { thread_h: Some(gpu_verify_thread), - tx_hashes, + verifications: Some(verifications), hashes: Some(hashes), }); EntryVerificationState { verification_status: EntryVerificationStatus::Pending, poh_duration_us: timing::duration_as_us(&start.elapsed()), - transaction_duration_us, device_verification_data, } } @@ -704,6 +736,7 @@ mod tests { use solana_sdk::{ hash::{hash, new_rand as hash_new_rand, Hash}, message::Message, + packet::PACKET_DATA_SIZE, signature::{Keypair, Signer}, system_transaction, transaction::Transaction, @@ -909,7 +942,7 @@ mod tests { } #[test] - fn test_verify_transaction_signatures_packet_data_size() { + fn test_verify_and_hash_transactions_packet_data_size() { let mut rng = rand::thread_rng(); let recent_blockhash = hash_new_rand(&mut rng); let keypair = Keypair::new(); @@ -931,14 +964,18 @@ mod tests { let tx = make_transaction(5); let entries = vec![next_entry(&recent_blockhash, 1, vec![tx.clone()])]; assert!(bincode::serialized_size(&tx).unwrap() <= PACKET_DATA_SIZE as u64); - assert!(entries[..].verify_transaction_signatures(false)); + assert!(entries[..] + .verify_and_hash_transactions(false, false) + .is_some()); } // Big transaction. 
{ let tx = make_transaction(15); let entries = vec![next_entry(&recent_blockhash, 1, vec![tx.clone()])]; assert!(bincode::serialized_size(&tx).unwrap() > PACKET_DATA_SIZE as u64); - assert!(!entries[..].verify_transaction_signatures(false)); + assert!(entries[..] + .verify_and_hash_transactions(false, false) + .is_none()); } // Assert that verify fails as soon as serialized // size exceeds packet data size. @@ -947,7 +984,9 @@ mod tests { let entries = vec![next_entry(&recent_blockhash, 1, vec![tx.clone()])]; assert_eq!( bincode::serialized_size(&tx).unwrap() <= PACKET_DATA_SIZE as u64, - entries[..].verify_transaction_signatures(false), + entries[..] + .verify_and_hash_transactions(false, false) + .is_some(), ); } } diff --git a/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json b/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json index c38eddb27b..6a48736689 100644 --- a/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json +++ b/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json @@ -3634,7 +3634,7 @@ ], "orderByTime": "ASC", "policy": "default", - "query": "SELECT sum(\"count\") AS \"duplicate_signature\" FROM \"$testnet\".\"autogen\".\"bank-process_transactions-error-duplicate_signature\" WHERE $timeFilter GROUP BY time(1s) FILL(0)\n\n\n\n", + "query": "SELECT sum(\"count\") AS \"already_processed\" FROM \"$testnet\".\"autogen\".\"bank-process_transactions-error-already_processed\" WHERE $timeFilter GROUP BY time(1s) FILL(0)\n\n\n\n", "rawQuery": true, "refId": "G", "resultFormat": "time_series", diff --git a/poh-bench/src/main.rs b/poh-bench/src/main.rs index 03228cf90c..98e83f4f0f 100644 --- a/poh-bench/src/main.rs +++ b/poh-bench/src/main.rs @@ -75,7 +75,7 @@ fn main() { for _ in 0..iterations { assert!(ticks[..num_entries] .verify_cpu_generic(&start_hash) - .finish_verify(&ticks[..num_entries])); + .finish_verify()); } time.stop(); println!( @@ -89,7 +89,7 @@ fn main() { for _ in 0..iterations { assert!(ticks[..num_entries] .verify_cpu_x86_simd(&start_hash, 8) - .finish_verify(&ticks[..num_entries])); + .finish_verify()); } time.stop(); println!( @@ -104,7 +104,7 @@ fn main() { for _ in 0..iterations { assert!(ticks[..num_entries] .verify_cpu_x86_simd(&start_hash, 16) - .finish_verify(&ticks[..num_entries])); + .finish_verify()); } time.stop(); println!( @@ -119,8 +119,8 @@ fn main() { let recyclers = VerifyRecyclers::default(); for _ in 0..iterations { assert!(ticks[..num_entries] - .start_verify(&start_hash, recyclers.clone(), true) - .finish_verify(&ticks[..num_entries])); + .start_verify(&start_hash, recyclers.clone()) + .finish_verify()); } time.stop(); println!( diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index a4cfa7326f..63639f063b 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -3280,6 +3280,7 @@ name = "solana-program" version = "1.7.0" dependencies = [ "bincode", + "blake3", "borsh", "borsh-derive", "bs58", @@ -3359,6 +3360,7 @@ dependencies = [ name = "solana-runtime" version = "1.7.0" dependencies = [ + "arrayref", "bincode", "blake3", "bv", diff --git a/programs/bpf/tests/programs.rs b/programs/bpf/tests/programs.rs index 6707687546..d76261047a 100644 --- a/programs/bpf/tests/programs.rs +++ b/programs/bpf/tests/programs.rs @@ -269,7 +269,7 @@ fn process_transaction_and_record_inner( ) -> (Result<(), TransactionError>, Vec>) { let signature = tx.signatures.get(0).unwrap().clone(); let txs = vec![tx]; - let tx_batch = bank.prepare_batch(&txs); + let tx_batch = 
bank.prepare_batch(txs.iter()); let (mut results, _, mut inner, _transaction_logs) = bank.load_execute_and_commit_transactions( &tx_batch, MAX_PROCESSING_AGE, @@ -294,7 +294,7 @@ fn process_transaction_and_record_inner( } fn execute_transactions(bank: &Bank, txs: &[Transaction]) -> Vec { - let batch = bank.prepare_batch(txs); + let batch = bank.prepare_batch(txs.iter()); let mut timings = ExecuteTimings::default(); let mut mint_decimals = HashMap::new(); let tx_pre_token_balances = collect_token_balances(&bank, &batch, &mut mint_decimals); diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 7d582fbe00..58d2bf8490 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-runtime" edition = "2018" [dependencies] +arrayref = "0.3.6" bincode = "1.3.1" blake3 = "0.3.6" bv = { version = "0.11.1", features = ["serde"] } diff --git a/runtime/benches/bank.rs b/runtime/benches/bank.rs index ba196fbfe6..6fd9aa445b 100644 --- a/runtime/benches/bank.rs +++ b/runtime/benches/bank.rs @@ -86,7 +86,7 @@ pub fn create_native_loader_transactions( } fn sync_bencher(bank: &Arc, _bank_client: &BankClient, transactions: &[Transaction]) { - let results = bank.process_transactions(&transactions); + let results = bank.process_transactions(transactions); assert!(results.iter().all(Result::is_ok)); } diff --git a/runtime/benches/status_cache.rs b/runtime/benches/status_cache.rs index 5d61f16dab..66c4ec7883 100644 --- a/runtime/benches/status_cache.rs +++ b/runtime/benches/status_cache.rs @@ -10,7 +10,7 @@ use solana_sdk::{ }; use test::Bencher; -type BankStatusCache = StatusCache; +type BankStatusCache = StatusCache<()>; #[bench] fn test_statuscache_serialize(bencher: &mut Bencher) { diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 94b6cc666e..598afec098 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -386,10 +386,10 @@ impl Accounts { Ok(accounts) } - pub fn load_accounts( + pub fn load_accounts<'a>( &self, ancestors: &Ancestors, - txs: &[Transaction], + txs: impl Iterator, lock_results: Vec, hash_queue: &BlockhashQueue, error_counters: &mut ErrorCounters, @@ -400,8 +400,7 @@ impl Accounts { secp256k1_program_enabled: feature_set .is_active(&feature_set::secp256k1_program_enabled::id()), }; - txs.iter() - .zip(lock_results) + txs.zip(lock_results) .map(|etx| match etx { (tx, (Ok(()), nonce_rollback)) => { let fee_calculator = nonce_rollback @@ -793,14 +792,13 @@ impl Accounts { /// This function will prevent multiple threads from modifying the same account state at the /// same time #[must_use] - pub fn lock_accounts( + pub fn lock_accounts<'a>( &self, - txs: &[Transaction], + txs: impl Iterator, demote_sysvar_write_locks: bool, ) -> Vec> { use solana_sdk::sanitize::Sanitize; let keys: Vec> = txs - .iter() .map(|tx| { tx.sanitize().map_err(TransactionError::from)?; @@ -825,15 +823,15 @@ impl Accounts { } /// Once accounts are unlocked, new transactions that modify that state can enter the pipeline - pub fn unlock_accounts( + pub fn unlock_accounts<'a>( &self, - txs: &[Transaction], + txs: impl Iterator, results: &[Result<()>], demote_sysvar_write_locks: bool, ) { let mut account_locks = self.account_locks.lock().unwrap(); debug!("bank unlock accounts"); - for (tx, lock_result) in txs.iter().zip(results) { + for (tx, lock_result) in txs.zip(results) { self.unlock_account( tx, lock_result, @@ -846,12 +844,12 @@ impl Accounts { /// Store the accounts into the DB // allow(clippy) needed for various gating flags 
#[allow(clippy::too_many_arguments)] - pub fn store_cached( + pub fn store_cached<'a>( &self, slot: Slot, - txs: &[Transaction], - res: &[TransactionExecutionResult], - loaded: &mut [TransactionLoadResult], + txs: impl Iterator, + res: &'a [TransactionExecutionResult], + loaded: &'a mut [TransactionLoadResult], rent_collector: &RentCollector, last_blockhash_with_fee_calculator: &(Hash, FeeCalculator), fix_recent_blockhashes_sysvar_delay: bool, @@ -882,7 +880,7 @@ impl Accounts { fn collect_accounts_to_store<'a>( &self, - txs: &'a [Transaction], + txs: impl Iterator, res: &'a [TransactionExecutionResult], loaded: &'a mut [TransactionLoadResult], rent_collector: &RentCollector, @@ -1071,7 +1069,7 @@ mod tests { let ancestors = vec![(0, 0)].into_iter().collect(); accounts.load_accounts( &ancestors, - &[tx], + [tx].iter(), vec![(Ok(()), None)], &hash_queue, error_counters, @@ -1673,7 +1671,7 @@ mod tests { ); let tx = Transaction::new(&[&keypair0], message, Hash::default()); let results0 = accounts.lock_accounts( - &[tx.clone()], + [tx.clone()].iter(), true, // demote_sysvar_write_locks ); @@ -1711,7 +1709,8 @@ mod tests { let tx1 = Transaction::new(&[&keypair1], message, Hash::default()); let txs = vec![tx0, tx1]; let results1 = accounts.lock_accounts( - &txs, true, // demote_sysvar_write_locks + txs.iter(), + true, // demote_sysvar_write_locks ); assert!(results1[0].is_ok()); // Read-only account (keypair1) can be referenced multiple times @@ -1728,12 +1727,14 @@ mod tests { ); accounts.unlock_accounts( - &[tx], + [tx].iter(), &results0, true, // demote_sysvar_write_locks ); accounts.unlock_accounts( - &txs, &results1, true, // demote_sysvar_write_locks + txs.iter(), + &results1, + true, // demote_sysvar_write_locks ); let instructions = vec![CompiledInstruction::new(2, &(), vec![0, 1])]; let message = Message::new_with_compiled_instructions( @@ -1746,7 +1747,7 @@ mod tests { ); let tx = Transaction::new(&[&keypair1], message, Hash::default()); let results2 = accounts.lock_accounts( - &[tx], + [tx].iter(), true, // demote_sysvar_write_locks ); assert!(results2[0].is_ok()); // Now keypair1 account can be locked as writable @@ -1813,7 +1814,8 @@ mod tests { loop { let txs = vec![writable_tx.clone()]; let results = accounts_clone.clone().lock_accounts( - &txs, true, // demote_sysvar_write_locks + txs.iter(), + true, // demote_sysvar_write_locks ); for result in results.iter() { if result.is_ok() { @@ -1821,7 +1823,9 @@ mod tests { } } accounts_clone.unlock_accounts( - &txs, &results, true, // demote_sysvar_write_locks + txs.iter(), + &results, + true, // demote_sysvar_write_locks ); if exit_clone.clone().load(Ordering::Relaxed) { break; @@ -1832,7 +1836,8 @@ mod tests { for _ in 0..5 { let txs = vec![readonly_tx.clone()]; let results = accounts_arc.clone().lock_accounts( - &txs, true, // demote_sysvar_write_locks + txs.iter(), + true, // demote_sysvar_write_locks ); if results[0].is_ok() { let counter_value = counter_clone.clone().load(Ordering::SeqCst); @@ -1840,7 +1845,9 @@ mod tests { assert_eq!(counter_value, counter_clone.clone().load(Ordering::SeqCst)); } accounts_arc.unlock_accounts( - &txs, &results, true, // demote_sysvar_write_locks + txs.iter(), + &results, + true, // demote_sysvar_write_locks ); thread::sleep(time::Duration::from_millis(50)); } @@ -1922,7 +1929,7 @@ mod tests { .insert_new_readonly(&pubkey); } let collected_accounts = accounts.collect_accounts_to_store( - &txs, + txs.iter(), &loaders, loaded.as_mut_slice(), &rent_collector, @@ -1991,7 +1998,7 @@ mod tests { let mut 
error_counters = ErrorCounters::default(); accounts.load_accounts( &ancestors, - &[tx], + [tx].iter(), vec![(Ok(()), None)], &hash_queue, &mut error_counters, @@ -2286,7 +2293,7 @@ mod tests { let accounts = Accounts::new_with_config(Vec::new(), &ClusterType::Development, HashSet::new(), false); let collected_accounts = accounts.collect_accounts_to_store( - &txs, + txs.iter(), &loaders, loaded.as_mut_slice(), &rent_collector, @@ -2397,7 +2404,7 @@ mod tests { let accounts = Accounts::new_with_config(Vec::new(), &ClusterType::Development, HashSet::new(), false); let collected_accounts = accounts.collect_accounts_to_store( - &txs, + txs.iter(), &loaders, loaded.as_mut_slice(), &rent_collector, diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 2633e0c74a..93dc260e9f 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -118,7 +118,7 @@ pub struct ErrorCounters { pub blockhash_not_found: usize, pub blockhash_too_old: usize, pub call_chain_too_deep: usize, - pub duplicate_signature: usize, + pub already_processed: usize, pub instruction_error: usize, pub insufficient_funds: usize, pub invalid_account_for_fee: usize, diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 038302cc0a..8aa76e4bec 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -12,6 +12,7 @@ use crate::{ blockhash_queue::BlockhashQueue, builtins::{self, ActivationType}, epoch_stakes::{EpochStakes, NodeVoteAccounts}, + hashed_transaction::{HashedTransaction, HashedTransactionSlice}, inline_spl_token_v2_0, instruction_recorder::InstructionRecorder, log_collector::LogCollector, @@ -76,6 +77,7 @@ use solana_stake_program::stake_state::{ }; use solana_vote_program::vote_instruction::VoteInstruction; use std::{ + borrow::Cow, cell::RefCell, collections::{HashMap, HashSet}, convert::{TryFrom, TryInto}, @@ -98,6 +100,7 @@ pub const MAX_LEADER_SCHEDULE_STAKES: Epoch = 5; #[derive(Default, Debug)] pub struct ExecuteTimings { + pub check_us: u64, pub load_us: u64, pub execute_us: u64, pub store_us: u64, @@ -106,6 +109,7 @@ pub struct ExecuteTimings { impl ExecuteTimings { pub fn accumulate(&mut self, other: &ExecuteTimings) { + self.check_us += other.check_us; self.load_us += other.load_us; self.execute_us += other.execute_us; self.store_us += other.store_us; @@ -113,8 +117,8 @@ impl ExecuteTimings { } } -type BankStatusCache = StatusCache>; -#[frozen_abi(digest = "4mSWwHd4RrLjCXH7RFrm6K3wZSsi9DfVJK3Ngz9jKk7D")] +type BankStatusCache = StatusCache>; +#[frozen_abi(digest = "3TYCJ7hSJ5ig2NmnwxSn1ggkzm6JCmMHoyRMBQQsLCa3")] pub type BankSlotDelta = SlotDelta>; type TransactionAccountRefCells = Vec>>; type TransactionAccountDepRefCells = Vec<(Pubkey, Rc>)>; @@ -2350,10 +2354,15 @@ impl Bank { } } - fn update_transaction_statuses(&self, txs: &[Transaction], res: &[TransactionExecutionResult]) { + fn update_transaction_statuses( + &self, + hashed_txs: &[HashedTransaction], + res: &[TransactionExecutionResult], + ) { let mut status_cache = self.src.status_cache.write().unwrap(); - assert_eq!(txs.len(), res.len()); - for (tx, (res, _nonce_rollback)) in txs.iter().zip(res) { + assert_eq!(hashed_txs.len(), res.len()); + for (hashed_tx, (res, _nonce_rollback)) in hashed_txs.iter().zip(res) { + let tx = hashed_tx.transaction(); if Self::can_commit(res) && !tx.signatures.is_empty() { status_cache.insert( &tx.message().recent_blockhash, @@ -2361,6 +2370,12 @@ impl Bank { self.slot(), res.clone(), ); + status_cache.insert( + &tx.message().recent_blockhash, + &hashed_tx.message_hash, + 
self.slot(), + res.clone(), + ); } } } @@ -2399,27 +2414,32 @@ impl Bank { tick_height % self.ticks_per_slot == 0 } - /// Process a Transaction. This is used for unit tests and simply calls the vector - /// Bank::process_transactions method - pub fn process_transaction(&self, tx: &Transaction) -> Result<()> { - let txs = vec![tx.clone()]; - self.process_transactions(&txs)[0].clone()?; - tx.signatures - .get(0) - .map_or(Ok(()), |sig| self.get_signature_status(sig).unwrap()) - } - pub fn demote_sysvar_write_locks(&self) -> bool { self.feature_set .is_active(&feature_set::demote_sysvar_write_locks::id()) } - pub fn prepare_batch<'a, 'b>(&'a self, txs: &'b [Transaction]) -> TransactionBatch<'a, 'b> { - let lock_results = self - .rc - .accounts - .lock_accounts(txs, self.demote_sysvar_write_locks()); - TransactionBatch::new(lock_results, &self, txs) + pub fn prepare_batch<'a, 'b>( + &'a self, + txs: impl Iterator, + ) -> TransactionBatch<'a, 'b> { + let hashed_txs: Vec = txs.map(HashedTransaction::from).collect(); + let lock_results = self.rc.accounts.lock_accounts( + hashed_txs.as_transactions_iter(), + self.demote_sysvar_write_locks(), + ); + TransactionBatch::new(lock_results, &self, Cow::Owned(hashed_txs)) + } + + pub fn prepare_hashed_batch<'a, 'b>( + &'a self, + hashed_txs: &'b [HashedTransaction], + ) -> TransactionBatch<'a, 'b> { + let lock_results = self.rc.accounts.lock_accounts( + hashed_txs.as_transactions_iter(), + self.demote_sysvar_write_locks(), + ); + TransactionBatch::new(lock_results, &self, Cow::Borrowed(hashed_txs)) } pub fn prepare_simulation_batch<'a, 'b>( @@ -2430,7 +2450,8 @@ impl Bank { .iter() .map(|tx| tx.sanitize().map_err(|e| e.into())) .collect(); - let mut batch = TransactionBatch::new(lock_results, &self, txs); + let hashed_txs = txs.iter().map(HashedTransaction::from).collect(); + let mut batch = TransactionBatch::new(lock_results, &self, hashed_txs); batch.needs_unlock = false; batch } @@ -2480,7 +2501,7 @@ impl Bank { if batch.needs_unlock { batch.needs_unlock = false; self.rc.accounts.unlock_accounts( - batch.transactions(), + batch.transactions_iter(), batch.lock_results(), self.demote_sysvar_write_locks(), ) @@ -2495,16 +2516,15 @@ impl Bank { self.rc.accounts.accounts_db.set_shrink_paths(paths); } - fn check_age( + fn check_age<'a>( &self, - txs: &[Transaction], + txs: impl Iterator, lock_results: Vec>, max_age: usize, error_counters: &mut ErrorCounters, ) -> Vec { let hash_queue = self.blockhash_queue.read().unwrap(); - txs.iter() - .zip(lock_results) + txs.zip(lock_results) .map(|(tx, lock_res)| match lock_res { Ok(()) => { let message = tx.message(); @@ -2526,47 +2546,62 @@ impl Bank { .collect() } - fn check_signatures( + fn is_tx_already_processed( &self, - txs: &[Transaction], + hashed_tx: &HashedTransaction, + status_cache: &StatusCache>, + ) -> bool { + let tx = hashed_tx.transaction(); + if status_cache + .get_status( + &hashed_tx.message_hash, + &tx.message().recent_blockhash, + &self.ancestors, + ) + .is_some() + { + return true; + } + + // Fallback to signature check in case this validator has only recently started + // adding the message hash to the status cache. 
+ return tx + .signatures + .get(0) + .and_then(|sig0| { + status_cache.get_status(sig0, &tx.message().recent_blockhash, &self.ancestors) + }) + .is_some(); + } + + fn check_status_cache( + &self, + hashed_txs: &[HashedTransaction], lock_results: Vec, error_counters: &mut ErrorCounters, ) -> Vec { let rcache = self.src.status_cache.read().unwrap(); - txs.iter() + hashed_txs + .iter() .zip(lock_results) - .map(|(tx, lock_res)| { - if tx.signatures.is_empty() { - return lock_res; + .map(|(hashed_tx, (lock_res, nonce_rollback))| { + if lock_res.is_ok() && self.is_tx_already_processed(hashed_tx, &rcache) { + error_counters.already_processed += 1; + return (Err(TransactionError::AlreadyProcessed), None); } - { - let (lock_res, _nonce_rollback) = &lock_res; - if lock_res.is_ok() - && rcache - .get_status( - &tx.signatures[0], - &tx.message().recent_blockhash, - &self.ancestors, - ) - .is_some() - { - error_counters.duplicate_signature += 1; - return (Err(TransactionError::DuplicateSignature), None); - } - } - lock_res + + (lock_res, nonce_rollback) }) .collect() } - fn filter_by_vote_transactions( + fn filter_by_vote_transactions<'a>( &self, - txs: &[Transaction], + txs: impl Iterator, lock_results: Vec, error_counters: &mut ErrorCounters, ) -> Vec { - txs.iter() - .zip(lock_results) + txs.zip(lock_results) .map(|(tx, lock_res)| { if lock_res.0.is_ok() { if is_simple_vote_transaction(tx) { @@ -2615,24 +2650,33 @@ impl Bank { pub fn check_transactions( &self, - txs: &[Transaction], + hashed_txs: &[HashedTransaction], lock_results: &[Result<()>], max_age: usize, mut error_counters: &mut ErrorCounters, ) -> Vec { - let age_results = self.check_age(txs, lock_results.to_vec(), max_age, &mut error_counters); - let sigcheck_results = self.check_signatures(txs, age_results, &mut error_counters); + let age_results = self.check_age( + hashed_txs.as_transactions_iter(), + lock_results.to_vec(), + max_age, + &mut error_counters, + ); + let cache_results = self.check_status_cache(hashed_txs, age_results, &mut error_counters); if self.upgrade_epoch() { // Reject all non-vote transactions - self.filter_by_vote_transactions(txs, sigcheck_results, &mut error_counters) + self.filter_by_vote_transactions( + hashed_txs.as_transactions_iter(), + cache_results, + &mut error_counters, + ) } else { - sigcheck_results + cache_results } } pub fn collect_balances(&self, batch: &TransactionBatch) -> TransactionBalances { let mut balances: TransactionBalances = vec![]; - for transaction in batch.transactions() { + for transaction in batch.transactions_iter() { let mut transaction_balances: Vec = vec![]; for account_key in transaction.message.account_keys.iter() { transaction_balances.push(self.get_balance(account_key)); @@ -2704,10 +2748,10 @@ impl Bank { error_counters.instruction_error ); } - if 0 != error_counters.duplicate_signature { + if 0 != error_counters.already_processed { inc_new_counter_info!( - "bank-process_transactions-error-duplicate_signature", - error_counters.duplicate_signature + "bank-process_transactions-error-already_processed", + error_counters.already_processed ); } if 0 != error_counters.not_allowed_during_cluster_maintenance { @@ -2851,11 +2895,10 @@ impl Bank { u64, u64, ) { - let txs = batch.transactions(); - debug!("processing transactions: {}", txs.len()); - inc_new_counter_info!("bank-process_transactions", txs.len()); + let hashed_txs = batch.hashed_transactions(); + debug!("processing transactions: {}", hashed_txs.len()); + inc_new_counter_info!("bank-process_transactions", 
hashed_txs.len()); let mut error_counters = ErrorCounters::default(); - let mut load_time = Measure::start("accounts_load"); let retryable_txs: Vec<_> = batch .lock_results() @@ -2871,12 +2914,20 @@ impl Bank { }) .collect(); - let sig_results = - self.check_transactions(txs, batch.lock_results(), max_age, &mut error_counters); + let mut check_time = Measure::start("check_transactions"); + let check_results = self.check_transactions( + hashed_txs, + batch.lock_results(), + max_age, + &mut error_counters, + ); + check_time.stop(); + + let mut load_time = Measure::start("accounts_load"); let mut loaded_accounts = self.rc.accounts.load_accounts( &self.ancestors, - txs, - sig_results, + hashed_txs.as_transactions_iter(), + check_results, &self.blockhash_queue.read().unwrap(), &mut error_counters, &self.rent_collector, @@ -2887,20 +2938,19 @@ impl Bank { let mut execution_time = Measure::start("execution_time"); let mut signature_count: u64 = 0; let mut inner_instructions: Vec> = - Vec::with_capacity(txs.len()); - let mut transaction_log_messages = Vec::with_capacity(txs.len()); + Vec::with_capacity(hashed_txs.len()); + let mut transaction_log_messages = Vec::with_capacity(hashed_txs.len()); let bpf_compute_budget = self .bpf_compute_budget .unwrap_or_else(BpfComputeBudget::new); let executed: Vec = loaded_accounts .iter_mut() - .zip(txs) + .zip(hashed_txs.as_transactions_iter()) .map(|(accs, tx)| match accs { (Err(e), _nonce_rollback) => (Err(e.clone()), None), (Ok(loaded_transaction), nonce_rollback) => { signature_count += u64::from(tx.message().header.num_required_signatures); - let executors = self.get_executors(&tx.message, &loaded_transaction.loaders); let (account_refcells, account_dep_refcells, loader_refcells) = @@ -2984,11 +3034,13 @@ impl Bank { execution_time.stop(); debug!( - "load: {}us execute: {}us txs_len={}", + "check: {}us load: {}us execute: {}us txs_len={}", + check_time.as_us(), load_time.as_us(), execution_time.as_us(), - txs.len(), + hashed_txs.len(), ); + timings.check_us += check_time.as_us(); timings.load_us += load_time.as_us(); timings.execute_us += execution_time.as_us(); @@ -2997,7 +3049,8 @@ impl Bank { let transaction_log_collector_config = self.transaction_log_collector_config.read().unwrap(); - for (i, ((r, _nonce_rollback), tx)) in executed.iter().zip(txs).enumerate() { + for (i, ((r, _nonce_rollback), hashed_tx)) in executed.iter().zip(hashed_txs).enumerate() { + let tx = hashed_tx.transaction(); if let Some(debug_keys) = &self.transaction_debug_keys { for key in &tx.message.account_keys { if debug_keys.contains(key) { @@ -3080,9 +3133,9 @@ impl Bank { ) } - fn filter_program_errors_and_collect_fee( + fn filter_program_errors_and_collect_fee<'a>( &self, - txs: &[Transaction], + txs: impl Iterator, executed: &[TransactionExecutionResult], ) -> Vec> { let hash_queue = self.blockhash_queue.read().unwrap(); @@ -3093,7 +3146,6 @@ impl Bank { }; let results = txs - .iter() .zip(executed) .map(|(tx, (res, nonce_rollback))| { let (fee_calculator, is_durable_nonce) = nonce_rollback @@ -3142,7 +3194,7 @@ impl Bank { pub fn commit_transactions( &self, - txs: &[Transaction], + hashed_txs: &[HashedTransaction], loaded_accounts: &mut [TransactionLoadResult], executed: &[TransactionExecutionResult], tx_count: u64, @@ -3160,8 +3212,8 @@ impl Bank { inc_new_counter_info!("bank-process_transactions-txs", tx_count as usize); inc_new_counter_info!("bank-process_transactions-sigs", signature_count as usize); - if !txs.is_empty() { - let processed_tx_count = txs.len() as u64; + 
if !hashed_txs.is_empty() { + let processed_tx_count = hashed_txs.len() as u64; let failed_tx_count = processed_tx_count.saturating_sub(tx_count); self.transaction_error_count .fetch_add(failed_tx_count, Relaxed); @@ -3180,7 +3232,7 @@ impl Bank { let mut write_time = Measure::start("write_time"); self.rc.accounts.store_cached( self.slot(), - txs, + hashed_txs.as_transactions_iter(), executed, loaded_accounts, &self.rent_collector, @@ -3190,14 +3242,23 @@ impl Bank { ); self.collect_rent(executed, loaded_accounts); - let overwritten_vote_accounts = self.update_cached_accounts(txs, executed, loaded_accounts); + let overwritten_vote_accounts = self.update_cached_accounts( + hashed_txs.as_transactions_iter(), + executed, + loaded_accounts, + ); // once committed there is no way to unroll write_time.stop(); - debug!("store: {}us txs_len={}", write_time.as_us(), txs.len(),); + debug!( + "store: {}us txs_len={}", + write_time.as_us(), + hashed_txs.len() + ); timings.store_us += write_time.as_us(); - self.update_transaction_statuses(txs, &executed); - let fee_collection_results = self.filter_program_errors_and_collect_fee(txs, executed); + self.update_transaction_statuses(hashed_txs, &executed); + let fee_collection_results = + self.filter_program_errors_and_collect_fee(hashed_txs.as_transactions_iter(), executed); TransactionResults { fee_collection_results, @@ -3796,7 +3857,7 @@ impl Bank { ); let results = self.commit_transactions( - batch.transactions(), + batch.hashed_transactions(), &mut loaded_accounts, &executed, tx_count, @@ -3816,11 +3877,26 @@ impl Bank { ) } + /// Process a Transaction. This is used for unit tests and simply calls the vector + /// Bank::process_transactions method + pub fn process_transaction(&self, tx: &Transaction) -> Result<()> { + let batch = self.prepare_batch(std::iter::once(tx)); + self.process_transaction_batch(&batch)[0].clone()?; + tx.signatures + .get(0) + .map_or(Ok(()), |sig| self.get_signature_status(sig).unwrap()) + } + #[must_use] pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { - let batch = self.prepare_batch(txs); + let batch = self.prepare_batch(txs.iter()); + self.process_transaction_batch(&batch) + } + + #[must_use] + fn process_transaction_batch(&self, batch: &TransactionBatch) -> Vec> { self.load_execute_and_commit_transactions( - &batch, + batch, MAX_PROCESSING_AGE, false, false, @@ -4384,9 +4460,9 @@ impl Bank { } /// a bank-level cache of vote accounts - fn update_cached_accounts( + fn update_cached_accounts<'a>( &self, - txs: &[Transaction], + txs: impl Iterator, res: &[TransactionExecutionResult], loaded: &[TransactionLoadResult], ) -> Vec { @@ -7562,7 +7638,7 @@ pub(crate) mod tests { ]; let initial_balance = bank.get_balance(&leader); - let results = bank.filter_program_errors_and_collect_fee(&[tx1, tx2], &results); + let results = bank.filter_program_errors_and_collect_fee([tx1, tx2].iter(), &results); bank.freeze(); assert_eq!( bank.get_balance(&leader), @@ -7688,7 +7764,7 @@ pub(crate) mod tests { system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, genesis_config.hash()); let pay_alice = vec![tx1]; - let lock_result = bank.prepare_batch(&pay_alice); + let lock_result = bank.prepare_batch(pay_alice.iter()); let results_alice = bank .load_execute_and_commit_transactions( &lock_result, @@ -7741,7 +7817,7 @@ pub(crate) mod tests { let tx = Transaction::new(&[&key0], message, genesis_config.hash()); let txs = vec![tx]; - let batch0 = bank.prepare_batch(&txs); + let batch0 = bank.prepare_batch(txs.iter()); 
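+ // prepare_batch now accepts any iterator of transaction references, so + // slice-based callers pass `txs.iter()` instead of `&txs`.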
assert!(batch0.lock_results()[0].is_ok()); // Try locking accounts, locking a previously read-only account as writable @@ -7759,7 +7835,7 @@ pub(crate) mod tests { let tx = Transaction::new(&[&key1], message, genesis_config.hash()); let txs = vec![tx]; - let batch1 = bank.prepare_batch(&txs); + let batch1 = bank.prepare_batch(txs.iter()); assert!(batch1.lock_results()[0].is_err()); // Try locking a previously read-only account a 2nd time; should succeed @@ -7776,7 +7852,7 @@ pub(crate) mod tests { let tx = Transaction::new(&[&key2], message, genesis_config.hash()); let txs = vec![tx]; - let batch2 = bank.prepare_batch(&txs); + let batch2 = bank.prepare_batch(txs.iter()); assert!(batch2.lock_results()[0].is_ok()); } @@ -7839,9 +7915,38 @@ pub(crate) mod tests { assert!(Arc::ptr_eq(&bank.parents()[0], &parent)); } + /// Verifies that transactions are dropped if they have already been processed + #[test] + fn test_tx_already_processed() { + let (genesis_config, mint_keypair) = create_genesis_config(2); + let bank = Bank::new(&genesis_config); + + let key1 = Keypair::new(); + let mut tx = + system_transaction::transfer(&mint_keypair, &key1.pubkey(), 1, genesis_config.hash()); + + // First process `tx` so that the status cache is updated + assert_eq!(bank.process_transaction(&tx), Ok(())); + + // Ensure that signature check works + assert_eq!( + bank.process_transaction(&tx), + Err(TransactionError::AlreadyProcessed) + ); + + // Clear transaction signature + tx.signatures[0] = Signature::default(); + + // Ensure that message hash check works + assert_eq!( + bank.process_transaction(&tx), + Err(TransactionError::AlreadyProcessed) + ); + } + /// Verifies that last ids and status cache are correctly referenced from parent #[test] - fn test_bank_parent_duplicate_signature() { + fn test_bank_parent_already_processed() { let (genesis_config, mint_keypair) = create_genesis_config(2); let key1 = Keypair::new(); let parent = Arc::new(Bank::new(&genesis_config)); @@ -7852,7 +7957,7 @@ pub(crate) mod tests { let bank = new_from_parent(&parent); assert_eq!( bank.process_transaction(&tx), - Err(TransactionError::DuplicateSignature) + Err(TransactionError::AlreadyProcessed) ); } @@ -9612,14 +9717,14 @@ pub(crate) mod tests { instructions, ); let txs = vec![tx0, tx1]; - let batch = bank0.prepare_batch(&txs); + let batch = bank0.prepare_batch(txs.iter()); let balances = bank0.collect_balances(&batch); assert_eq!(balances.len(), 2); assert_eq!(balances[0], vec![8, 11, 1]); assert_eq!(balances[1], vec![8, 0, 1]); let txs: Vec<_> = txs.iter().rev().cloned().collect(); - let batch = bank0.prepare_batch(&txs); + let batch = bank0.prepare_batch(txs.iter()); let balances = bank0.collect_balances(&batch); assert_eq!(balances.len(), 2); assert_eq!(balances[0], vec![8, 0, 1]); @@ -9653,7 +9758,7 @@ pub(crate) mod tests { let tx2 = system_transaction::transfer(&keypair1, &pubkey2, 12, blockhash); let txs = vec![tx0, tx1, tx2]; - let lock_result = bank0.prepare_batch(&txs); + let lock_result = bank0.prepare_batch(txs.iter()); let (transaction_results, transaction_balances_set, inner_instructions, transaction_logs) = bank0.load_execute_and_commit_transactions( &lock_result, @@ -12088,7 +12193,7 @@ pub(crate) mod tests { let tx1 = system_transaction::transfer(&sender1, &recipient1, 110, blockhash); // Should produce insufficient funds log let failure_sig = tx1.signatures[0]; let txs = vec![tx1, tx0]; - let batch = bank.prepare_batch(&txs); + let batch = bank.prepare_batch(txs.iter()); let log_results = bank 
.load_execute_and_commit_transactions( diff --git a/runtime/src/bank_utils.rs b/runtime/src/bank_utils.rs index 6530269043..87ec5caef3 100644 --- a/runtime/src/bank_utils.rs +++ b/runtime/src/bank_utils.rs @@ -1,9 +1,10 @@ use crate::{ bank::{Bank, TransactionResults}, genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, + hashed_transaction::HashedTransaction, vote_sender_types::ReplayVoteSender, }; -use solana_sdk::{pubkey::Pubkey, signature::Signer, transaction::Transaction}; +use solana_sdk::{pubkey::Pubkey, signature::Signer}; use solana_vote_program::vote_transaction; pub fn setup_bank_and_vote_pubkeys(num_vote_accounts: usize, stake: u64) -> (Bank, Vec) { @@ -27,7 +28,7 @@ pub fn setup_bank_and_vote_pubkeys(num_vote_accounts: usize, stake: u64) -> (Ban } pub fn find_and_send_votes( - txs: &[Transaction], + hashed_txs: &[HashedTransaction], tx_results: &TransactionResults, vote_sender: Option<&ReplayVoteSender>, ) { @@ -41,7 +42,7 @@ pub fn find_and_send_votes( assert!(execution_results[old_account.transaction_result_index] .0 .is_ok()); - let transaction = &txs[old_account.transaction_index]; + let transaction = hashed_txs[old_account.transaction_index].transaction(); if let Some(parsed_vote) = vote_transaction::parse_vote_transaction(transaction) { if parsed_vote.1.slots.last().is_some() { let _ = vote_sender.send(parsed_vote); diff --git a/runtime/src/hashed_transaction.rs b/runtime/src/hashed_transaction.rs new file mode 100644 index 0000000000..3f3b35f918 --- /dev/null +++ b/runtime/src/hashed_transaction.rs @@ -0,0 +1,50 @@ +use solana_sdk::{hash::Hash, transaction::Transaction}; +use std::borrow::Cow; + +/// Transaction and the hash of its message +#[derive(Debug, Clone)] +pub struct HashedTransaction<'a> { + transaction: Cow<'a, Transaction>, + pub message_hash: Hash, +} + +impl<'a> HashedTransaction<'a> { + pub fn new(transaction: Cow<'a, Transaction>, message_hash: Hash) -> Self { + Self { + transaction, + message_hash, + } + } + + pub fn transaction(&self) -> &Transaction { + self.transaction.as_ref() + } +} + +impl<'a> From for HashedTransaction<'_> { + fn from(transaction: Transaction) -> Self { + Self { + message_hash: transaction.message().hash(), + transaction: Cow::Owned(transaction), + } + } +} + +impl<'a> From<&'a Transaction> for HashedTransaction<'a> { + fn from(transaction: &'a Transaction) -> Self { + Self { + message_hash: transaction.message().hash(), + transaction: Cow::Borrowed(transaction), + } + } +} + +pub trait HashedTransactionSlice<'a> { + fn as_transactions_iter(&'a self) -> Box + '_>; +} + +impl<'a> HashedTransactionSlice<'a> for [HashedTransaction<'a>] { + fn as_transactions_iter(&'a self) -> Box + '_> { + Box::new(self.iter().map(|h| h.transaction.as_ref())) + } +} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index f49a5a760a..bce1573251 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -19,6 +19,7 @@ pub mod contains; pub mod epoch_stakes; pub mod genesis_utils; pub mod hardened_unpack; +pub mod hashed_transaction; pub mod inline_spl_token_v2_0; pub mod instruction_recorder; pub mod loader_utils; diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs index 8bfb91acbe..84cce73898 100644 --- a/runtime/src/status_cache.rs +++ b/runtime/src/status_cache.rs @@ -9,7 +9,6 @@ use solana_sdk::{ }; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, - marker::PhantomData, sync::{Arc, Mutex}, }; @@ -42,27 +41,25 @@ pub struct SignatureConfirmationStatus { } #[derive(Clone, Debug, AbiExample)] -pub 
struct StatusCache { +pub struct StatusCache { cache: KeyStatusMap, roots: HashSet, /// all keys seen during a fork/slot slot_deltas: SlotDeltaMap, - phantom: PhantomData, } -impl Default for StatusCache { +impl Default for StatusCache { fn default() -> Self { Self { cache: HashMap::default(), // 0 is always a root roots: [0].iter().cloned().collect(), slot_deltas: HashMap::default(), - phantom: PhantomData::default(), } } } -impl, T: Serialize + Clone + PartialEq> PartialEq for StatusCache { +impl PartialEq for StatusCache { fn eq(&self, other: &Self) -> bool { self.roots == other.roots && self @@ -88,7 +85,7 @@ impl, T: Serialize + Clone + PartialEq> PartialEq for StatusCache } } -impl, T: Serialize + Clone> StatusCache { +impl StatusCache { pub fn clear_slot_entries(&mut self, slot: Slot) { let slot_deltas = self.slot_deltas.remove(&slot); if let Some(slot_deltas) = slot_deltas { @@ -131,17 +128,19 @@ impl, T: Serialize + Clone> StatusCache { /// Check if the key is in any of the forks in the ancestors set and /// with a certain blockhash. - pub fn get_status( + pub fn get_status>( &self, - key: &K, + key: K, transaction_blockhash: &Hash, ancestors: &Ancestors, ) -> Option<(Slot, T)> { let map = self.cache.get(transaction_blockhash)?; let (_, index, keymap) = map; - let mut key_slice = [0u8; CACHED_KEY_SIZE]; - key_slice.clone_from_slice(&key.as_ref()[*index..*index + CACHED_KEY_SIZE]); - if let Some(stored_forks) = keymap.get(&key_slice) { + let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1); + let index = (*index).min(max_key_index); + let key_slice: &[u8; CACHED_KEY_SIZE] = + arrayref::array_ref![key.as_ref(), index, CACHED_KEY_SIZE]; + if let Some(stored_forks) = keymap.get(key_slice) { let res = stored_forks .iter() .find(|(f, _)| ancestors.get(f).is_some() || self.roots.get(f).is_some()) @@ -156,7 +155,11 @@ impl, T: Serialize + Clone> StatusCache { /// Search for a key with any blockhash /// Prefer get_status for performance reasons, it doesn't need /// to search all blockhashes. - pub fn get_status_any_blockhash(&self, key: &K, ancestors: &Ancestors) -> Option<(Slot, T)> { + pub fn get_status_any_blockhash>( + &self, + key: &K, + ancestors: &Ancestors, + ) -> Option<(Slot, T)> { let mut keys = vec![]; let mut val: Vec<_> = self.cache.iter().map(|(k, _)| *k).collect(); keys.append(&mut val); @@ -183,22 +186,23 @@ impl, T: Serialize + Clone> StatusCache { } /// Insert a new key for a specific slot. 
- pub fn insert(&mut self, transaction_blockhash: &Hash, key: &K, slot: Slot, res: T) { - let key_index: usize; - if let Some(hash_map) = self.cache.get(transaction_blockhash) { - key_index = hash_map.1; - } else { - key_index = thread_rng().gen_range(0, std::mem::size_of::<Hash>() - CACHED_KEY_SIZE); - } + pub fn insert<K: AsRef<[u8]>>( + &mut self, + transaction_blockhash: &Hash, + key: &K, + slot: Slot, + res: T, + ) { + let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1); + let hash_map = self.cache.entry(*transaction_blockhash).or_insert_with(|| { + let key_index = thread_rng().gen_range(0, max_key_index + 1); + (slot, key_index, HashMap::new()) + }); - let hash_map = - self.cache - .entry(*transaction_blockhash) - .or_insert((slot, key_index, HashMap::new())); hash_map.0 = std::cmp::max(slot, hash_map.0); - let index = hash_map.1; + let key_index = hash_map.1.min(max_key_index); let mut key_slice = [0u8; CACHED_KEY_SIZE]; - key_slice.clone_from_slice(&key.as_ref()[index..index + CACHED_KEY_SIZE]); + key_slice.clone_from_slice(&key.as_ref()[key_index..key_index + CACHED_KEY_SIZE]); self.insert_with_slice(transaction_blockhash, slot, key_index, key_slice, res); } @@ -293,7 +297,7 @@ mod tests { use super::*; use solana_sdk::{hash::hash, signature::Signature}; - type BankStatusCache = StatusCache<Signature, ()>; + type BankStatusCache = StatusCache<()>; #[test] fn test_empty_has_no_sigs() { @@ -418,9 +422,9 @@ mod tests { status_cache.clear(); status_cache.insert(&blockhash, &sig, 0, ()); let (_, index, sig_map) = status_cache.cache.get(&blockhash).unwrap(); - let mut sig_slice = [0u8; CACHED_KEY_SIZE]; - sig_slice.clone_from_slice(&sig.as_ref()[*index..*index + CACHED_KEY_SIZE]); - assert!(sig_map.get(&sig_slice).is_some()); + let sig_slice: &[u8; CACHED_KEY_SIZE] = + arrayref::array_ref![sig.as_ref(), *index, CACHED_KEY_SIZE]; + assert!(sig_map.get(sig_slice).is_some()); } #[test] @@ -510,4 +514,26 @@ mod tests { .is_none()); assert!(status_cache.cache.is_empty()); } + + // Status cache uses a random key offset for each blockhash. Ensure that shorter + // keys can still be used if the offset is greater than the key length.
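+ // (Worked example, assuming CACHED_KEY_SIZE is 20 as defined in this file: + // a 64-byte Signature gives max_key_index = 64 - 21 = 43, a 32-byte message + // hash gives 32 - 21 = 11, and an offset chosen for the longer key type is + // clamped with `.min(max_key_index)` before slicing.)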
+ #[test] + fn test_different_sized_keys() { + let mut status_cache = BankStatusCache::default(); + let ancestors = vec![(0, 0)].into_iter().collect(); + let blockhash = Hash::default(); + for _ in 0..100 { + let blockhash = hash(blockhash.as_ref()); + let sig_key = Signature::default(); + let hash_key = Hash::new_unique(); + status_cache.insert(&blockhash, &sig_key, 0, ()); + status_cache.insert(&blockhash, &hash_key, 0, ()); + assert!(status_cache + .get_status(&sig_key, &blockhash, &ancestors) + .is_some()); + assert!(status_cache + .get_status(&hash_key, &blockhash, &ancestors) + .is_some()); + } + } } diff --git a/runtime/src/transaction_batch.rs b/runtime/src/transaction_batch.rs index cc33ccd626..076ed77d6d 100644 --- a/runtime/src/transaction_batch.rs +++ b/runtime/src/transaction_batch.rs @@ -1,11 +1,13 @@ use crate::bank::Bank; +use crate::hashed_transaction::HashedTransaction; use solana_sdk::transaction::{Result, Transaction}; +use std::borrow::Cow; // Represents the results of trying to lock a set of accounts pub struct TransactionBatch<'a, 'b> { lock_results: Vec>, bank: &'a Bank, - transactions: &'b [Transaction], + hashed_txs: Cow<'b, [HashedTransaction<'b>]>, pub(crate) needs_unlock: bool, } @@ -13,13 +15,13 @@ impl<'a, 'b> TransactionBatch<'a, 'b> { pub fn new( lock_results: Vec>, bank: &'a Bank, - transactions: &'b [Transaction], + hashed_txs: Cow<'b, [HashedTransaction<'b>]>, ) -> Self { - assert_eq!(lock_results.len(), transactions.len()); + assert_eq!(lock_results.len(), hashed_txs.len()); Self { lock_results, bank, - transactions, + hashed_txs, needs_unlock: true, } } @@ -28,8 +30,12 @@ impl<'a, 'b> TransactionBatch<'a, 'b> { &self.lock_results } - pub fn transactions(&self) -> &[Transaction] { - self.transactions + pub fn hashed_transactions(&self) -> &[HashedTransaction] { + &self.hashed_txs + } + + pub fn transactions_iter(&self) -> impl Iterator { + self.hashed_txs.iter().map(|h| h.transaction()) } pub fn bank(&self) -> &Bank { @@ -55,20 +61,20 @@ mod tests { let (bank, txs) = setup(); // Test getting locked accounts - let batch = bank.prepare_batch(&txs); + let batch = bank.prepare_batch(txs.iter()); // Grab locks assert!(batch.lock_results().iter().all(|x| x.is_ok())); // Trying to grab locks again should fail - let batch2 = bank.prepare_batch(&txs); + let batch2 = bank.prepare_batch(txs.iter()); assert!(batch2.lock_results().iter().all(|x| x.is_err())); // Drop the first set of locks drop(batch); // Now grabbing locks should work again - let batch2 = bank.prepare_batch(&txs); + let batch2 = bank.prepare_batch(txs.iter()); assert!(batch2.lock_results().iter().all(|x| x.is_ok())); } @@ -81,7 +87,7 @@ mod tests { assert!(batch.lock_results().iter().all(|x| x.is_ok())); // Grab locks - let batch2 = bank.prepare_batch(&txs); + let batch2 = bank.prepare_batch(txs.iter()); assert!(batch2.lock_results().iter().all(|x| x.is_ok())); // Prepare another batch without locks diff --git a/sdk/program/Cargo.toml b/sdk/program/Cargo.toml index 2743d6fbb1..e54a7b8c93 100644 --- a/sdk/program/Cargo.toml +++ b/sdk/program/Cargo.toml @@ -11,6 +11,7 @@ edition = "2018" [dependencies] bincode = "1.3.1" +blake3 = "0.3.7" borsh = "0.8.1" borsh-derive = "0.8.1" bs58 = "0.3.1" @@ -32,7 +33,8 @@ solana-sdk-macro = { path = "../macro", version = "=1.7.0" } thiserror = "1.0" [target.'cfg(not(target_arch = "bpf"))'.dependencies] -curve25519-dalek = { version = "2.1.0" } +blake3 = "0.3.6" +curve25519-dalek = "2.1.0" rand = "0.7.0" solana-logger = { path = "../../logger", version = "=1.7.0" } 
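The `TransactionBatch` rework above stores a `Cow<'b, [HashedTransaction<'b>]>`, so a batch can either own its hashed transactions (the `prepare_batch` path) or borrow a slice the caller hashed once up front (the `prepare_hashed_batch` path). A rough caller-side sketch of the borrowed path, assuming a `bank: Bank` and `txs: Vec<Transaction>` are already in scope:

    use solana_runtime::hashed_transaction::HashedTransaction;

    // Hash each message once; the batch borrows this slice instead of cloning it.
    let hashed_txs: Vec<HashedTransaction> = txs.iter().map(HashedTransaction::from).collect();
    let batch = bank.prepare_hashed_batch(&hashed_txs);

    // Downstream code that only needs plain transactions keeps working through
    // the iterator; the precomputed blake3 hashes stay available via
    // batch.hashed_transactions().
    for tx in batch.transactions_iter() {
        let _sig_count = tx.signatures.len();
    }

Keeping both paths behind one `Cow` presumably lets callers that retry the same batch reuse the hashes instead of recomputing them on every attempt.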
diff --git a/sdk/program/src/message.rs b/sdk/program/src/message.rs index 2ff13c86aa..f070f65965 100644 --- a/sdk/program/src/message.rs +++ b/sdk/program/src/message.rs @@ -7,11 +7,12 @@ use crate::serialize_utils::{ }; use crate::{ bpf_loader, bpf_loader_deprecated, bpf_loader_upgradeable, - hash::Hash, + hash::{Hash, HASH_BYTES}, instruction::{AccountMeta, CompiledInstruction, Instruction}, pubkey::Pubkey, short_vec, system_instruction, system_program, sysvar, }; +use blake3::traits::digest::Digest; use itertools::Itertools; use lazy_static::lazy_static; use std::{convert::TryFrom, str::FromStr}; @@ -296,6 +297,20 @@ impl Message { Self::new(&instructions, payer) } + /// Compute the blake3 hash of this transaction's message + pub fn hash(&self) -> Hash { + let message_bytes = self.serialize(); + Self::hash_raw_message(&message_bytes) + } + + /// Compute the blake3 hash of a raw transaction message + pub fn hash_raw_message(message_bytes: &[u8]) -> Hash { + let mut hasher = blake3::Hasher::new(); + hasher.update(b"solana-tx-message-v1"); + hasher.update(message_bytes); + Hash(<[u8; HASH_BYTES]>::try_from(hasher.finalize().as_slice()).unwrap()) + } + pub fn compile_instruction(&self, ix: &Instruction) -> CompiledInstruction { compile_instruction(ix, &self.account_keys) } @@ -1009,4 +1024,36 @@ mod tests { MESSAGE_HEADER_LENGTH ); } + + #[test] + fn test_message_hash() { + // when this test fails, it's most likely due to a new serialized format of a message. + // in this case, the domain prefix `solana-tx-message-v1` should be updated. + let program_id0 = Pubkey::from_str("4uQeVj5tqViQh7yWWGStvkEG1Zmhx6uasJtWCJziofM").unwrap(); + let program_id1 = Pubkey::from_str("8opHzTAnfzRpPEx21XtnrVTX28YQuCpAjcn1PczScKh").unwrap(); + let id0 = Pubkey::from_str("CiDwVBFgWV9E5MvXWoLgnEgn2hK7rJikbvfWavzAQz3").unwrap(); + let id1 = Pubkey::from_str("GcdayuLaLyrdmUu324nahyv33G5poQdLUEZ1nEytDeP").unwrap(); + let id2 = Pubkey::from_str("LX3EUdRUBUa3TbsYXLEUdj9J3prXkWXvLYSWyYyc2Jj").unwrap(); + let id3 = Pubkey::from_str("QRSsyMWN1yHT9ir42bgNZUNZ4PdEhcSWCrL2AryKpy5").unwrap(); + let instructions = vec![ + Instruction::new_with_bincode(program_id0, &0, vec![AccountMeta::new(id0, false)]), + Instruction::new_with_bincode(program_id0, &0, vec![AccountMeta::new(id1, true)]), + Instruction::new_with_bincode( + program_id1, + &0, + vec![AccountMeta::new_readonly(id2, false)], + ), + Instruction::new_with_bincode( + program_id1, + &0, + vec![AccountMeta::new_readonly(id3, true)], + ), + ]; + + let message = Message::new(&instructions, Some(&id1)); + assert_eq!( + message.hash(), + Hash::from_str("CXRH7GHLieaQZRUjH1mpnNnUZQtU4V4RpJpAFgy77i3z").unwrap() + ) + } } diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 31f6152a90..d09601b7f8 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -39,9 +39,9 @@ impl Packet { Self { data, meta } } - pub fn from_data(dest: &SocketAddr, data: T) -> Result { + pub fn from_data(dest: Option<&SocketAddr>, data: T) -> Result { let mut packet = Packet::default(); - Self::populate_packet(&mut packet, Some(dest), &data)?; + Self::populate_packet(&mut packet, dest, &data)?; Ok(packet) } diff --git a/sdk/src/transaction.rs b/sdk/src/transaction.rs index efdb1a0f05..781b5fb958 100644 --- a/sdk/src/transaction.rs +++ b/sdk/src/transaction.rs @@ -50,11 +50,11 @@ pub enum TransactionError { #[error("This account may not be used to pay transaction fees")] InvalidAccountForFee, - /// The bank has seen this `Signature` before. 
This can occur under normal operation + /// The bank has seen this transaction before. This can occur under normal operation /// when a UDP packet is duplicated, as a user error from a client not updating /// its `recent_blockhash`, or as a double-spend attack. - #[error("The bank has seen this signature before")] - DuplicateSignature, + #[error("This transaction has already been processed")] + AlreadyProcessed, /// The bank has not seen the given `recent_blockhash` or the transaction is too old and /// the `recent_blockhash` has been discarded. @@ -317,18 +317,11 @@ impl Transaction { Ok(()) } - pub fn verify_with_results(&self) -> Vec { - self.signatures - .iter() - .zip(&self.message.account_keys) - .map(|(signature, pubkey)| signature.verify(pubkey.as_ref(), &self.message_data())) - .collect() - } - /// Verify the transaction pub fn verify(&self) -> Result<()> { + let message_bytes = self.message_data(); if !self - .verify_with_results() + ._verify_with_results(&message_bytes) .iter() .all(|verify_result| *verify_result) { @@ -338,6 +331,32 @@ impl Transaction { } } + /// Verify the transaction and hash its message + pub fn verify_and_hash_message(&self) -> Result { + let message_bytes = self.message_data(); + if !self + ._verify_with_results(&message_bytes) + .iter() + .all(|verify_result| *verify_result) + { + Err(TransactionError::SignatureFailure) + } else { + Ok(Message::hash_raw_message(&message_bytes)) + } + } + + pub fn verify_with_results(&self) -> Vec { + self._verify_with_results(&self.message_data()) + } + + pub(crate) fn _verify_with_results(&self, message_bytes: &[u8]) -> Vec { + self.signatures + .iter() + .zip(&self.message.account_keys) + .map(|(signature, pubkey)| signature.verify(pubkey.as_ref(), message_bytes)) + .collect() + } + pub fn verify_precompiles(&self) -> Result<()> { for instruction in &self.message().instructions { // The Transaction may not be sanitized at this point diff --git a/storage-proto/proto/solana.storage.transaction_by_addr.rs b/storage-proto/proto/solana.storage.transaction_by_addr.rs index 692a9b939e..49aa771b64 100644 --- a/storage-proto/proto/solana.storage.transaction_by_addr.rs +++ b/storage-proto/proto/solana.storage.transaction_by_addr.rs @@ -56,7 +56,7 @@ pub enum TransactionErrorType { ProgramAccountNotFound = 3, InsufficientFundsForFee = 4, InvalidAccountForFee = 5, - DuplicateSignature = 6, + AlreadyProcessed = 6, BlockhashNotFound = 7, InstructionError = 8, CallChainTooDeep = 9, diff --git a/storage-proto/src/convert.rs b/storage-proto/src/convert.rs index 88cf13cbf9..99b45c563c 100644 --- a/storage-proto/src/convert.rs +++ b/storage-proto/src/convert.rs @@ -520,7 +520,7 @@ impl TryFrom for TransactionError { 3 => TransactionError::ProgramAccountNotFound, 4 => TransactionError::InsufficientFundsForFee, 5 => TransactionError::InvalidAccountForFee, - 6 => TransactionError::DuplicateSignature, + 6 => TransactionError::AlreadyProcessed, 7 => TransactionError::BlockhashNotFound, 9 => TransactionError::CallChainTooDeep, 10 => TransactionError::MissingSignatureForFee, @@ -554,8 +554,8 @@ impl From for tx_by_addr::TransactionError { TransactionError::InvalidAccountForFee => { tx_by_addr::TransactionErrorType::InvalidAccountForFee } - TransactionError::DuplicateSignature => { - tx_by_addr::TransactionErrorType::DuplicateSignature + TransactionError::AlreadyProcessed => { + tx_by_addr::TransactionErrorType::AlreadyProcessed } TransactionError::BlockhashNotFound => { tx_by_addr::TransactionErrorType::BlockhashNotFound @@ -903,7 +903,7 @@ mod 
test { tx_by_addr_transaction_error.try_into().unwrap() ); - let transaction_error = TransactionError::DuplicateSignature; + let transaction_error = TransactionError::AlreadyProcessed; let tx_by_addr_transaction_error: tx_by_addr::TransactionError = transaction_error.clone().into(); assert_eq!( diff --git a/transaction-status/src/token_balances.rs b/transaction-status/src/token_balances.rs index 087ae99fff..ab3bfa87ed 100644 --- a/transaction-status/src/token_balances.rs +++ b/transaction-status/src/token_balances.rs @@ -55,7 +55,7 @@ pub fn collect_token_balances( ) -> TransactionTokenBalances { let mut balances: TransactionTokenBalances = vec![]; - for transaction in batch.transactions() { + for transaction in batch.transactions_iter() { let account_keys = &transaction.message.account_keys; let has_token_program = account_keys.iter().any(|p| is_token_program(p));
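Taken together, the sdk changes let a caller compute the new dedup key directly. A minimal sketch, assuming a signed `tx: Transaction` is in scope (the `expect` is only for brevity):

    use solana_sdk::{hash::Hash, message::Message};

    // Checks every signature against the serialized message bytes, then returns
    // the domain-separated blake3 hash of those same bytes -- the key the bank
    // now records in its status cache alongside the first signature.
    let message_hash: Hash = tx.verify_and_hash_message().expect("signature failure");

    // The same value is reachable without verifying, e.g. for cache lookups:
    assert_eq!(message_hash, Message::hash_raw_message(&tx.message_data()));

Because `verify_and_hash_message` serializes the message once and reuses those bytes for both signature verification and hashing, it avoids the second serialization pass that calling `verify()` and `message().hash()` separately would incur.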