From 70ec2cd2444097eb90ffdc3367b92981f8ce5c8a Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Fri, 4 Feb 2022 03:52:11 +0000
Subject: [PATCH] Return actual committed transactions from
 process_transactions() (backport #22802) (#22904)

* Return actual committed transactions from process_transactions()

* resolve conflicts

* fixup comment

* Fixup banking_stage-dropped_tx_before_forwarding counter

* Count cost model throttled transactions

* fixup tx_count moved

* Fixup tests

* remove qos service

* Cleanup clippy

Co-authored-by: Carl Lin
---
 core/src/banking_stage.rs | 659 ++++++++++++++++++++++++++++++--------
 program-test/src/lib.rs   |   5 +-
 runtime/src/bank.rs       | 184 +++++++----
 sdk/src/feature_set.rs    |   5 +
 4 files changed, 643 insertions(+), 210 deletions(-)

diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index c9af213a0c..1ecde06dd3 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -19,7 +19,10 @@ use {
     solana_poh::poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder},
     solana_runtime::{
         accounts_db::ErrorCounters,
-        bank::{Bank, TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult},
+        bank::{
+            Bank, LoadAndExecuteTransactionsOutput, TransactionBalancesSet, TransactionCheckResult,
+            TransactionExecutionResult,
+        },
         bank_utils,
         cost_model::CostModel,
         cost_tracker::CostTracker,
@@ -80,6 +83,65 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
 const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
 const MIN_THREADS_BANKING: u32 = 1;
 
+/// A summary of what happened to transactions passed to the execution pipeline.
+/// Transactions can have one of the following outcomes:
+/// 1) Did not even make it to execution due to being filtered out by things like AccountInUse
+/// lock conflicts or CostModel compute limits. These types of errors are retryable and
+/// counted in `Self::retryable_transaction_indexes`.
+/// 2) Did not execute due to some fatal error like too old, or duplicate signature. These
+/// will be dropped from the transactions queue and not counted in `Self::retryable_transaction_indexes`.
+/// 3) Were executed and committed, captured by `committed_transactions_count` below.
+/// 4) Were executed and failed commit, captured by `failed_commit_count` below.
+struct ProcessTransactionsSummary {
+    // Returns true if we hit the end of the block/max PoH height for the block before
+    // processing all the transactions in the batch.
+    reached_max_poh_height: bool,
+
+    // Total number of transactions that were passed as candidates for execution. See description
+    // of struct above for possible outcomes for these transactions
+    #[allow(dead_code)]
+    transactions_attempted_execution_count: usize,
+
+    // Total number of transactions that made it into the block
+    #[allow(dead_code)]
+    committed_transactions_count: usize,
+
+    // Total number of transactions that made it into the block where the transactions
+    // output from execution was success/no error.
+    #[allow(dead_code)]
+    committed_transactions_with_successful_result_count: usize,
+
+    // All transactions that were executed but then failed record because the
+    // slot ended
+    #[allow(dead_code)]
+    failed_commit_count: usize,
+
+    // Indexes of transactions in the transactions slice that were not committed but are retryable
+    retryable_transaction_indexes: Vec<usize>,
+
+    // The number of transactions filtered out by the cost model
+    #[allow(dead_code)]
+    cost_model_throttled_transactions_count: usize,
+}
+
+pub struct ExecuteAndCommitTransactionsOutput {
+    // Total number of transactions that were passed as candidates for execution
+    transactions_attempted_execution_count: usize,
+    // The number of transactions that were executed. See the description in `ProcessTransactionsSummary`
+    // for possible outcomes of execution.
+    executed_transactions_count: usize,
+    // Total number of the executed transactions that returned success/not
+    // an error.
+    executed_with_successful_result_count: usize,
+    // Transactions that either were not executed, or were executed and failed to be committed due
+    // to the block ending.
+    retryable_transaction_indexes: Vec<usize>,
+    // A result that indicates whether transactions were successfully
+    // committed into the Poh stream. If so, the result tells us
+    // how many such transactions were committed
+    commit_transactions_result: Result<(), PohRecorderError>,
+}
+
 #[derive(Debug, Default)]
 pub struct BankingStageStats {
     last_report: AtomicInterval,
@@ -458,7 +520,7 @@ impl BankingStage {
         cost_model: &Arc<RwLock<CostModel>>,
     ) {
         let mut rebuffered_packet_count = 0;
-        let mut new_tx_count = 0;
+        let mut consumed_buffered_packets_count = 0;
         let buffered_packet_batches_len = buffered_packet_batches.len();
         let mut proc_start = Measure::start("consume_buffered_process");
         let mut reached_end_of_slot = None;
@@ -485,19 +547,23 @@ impl BankingStage {
             } else {
                 let bank_start = poh_recorder.lock().unwrap().bank_start();
                 if let Some((bank, bank_creation_time)) = bank_start {
-                    let (processed, verified_txs_len, new_unprocessed_indexes) =
-                        Self::process_packets_transactions(
-                            &bank,
-                            &bank_creation_time,
-                            recorder,
-                            packet_batch,
-                            original_unprocessed_indexes.to_owned(),
-                            transaction_status_sender.clone(),
-                            gossip_vote_sender,
-                            banking_stage_stats,
-                            cost_model,
-                        );
-                    if processed < verified_txs_len
+                    let process_transactions_summary = Self::process_packets_transactions(
+                        &bank,
+                        &bank_creation_time,
+                        recorder,
+                        packet_batch,
+                        original_unprocessed_indexes.to_owned(),
+                        transaction_status_sender.clone(),
+                        gossip_vote_sender,
+                        banking_stage_stats,
+                        cost_model,
+                    );
+                    let ProcessTransactionsSummary {
+                        reached_max_poh_height,
+                        retryable_transaction_indexes,
+                        ..
+                    } = process_transactions_summary;
+                    if reached_max_poh_height
                         || !Bank::should_bank_still_be_processing_txs(
                             &bank_creation_time,
                             max_tx_ingestion_ns,
@@ -506,21 +572,30 @@ impl BankingStage {
                         reached_end_of_slot =
                             Some((poh_recorder.lock().unwrap().next_slot_leader(), bank));
                     }
-                    new_tx_count += processed;
+                    // The difference between all transactions passed to execution and the ones that
+                    // are retryable is the set of transactions that were either:
+                    // 1) Committed into the block
+                    // 2) Dropped without being committed because they had some fatal error (too old,
+                    // duplicate signature, etc.)
+                    //
+                    // Note: This assumes that every packet deserializes into one transaction!
+ consumed_buffered_packets_count += original_unprocessed_indexes + .len() + .saturating_sub(retryable_transaction_indexes.len()); + // Out of the buffered packets just retried, collect any still unprocessed // transactions in this batch for forwarding - rebuffered_packet_count += new_unprocessed_indexes.len(); + rebuffered_packet_count += retryable_transaction_indexes.len(); let has_more_unprocessed_transactions = Self::update_buffered_packets_with_new_unprocessed( original_unprocessed_indexes, - new_unprocessed_indexes, + retryable_transaction_indexes, ); if let Some(test_fn) = &test_fn { test_fn(); } has_more_unprocessed_transactions } else { - rebuffered_packet_count += original_unprocessed_indexes.len(); // `original_unprocessed_indexes` must have remaining packets to process // if not yet processed. assert!(Self::packet_has_more_unprocessed_transactions( @@ -538,8 +613,8 @@ impl BankingStage { timestamp(), buffered_packet_batches_len, proc_start.as_ms(), - new_tx_count, - (new_tx_count as f32) / (proc_start.as_s()) + consumed_buffered_packets_count, + (consumed_buffered_packets_count as f32) / (proc_start.as_s()) ); banking_stage_stats @@ -550,7 +625,7 @@ impl BankingStage { .fetch_add(rebuffered_packet_count, Ordering::Relaxed); banking_stage_stats .consumed_buffered_packets_count - .fetch_add(new_tx_count, Ordering::Relaxed); + .fetch_add(consumed_buffered_packets_count, Ordering::Relaxed); } fn consume_or_forward_packets( @@ -850,13 +925,13 @@ impl BankingStage { (Ok(num_to_commit), vec![]) } - fn process_and_record_transactions_locked( + fn execute_and_commit_transactions_locked( bank: &Arc, poh: &TransactionRecorder, batch: &TransactionBatch, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - ) -> (Result, Vec) { + ) -> ExecuteAndCommitTransactionsOutput { let mut load_execute_time = Measure::start("load_execute_time"); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce // the likelihood of any single thread getting starved and processing old ids. @@ -877,15 +952,7 @@ impl BankingStage { }; let mut execute_timings = ExecuteTimings::default(); - let ( - mut loaded_accounts, - results, - inner_instructions, - transaction_logs, - mut retryable_txs, - tx_count, - signature_count, - ) = bank.load_and_execute_transactions( + let load_and_execute_transactions_output = bank.load_and_execute_transactions( batch, MAX_PROCESSING_AGE, transaction_status_sender.is_some(), @@ -894,34 +961,60 @@ impl BankingStage { ); load_execute_time.stop(); + let LoadAndExecuteTransactionsOutput { + mut loaded_transactions, + execution_results, + inner_instructions, + transaction_log_messages, + mut retryable_transaction_indexes, + executed_transactions_count, + executed_with_successful_result_count, + signature_count, + .. 
+ } = load_and_execute_transactions_output; + let freeze_lock = bank.freeze_lock(); let mut record_time = Measure::start("record_time"); - let (num_to_commit, retryable_record_txs) = - Self::record_transactions(bank.slot(), batch.transactions_iter(), &results, poh); + let (commit_transactions_result, retryable_record_transaction_indexes) = + Self::record_transactions( + bank.slot(), + batch.transactions_iter(), + &execution_results, + poh, + ); inc_new_counter_info!( "banking_stage-record_transactions_num_to_commit", - *num_to_commit.as_ref().unwrap_or(&0) + *commit_transactions_result.as_ref().unwrap_or(&0) ); inc_new_counter_info!( "banking_stage-record_transactions_retryable_record_txs", - retryable_record_txs.len() + retryable_record_transaction_indexes.len() ); - retryable_txs.extend(retryable_record_txs); - if num_to_commit.is_err() { - return (num_to_commit, retryable_txs); + retryable_transaction_indexes.extend(retryable_record_transaction_indexes); + let transactions_attempted_execution_count = execution_results.len(); + if let Err(e) = commit_transactions_result { + return ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + executed_with_successful_result_count, + retryable_transaction_indexes, + commit_transactions_result: Err(e), + }; } record_time.stop(); let mut commit_time = Measure::start("commit_time"); let hashed_txs = batch.hashed_transactions(); - let num_to_commit = num_to_commit.unwrap(); - if num_to_commit != 0 { + let committed_transaction_count = commit_transactions_result.unwrap(); + if committed_transaction_count != 0 { let tx_results = bank.commit_transactions( hashed_txs, - &mut loaded_accounts, - &results, - tx_count, + &mut loaded_transactions, + execution_results, + executed_transactions_count as u64, + executed_transactions_count.saturating_sub(executed_with_successful_result_count) + as u64, signature_count, &mut execute_timings, ); @@ -938,7 +1031,7 @@ impl BankingStage { TransactionBalancesSet::new(pre_balances, post_balances), TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances), inner_instructions, - transaction_logs, + transaction_log_messages, tx_results.rent_debits, ); } @@ -957,11 +1050,17 @@ impl BankingStage { ); debug!( - "process_and_record_transactions_locked: {:?}", + "execute_and_commit_transactions_locked: {:?}", execute_timings ); - (Ok(num_to_commit), retryable_txs) + ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + executed_with_successful_result_count, + retryable_transaction_indexes, + commit_transactions_result: Ok(()), + } } pub fn process_and_record_transactions( @@ -971,21 +1070,33 @@ impl BankingStage { chunk_offset: usize, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - ) -> (Result, Vec) { + ) -> ExecuteAndCommitTransactionsOutput { let mut lock_time = Measure::start("lock_time"); // Once accounts are locked, other threads cannot encode transactions that will modify the // same account state let batch = bank.prepare_hashed_batch(txs); lock_time.stop(); - let (result, mut retryable_txs) = Self::process_and_record_transactions_locked( - bank, - poh, - &batch, - transaction_status_sender, - gossip_vote_sender, - ); - retryable_txs.iter_mut().for_each(|x| *x += chunk_offset); + // retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit + // WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit + // and WouldExceedMaxAccountDataCostLimit + let mut 
execute_and_commit_transactions_output = + Self::execute_and_commit_transactions_locked( + bank, + poh, + &batch, + transaction_status_sender, + gossip_vote_sender, + ); + + let ExecuteAndCommitTransactionsOutput { + ref mut retryable_transaction_indexes, + .. + } = execute_and_commit_transactions_output; + + retryable_transaction_indexes + .iter_mut() + .for_each(|x| *x += chunk_offset); let mut unlock_time = Measure::start("unlock_time"); // Once the accounts are new transactions can enter the pipeline to process them @@ -1000,7 +1111,7 @@ impl BankingStage { txs.len(), ); - (result, retryable_txs) + execute_and_commit_transactions_output } /// Sends transactions to the bank. @@ -1014,16 +1125,26 @@ impl BankingStage { poh: &TransactionRecorder, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - ) -> (usize, Vec) { + ) -> ProcessTransactionsSummary { let mut chunk_start = 0; - let mut unprocessed_txs = vec![]; - + let mut all_retryable_tx_indexes = vec![]; + // All the transactions that attempted execution. See description of + // struct ProcessTransactionsSummary above for possible outcomes. + let mut total_transactions_attempted_execution_count: usize = 0; + // All transactions that were executed and committed + let mut total_committed_transactions_count: usize = 0; + // All transactions that were executed and committed with a successful result + let mut total_committed_transactions_with_successful_result_count: usize = 0; + // All transactions that were executed but then failed record because the + // slot ended + let mut total_failed_commit_count: usize = 0; + let mut reached_max_poh_height = false; while chunk_start != transactions.len() { let chunk_end = std::cmp::min( transactions.len(), chunk_start + MAX_NUM_TRANSACTIONS_PER_BATCH, ); - let (result, retryable_txs_in_chunk) = Self::process_and_record_transactions( + let execute_and_commit_transactions_output = Self::process_and_record_transactions( bank, &transactions[chunk_start..chunk_end], poh, @@ -1031,17 +1152,48 @@ impl BankingStage { transaction_status_sender.clone(), gossip_vote_sender, ); - trace!("process_transactions result: {:?}", result); + + let ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count: new_transactions_attempted_execution_count, + executed_transactions_count: new_executed_transactions_count, + executed_with_successful_result_count: new_executed_with_successful_result_count, + retryable_transaction_indexes: new_retryable_transaction_indexes, + commit_transactions_result: new_commit_transactions_result, + .. + } = execute_and_commit_transactions_output; + + total_transactions_attempted_execution_count = + total_transactions_attempted_execution_count + .saturating_add(new_transactions_attempted_execution_count); + + trace!( + "process_transactions result: {:?}", + new_commit_transactions_result + ); + + if new_commit_transactions_result.is_ok() { + total_committed_transactions_count = total_committed_transactions_count + .saturating_add(new_executed_transactions_count); + total_committed_transactions_with_successful_result_count = + total_committed_transactions_with_successful_result_count + .saturating_add(new_executed_with_successful_result_count); + } else { + total_failed_commit_count = + total_failed_commit_count.saturating_add(new_executed_transactions_count); + } // Add the retryable txs (transactions that errored in a way that warrants a retry) // to the list of unprocessed txs. 
- unprocessed_txs.extend_from_slice(&retryable_txs_in_chunk); + all_retryable_tx_indexes.extend_from_slice(&new_retryable_transaction_indexes); // If `bank_creation_time` is None, it's a test so ignore the option so // allow processing let should_bank_still_be_processing_txs = Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot); - match (result, should_bank_still_be_processing_txs) { + match ( + new_commit_transactions_result, + should_bank_still_be_processing_txs, + ) { (Err(PohRecorderError::MaxHeightReached), _) | (_, false) => { info!( "process transactions: max height reached slot: {} height: {}", @@ -1051,7 +1203,8 @@ impl BankingStage { // process_and_record_transactions has returned all retryable errors in // transactions[chunk_start..chunk_end], so we just need to push the remaining // transactions into the unprocessed queue. - unprocessed_txs.extend(chunk_end..transactions.len()); + all_retryable_tx_indexes.extend(chunk_end..transactions.len()); + reached_max_poh_height = true; break; } _ => (), @@ -1060,7 +1213,16 @@ impl BankingStage { chunk_start = chunk_end; } - (chunk_start, unprocessed_txs) + ProcessTransactionsSummary { + reached_max_poh_height, + transactions_attempted_execution_count: total_transactions_attempted_execution_count, + committed_transactions_count: total_committed_transactions_count, + committed_transactions_with_successful_result_count: + total_committed_transactions_with_successful_result_count, + failed_commit_count: total_failed_commit_count, + retryable_transaction_indexes: all_retryable_tx_indexes, + cost_model_throttled_transactions_count: 0, + } } // This function creates a filter of transaction results with Ok() for every pending @@ -1248,9 +1410,9 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, cost_model: &Arc>, - ) -> (usize, usize, Vec) { + ) -> ProcessTransactionsSummary { let mut packet_conversion_time = Measure::start("packet_conversion"); - let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) = + let (transactions, transaction_to_packet_indexes, cost_model_throttled_packet_indexes) = Self::transactions_from_packets( packet_batch, &packet_indexes, @@ -1262,22 +1424,22 @@ impl BankingStage { cost_model, ); packet_conversion_time.stop(); + let cost_model_throttled_transactions_count = cost_model_throttled_packet_indexes.len(); inc_new_counter_info!("banking_stage-packet_conversion", 1); banking_stage_stats .cost_forced_retry_transactions_count - .fetch_add(retryable_packet_indexes.len(), Ordering::Relaxed); + .fetch_add(cost_model_throttled_packet_indexes.len(), Ordering::Relaxed); debug!( "bank: {} filtered transactions {} cost limited transactions {}", bank.slot(), transactions.len(), - retryable_packet_indexes.len() + cost_model_throttled_packet_indexes.len() ); - let tx_len = transactions.len(); - let mut process_tx_time = Measure::start("process_tx_time"); - let (processed, unprocessed_tx_indexes) = Self::process_transactions( + + let mut process_transactions_summary = Self::process_transactions( bank, bank_creation_time, &transactions, @@ -1286,16 +1448,18 @@ impl BankingStage { gossip_vote_sender, ); process_tx_time.stop(); - let unprocessed_tx_count = unprocessed_tx_indexes.len(); - inc_new_counter_info!( - "banking_stage-unprocessed_transactions", - unprocessed_tx_count - ); + + process_transactions_summary.cost_model_throttled_transactions_count = + cost_model_throttled_transactions_count; + let ProcessTransactionsSummary { + ref 
retryable_transaction_indexes, + .. + } = process_transactions_summary; // applying cost of processed transactions to shared cost_tracker let mut cost_tracking_time = Measure::start("cost_tracking_time"); transactions.iter().enumerate().for_each(|(index, tx)| { - if unprocessed_tx_indexes.iter().all(|&i| i != index) { + if retryable_transaction_indexes.iter().all(|&i| i != index) { bank.write_cost_tracker().unwrap().add_transaction_cost( tx.transaction(), &cost_model @@ -1308,23 +1472,26 @@ impl BankingStage { cost_tracking_time.stop(); let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); - let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( + let mut filtered_retryable_tx_indexes = Self::filter_pending_packets_from_pending_txs( bank, &transactions, &transaction_to_packet_indexes, - &unprocessed_tx_indexes, + retryable_transaction_indexes, ); filter_pending_packets_time.stop(); inc_new_counter_info!( "banking_stage-dropped_tx_before_forwarding", - unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) + retryable_transaction_indexes + .len() + .saturating_sub(filtered_retryable_tx_indexes.len()) ); // combine cost-related unprocessed transactions with bank determined unprocessed for // buffering - filtered_unprocessed_packet_indexes.extend(retryable_packet_indexes); + filtered_retryable_tx_indexes.extend(cost_model_throttled_packet_indexes); + // Increment timing-based metrics banking_stage_stats .packet_conversion_elapsed .fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed); @@ -1338,7 +1505,8 @@ impl BankingStage { .filter_pending_packets_elapsed .fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed); - (processed, tx_len, filtered_unprocessed_packet_indexes) + process_transactions_summary.retryable_transaction_indexes = filtered_retryable_tx_indexes; + process_transactions_summary } fn filter_unprocessed_packets( @@ -1364,7 +1532,7 @@ impl BankingStage { let (transactions, transaction_to_packet_indexes, retry_packet_indexes) = Self::transactions_from_packets( packet_batch, - &transaction_indexes, + transaction_indexes, &bank.feature_set, &bank.read_cost_tracker().unwrap(), banking_stage_stats, @@ -1374,8 +1542,6 @@ impl BankingStage { ); unprocessed_packet_conversion_time.stop(); - let tx_count = transaction_to_packet_indexes.len(); - let unprocessed_tx_indexes = (0..transactions.len()).collect_vec(); let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( bank, @@ -1388,7 +1554,9 @@ impl BankingStage { inc_new_counter_info!( "banking_stage-dropped_tx_before_forwarding", - tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) + unprocessed_tx_indexes + .len() + .saturating_sub(filtered_unprocessed_packet_indexes.len()) ); banking_stage_stats .unprocessed_packet_conversion_elapsed @@ -1471,26 +1639,31 @@ impl BankingStage { } let (bank, bank_creation_time) = bank_start.unwrap(); - let (processed, verified_txs_len, unprocessed_indexes) = - Self::process_packets_transactions( - &bank, - &bank_creation_time, - recorder, - &packet_batch, - packet_indexes, - transaction_status_sender.clone(), - gossip_vote_sender, - banking_stage_stats, - cost_model, - ); + new_tx_count += packet_indexes.len(); - new_tx_count += processed; + let process_transactions_summary = Self::process_packets_transactions( + &bank, + &bank_creation_time, + recorder, + &packet_batch, + packet_indexes, + transaction_status_sender.clone(), + 
gossip_vote_sender, + banking_stage_stats, + cost_model, + ); + + let ProcessTransactionsSummary { + reached_max_poh_height, + retryable_transaction_indexes, + .. + } = process_transactions_summary; // Collect any unprocessed transactions in this batch for forwarding Self::push_unprocessed( buffered_packet_batches, packet_batch, - unprocessed_indexes, + retryable_transaction_indexes, &mut dropped_packet_batches_count, &mut dropped_packets_count, &mut newly_buffered_packets_count, @@ -1499,7 +1672,7 @@ impl BankingStage { ); // If there were retryable transactions, add the unexpired ones to the buffered queue - if processed < verified_txs_len { + if reached_max_poh_height { let mut handle_retryable_packets_time = Measure::start("handle_retryable_packets"); let next_leader = poh.lock().unwrap().next_slot_leader(); // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones @@ -2357,16 +2530,26 @@ mod tests { poh_recorder.lock().unwrap().set_working_bank(working_bank); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); - BankingStage::process_and_record_transactions( + let ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + executed_with_successful_result_count, + commit_transactions_result, + .. + } = BankingStage::process_and_record_transactions( &bank, &transactions, &recorder, 0, None, &gossip_vote_sender, - ) - .0 - .unwrap(); + ); + + assert_eq!(transactions_attempted_execution_count, 1); + assert_eq!(executed_transactions_count, 1); + assert_eq!(executed_with_successful_result_count, 1); + assert!(commit_transactions_result.is_ok()); + poh_recorder.lock().unwrap().tick(); let mut done = false; @@ -2394,16 +2577,29 @@ mod tests { ) .into()]; + let ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + executed_with_successful_result_count, + retryable_transaction_indexes, + commit_transactions_result, + .. + } = BankingStage::process_and_record_transactions( + &bank, + &transactions, + &recorder, + 0, + None, + &gossip_vote_sender, + ); + + assert_eq!(transactions_attempted_execution_count, 1); + // Transactions was still executed, just wasn't committed, so should be counted here. + assert_eq!(executed_transactions_count, 1); + assert_eq!(executed_with_successful_result_count, 1); + assert_eq!(retryable_transaction_indexes, vec![0]); assert_matches!( - BankingStage::process_and_record_transactions( - &bank, - &transactions, - &recorder, - 0, - None, - &gossip_vote_sender, - ) - .0, + commit_transactions_result, Err(PohRecorderError::MaxHeightReached) ); @@ -2489,14 +2685,15 @@ mod tests { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); - let (result, unprocessed) = BankingStage::process_and_record_transactions( - &bank, - &transactions, - &recorder, - 0, - None, - &gossip_vote_sender, - ); + let execute_and_commit_transactions_output = + BankingStage::process_and_record_transactions( + &bank, + &transactions, + &recorder, + 0, + None, + &gossip_vote_sender, + ); poh_recorder .lock() @@ -2505,8 +2702,18 @@ mod tests { .store(true, Ordering::Relaxed); let _ = poh_simulator.join(); - assert!(result.is_ok()); - assert_eq!(unprocessed.len(), 1); + let ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + retryable_transaction_indexes, + commit_transactions_result, + .. 
+ } = execute_and_commit_transactions_output; + + assert_eq!(transactions_attempted_execution_count, 2); + assert_eq!(executed_transactions_count, 1); + assert_eq!(retryable_transaction_indexes, vec![1],); + assert!(commit_transactions_result.is_ok()); } Blockstore::destroy(&ledger_path).unwrap(); } @@ -2595,21 +2802,34 @@ mod tests { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); - let (processed_transactions_count, mut retryable_txs) = - BankingStage::process_transactions( - &bank, - &Instant::now(), - &transactions, - &recorder, - None, - &gossip_vote_sender, - ); + let process_transactions_summary = BankingStage::process_transactions( + &bank, + &Instant::now(), + &transactions, + &recorder, + None, + &gossip_vote_sender, + ); - assert_eq!(processed_transactions_count, 0,); + let ProcessTransactionsSummary { + reached_max_poh_height, + transactions_attempted_execution_count, + committed_transactions_count, + committed_transactions_with_successful_result_count, + failed_commit_count, + mut retryable_transaction_indexes, + .. + } = process_transactions_summary; + assert!(reached_max_poh_height); + assert_eq!(transactions_attempted_execution_count, 1); + assert_eq!(failed_commit_count, 1); + // MaxHeightReached error does not commit, should be zero here + assert_eq!(committed_transactions_count, 0); + assert_eq!(committed_transactions_with_successful_result_count, 0); - retryable_txs.sort_unstable(); + retryable_transaction_indexes.sort_unstable(); let expected: Vec = (0..transactions.len()).collect(); - assert_eq!(retryable_txs, expected); + assert_eq!(retryable_transaction_indexes, expected); recorder.is_exited.store(true, Ordering::Relaxed); let _ = poh_simulator.join(); @@ -2618,6 +2838,167 @@ mod tests { Blockstore::destroy(&ledger_path).unwrap(); } + fn execute_transactions_with_dummy_poh_service( + bank: Arc, + transactions: Vec, + ) -> ProcessTransactionsSummary { + let transactions: Vec = + transactions.into_iter().map(|tx| tx.into()).collect(); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = + Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"); + let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + Some((4, 4)), + bank.ticks_per_slot(), + &Pubkey::new_unique(), + &Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &Arc::new(PohConfig::default()), + Arc::new(AtomicBool::default()), + ); + let recorder = poh_recorder.recorder(); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + + poh_recorder.lock().unwrap().set_bank(&bank); + + let poh_simulator = simulate_poh(record_receiver, &poh_recorder); + + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + + let process_transactions_summary = BankingStage::process_transactions( + &bank, + &Instant::now(), + &transactions, + &recorder, + None, + &gossip_vote_sender, + ); + + poh_recorder + .lock() + .unwrap() + .is_exited + .store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); + + process_transactions_summary + } + + #[test] + fn test_process_transactions_instruction_error() { + solana_logger::setup(); + let lamports = 10_000; + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. 
+ } = create_slow_genesis_config(lamports); + let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config)); + + // Transfer more than the balance of the mint keypair, should cause a + // InstructionError::InsufficientFunds that is then committed. Needs to be + // MAX_NUM_TRANSACTIONS_PER_BATCH at least so it doesn't conflict on account locks + // with the below transaction + let mut transactions = vec![ + system_transaction::transfer( + &mint_keypair, + &Pubkey::new_unique(), + lamports + 1, + genesis_config.hash(), + ); + MAX_NUM_TRANSACTIONS_PER_BATCH + ]; + + // Make one transaction that will succeed. + transactions.push(system_transaction::transfer( + &mint_keypair, + &Pubkey::new_unique(), + 1, + genesis_config.hash(), + )); + + let transactions_count = transactions.len(); + let ProcessTransactionsSummary { + reached_max_poh_height, + transactions_attempted_execution_count, + committed_transactions_count, + committed_transactions_with_successful_result_count, + failed_commit_count, + retryable_transaction_indexes, + .. + } = execute_transactions_with_dummy_poh_service(bank, transactions); + + // All the transactions should have been replayed, but only 1 committed + assert!(!reached_max_poh_height); + assert_eq!(transactions_attempted_execution_count, transactions_count); + // Both transactions should have been committed, even though one was an error, + // because InstructionErrors are committed + assert_eq!(committed_transactions_count, 2); + assert_eq!(committed_transactions_with_successful_result_count, 1); + assert_eq!(failed_commit_count, 0); + assert_eq!( + retryable_transaction_indexes, + (1..transactions_count - 1).collect::>() + ); + } + #[test] + fn test_process_transactions_account_in_use() { + solana_logger::setup(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_slow_genesis_config(10_000); + let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config)); + + // Make all repetitive transactions that conflict on the `mint_keypair`, so only 1 should be executed + let mut transactions = vec![ + system_transaction::transfer( + &mint_keypair, + &Pubkey::new_unique(), + 1, + genesis_config.hash() + ); + MAX_NUM_TRANSACTIONS_PER_BATCH + ]; + + // Make one more in separate batch that also conflicts, but because it's in a separate batch, it + // should be executed + transactions.push(system_transaction::transfer( + &mint_keypair, + &Pubkey::new_unique(), + 1, + genesis_config.hash(), + )); + + let transactions_count = transactions.len(); + let ProcessTransactionsSummary { + reached_max_poh_height, + transactions_attempted_execution_count, + committed_transactions_count, + committed_transactions_with_successful_result_count, + failed_commit_count, + retryable_transaction_indexes, + .. 
+ } = execute_transactions_with_dummy_poh_service(bank, transactions); + + // All the transactions should have been replayed, but only 2 committed (first and last) + assert!(!reached_max_poh_height); + assert_eq!(transactions_attempted_execution_count, transactions_count); + assert_eq!(committed_transactions_count, 2); + assert_eq!(committed_transactions_with_successful_result_count, 2); + assert_eq!(failed_commit_count, 0,); + + // Everything except first and last index of the transactions failed and are last retryable + assert_eq!( + retryable_transaction_indexes, + (1..transactions_count - 1).collect::>() + ); + } + #[test] fn test_write_persist_transaction_status() { solana_logger::setup(); diff --git a/program-test/src/lib.rs b/program-test/src/lib.rs index 83fed81129..0592025c53 100644 --- a/program-test/src/lib.rs +++ b/program-test/src/lib.rs @@ -428,8 +428,9 @@ fn setup_fee_calculator(bank: Bank) -> Bank { bank.commit_transactions( &[], // transactions &mut [], // loaded accounts - &[], // transaction execution results - 0, // tx count + vec![], // transaction execution results + 0, // executed tx count + 0, // executed with failure output tx count 1, // signature count &mut ExecuteTimings::default(), ); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index c1f571d153..55561087b6 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -651,6 +651,22 @@ impl TransactionBalancesSet { } pub type TransactionBalances = Vec>; +pub struct LoadAndExecuteTransactionsOutput { + pub loaded_transactions: Vec, + // Vector of results indicating whether a transaction was executed or could not + // be executed. Note executed transactions can still have failed! + pub execution_results: Vec, + pub inner_instructions: Vec>, + pub transaction_log_messages: Vec>, + pub retryable_transaction_indexes: Vec, + // Total number of transactions that were executed + pub executed_transactions_count: usize, + // Total number of the executed transactions that returned success/not + // an error. + pub executed_with_successful_result_count: usize, + pub signature_count: u64, +} + /// An ordered list of instructions that were invoked during a transaction instruction pub type InnerInstructions = Vec; @@ -3197,15 +3213,12 @@ impl Bank { let mut timings = ExecuteTimings::default(); - let ( - loaded_txs, - executed, - _inner_instructions, - log_messages, - _retryable_transactions, - _transaction_count, - _signature_count, - ) = self.load_and_execute_transactions( + let LoadAndExecuteTransactionsOutput { + loaded_transactions, + execution_results, + transaction_log_messages, + .. + } = self.load_and_execute_transactions( &batch, // After simulation, transactions will need to be forwarded to the leader // for processing. 
During forwarding, the transaction could expire if the @@ -3216,9 +3229,13 @@ impl Bank { &mut timings, ); - let transaction_result = executed[0].0.clone().map(|_| ()); - let log_messages = log_messages.get(0).cloned().flatten().unwrap_or_default(); - let post_transaction_accounts = loaded_txs + let transaction_result = execution_results[0].0.clone().map(|_| ()); + let transaction_log_messages = transaction_log_messages + .get(0) + .cloned() + .flatten() + .unwrap_or_default(); + let post_transaction_accounts = loaded_transactions .into_iter() .next() .unwrap() @@ -3229,7 +3246,11 @@ impl Bank { debug!("simulate_transaction: {:?}", timings); - (transaction_result, log_messages, post_transaction_accounts) + ( + transaction_result, + transaction_log_messages, + post_transaction_accounts, + ) } pub fn unlock_accounts(&self, batch: &mut TransactionBatch) { @@ -3372,21 +3393,21 @@ impl Bank { hashed_txs: &[HashedTransaction], lock_results: &[Result<()>], max_age: usize, - mut error_counters: &mut ErrorCounters, + error_counters: &mut ErrorCounters, ) -> Vec { let age_results = self.check_age( hashed_txs.as_transactions_iter(), lock_results.to_vec(), max_age, - &mut error_counters, + error_counters, ); - let cache_results = self.check_status_cache(hashed_txs, age_results, &mut error_counters); + let cache_results = self.check_status_cache(hashed_txs, age_results, error_counters); if self.upgrade_epoch() { // Reject all non-vote transactions self.filter_by_vote_transactions( hashed_txs.as_transactions_iter(), cache_results, - &mut error_counters, + error_counters, ) } else { cache_results @@ -3623,21 +3644,13 @@ impl Bank { enable_cpi_recording: bool, enable_log_recording: bool, timings: &mut ExecuteTimings, - ) -> ( - Vec, - Vec, - Vec>, - Vec>, - Vec, - u64, - u64, - ) { + ) -> LoadAndExecuteTransactionsOutput { let hashed_txs = batch.hashed_transactions(); debug!("processing transactions: {}", hashed_txs.len()); inc_new_counter_info!("bank-process_transactions", hashed_txs.len()); let mut error_counters = ErrorCounters::default(); - let retryable_txs: Vec<_> = batch + let retryable_transaction_indexes: Vec<_> = batch .lock_results() .iter() .enumerate() @@ -3661,7 +3674,7 @@ impl Bank { check_time.stop(); let mut load_time = Measure::start("accounts_load"); - let mut loaded_txs = self.rc.accounts.load_accounts( + let mut loaded_transactions = self.rc.accounts.load_accounts( &self.ancestors, hashed_txs.as_transactions_iter(), check_results, @@ -3678,8 +3691,8 @@ impl Bank { Vec::with_capacity(hashed_txs.len()); let mut transaction_log_messages: Vec>> = Vec::with_capacity(hashed_txs.len()); - - let executed: Vec = loaded_txs + let mut executed_transactions_count: usize = 0; + let execution_results: Vec = loaded_transactions .iter_mut() .zip(hashed_txs.as_transactions_iter()) .map(|(accs, tx)| match accs { @@ -3714,6 +3727,8 @@ impl Bank { Ok(()) }; + executed_transactions_count += 1; + if process_result.is_ok() { let mut get_executors_time = Measure::start("get_executors_time"); let executors = self.get_executors( @@ -3831,12 +3846,14 @@ impl Bank { timings.load_us += load_time.as_us(); timings.execute_us += execution_time.as_us(); - let mut tx_count: u64 = 0; + let mut executed_with_successful_result_count: usize = 0; let err_count = &mut error_counters.total; let transaction_log_collector_config = self.transaction_log_collector_config.read().unwrap(); - for (i, ((r, _nonce_rollback), hashed_tx)) in executed.iter().zip(hashed_txs).enumerate() { + for (i, ((r, _nonce_rollback), hashed_tx)) in + 
execution_results.iter().zip(hashed_txs).enumerate() + { let tx = hashed_tx.transaction(); if let Some(debug_keys) = &self.transaction_debug_keys { for key in &tx.message.account_keys { @@ -3901,7 +3918,7 @@ impl Bank { } if r.is_ok() { - tx_count += 1; + executed_with_successful_result_count += 1; } else { if *err_count == 0 { debug!("tx error: {:?} {:?}", r, tx); @@ -3913,19 +3930,21 @@ impl Bank { debug!( "{} errors of {} txs", *err_count, - *err_count as u64 + tx_count + *err_count + executed_with_successful_result_count ); } Self::update_error_counters(&error_counters); - ( - loaded_txs, - executed, + + LoadAndExecuteTransactionsOutput { + loaded_transactions, + execution_results, inner_instructions, transaction_log_messages, - retryable_txs, - tx_count, + retryable_transaction_indexes, + executed_transactions_count, + executed_with_successful_result_count, signature_count, - ) + } } fn filter_program_errors_and_collect_fee<'a>( @@ -3983,12 +4002,17 @@ impl Bank { results } + /// `committed_transactions_count` is the number of transactions out of `sanitized_txs` + /// that was executed. Of those, `committed_transactions_count`, + /// `committed_with_failure_result_count` is the number of executed transactions that returned + /// a failure result. pub fn commit_transactions( &self, hashed_txs: &[HashedTransaction], loaded_txs: &mut [TransactionLoadResult], - executed: &[TransactionExecutionResult], - tx_count: u64, + execution_results: Vec, + committed_transactions_count: u64, + committed_with_failure_result_count: u64, signature_count: u64, timings: &mut ExecuteTimings, ) -> TransactionResults { @@ -3997,34 +4021,42 @@ impl Bank { "commit_transactions() working on a bank that is already frozen or is undergoing freezing!" ); + let tx_count = if self.bank_tranaction_count_fix_enabled() { + committed_transactions_count + } else { + committed_transactions_count.saturating_sub(committed_with_failure_result_count) + }; + self.increment_transaction_count(tx_count); self.increment_signature_count(signature_count); - inc_new_counter_info!("bank-process_transactions-txs", tx_count as usize); + inc_new_counter_info!( + "bank-process_transactions-txs", + committed_transactions_count as usize + ); inc_new_counter_info!("bank-process_transactions-sigs", signature_count as usize); - if !hashed_txs.is_empty() { - let processed_tx_count = hashed_txs.len() as u64; - let failed_tx_count = processed_tx_count.saturating_sub(tx_count); + if committed_with_failure_result_count > 0 { self.transaction_error_count - .fetch_add(failed_tx_count, Relaxed); - self.transaction_entries_count.fetch_add(1, Relaxed); - self.transactions_per_entry_max - .fetch_max(processed_tx_count, Relaxed); + .fetch_add(committed_with_failure_result_count, Relaxed); } - if executed + // Should be equivalent to checking `committed_transactions_count > 0` + if execution_results .iter() .any(|(res, _nonce_rollback)| Self::can_commit(res)) { self.is_delta.store(true, Relaxed); + self.transaction_entries_count.fetch_add(1, Relaxed); + self.transactions_per_entry_max + .fetch_max(committed_transactions_count, Relaxed); } let mut write_time = Measure::start("write_time"); self.rc.accounts.store_cached( self.slot(), hashed_txs.as_transactions_iter(), - executed, + &execution_results, loaded_txs, &self.rent_collector, &self.last_blockhash_with_fee_calculator(), @@ -4033,10 +4065,14 @@ impl Bank { self.merge_nonce_error_into_system_error(), self.demote_program_write_locks(), ); - let rent_debits = self.collect_rent(executed, loaded_txs); + let 
rent_debits = self.collect_rent(&execution_results, loaded_txs); let mut update_stakes_cache_time = Measure::start("update_stakes_cache_time"); - self.update_stakes_cache(hashed_txs.as_transactions_iter(), executed, loaded_txs); + self.update_stakes_cache( + hashed_txs.as_transactions_iter(), + &execution_results, + loaded_txs, + ); update_stakes_cache_time.stop(); // once committed there is no way to unroll @@ -4048,13 +4084,15 @@ impl Bank { ); timings.store_us += write_time.as_us(); timings.update_stakes_cache_us += update_stakes_cache_time.as_us(); - self.update_transaction_statuses(hashed_txs, executed); - let fee_collection_results = - self.filter_program_errors_and_collect_fee(hashed_txs.as_transactions_iter(), executed); + self.update_transaction_statuses(hashed_txs, &execution_results); + let fee_collection_results = self.filter_program_errors_and_collect_fee( + hashed_txs.as_transactions_iter(), + &execution_results, + ); TransactionResults { fee_collection_results, - execution_results: executed.to_vec(), + execution_results, rent_debits, } } @@ -4666,15 +4704,16 @@ impl Bank { vec![] }; - let ( - mut loaded_txs, - executed, + let LoadAndExecuteTransactionsOutput { + mut loaded_transactions, + execution_results, inner_instructions, - transaction_logs, - _, - tx_count, + transaction_log_messages, + executed_transactions_count, + executed_with_successful_result_count, signature_count, - ) = self.load_and_execute_transactions( + .. + } = self.load_and_execute_transactions( batch, max_age, enable_cpi_recording, @@ -4684,9 +4723,11 @@ impl Bank { let results = self.commit_transactions( batch.hashed_transactions(), - &mut loaded_txs, - &executed, - tx_count, + &mut loaded_transactions, + execution_results, + executed_transactions_count as u64, + executed_transactions_count.saturating_sub(executed_with_successful_result_count) + as u64, signature_count, timings, ); @@ -4699,7 +4740,7 @@ impl Bank { results, TransactionBalancesSet::new(pre_balances, post_balances), inner_instructions, - transaction_logs, + transaction_log_messages, ) } @@ -5641,6 +5682,11 @@ impl Bank { consumed_budget.saturating_sub(budget_recovery_delta) } + pub fn bank_tranaction_count_fix_enabled(&self) -> bool { + self.feature_set + .is_active(&feature_set::bank_tranaction_count_fix::id()) + } + pub fn shrink_candidate_slots(&self) -> usize { self.rc.accounts.accounts_db.shrink_candidate_slots() } diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 35f796834f..eec4f0b5b2 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -289,6 +289,10 @@ pub mod spl_associated_token_account_v1_0_4 { solana_sdk::declare_id!("FaTa4SpiaSNH44PGC4z8bnGVTkSRYaWvrBs3KTu8XQQq"); } +pub mod bank_tranaction_count_fix { + solana_sdk::declare_id!("Vo5siZ442SaZBKPXNocthiXysNviW4UYPwRFggmbgAp"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -361,6 +365,7 @@ lazy_static! { (reject_non_rent_exempt_vote_withdraws::id(), "fail vote withdraw instructions which leave the account non-rent-exempt"), (evict_invalid_stakes_cache_entries::id(), "evict invalid stakes cache entries on epoch boundaries"), (spl_associated_token_account_v1_0_4::id(), "SPL Associated Token Account Program release version 1.0.4, tied to token 3.3.0 #22648"), + (bank_tranaction_count_fix::id(), "Fixes Bank::transaction_count to include all committed transactions, not just successful ones"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter()