Return actual committed transactions from process_transactions() (backport #22802) (#22904)

* Return actual committed transactions from process_transactions()

* resolve conflicts

* fixup comment

* Fixup banking_stage-dropped_tx_before_forwarding counter

* Count cost model throttled transactions

* fixup tx_count moved

* Fixup tests

* remove qos service

* Cleanup clippy

Co-authored-by: Carl Lin <carl@solana.com>
This commit is contained in:
mergify[bot]
2022-02-04 03:52:11 +00:00
committed by GitHub
parent 430cdf679e
commit 70ec2cd244
4 changed files with 643 additions and 210 deletions

View File

@ -19,7 +19,10 @@ use {
solana_poh::poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder},
solana_runtime::{
accounts_db::ErrorCounters,
bank::{Bank, TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult},
bank::{
Bank, LoadAndExecuteTransactionsOutput, TransactionBalancesSet, TransactionCheckResult,
TransactionExecutionResult,
},
bank_utils,
cost_model::CostModel,
cost_tracker::CostTracker,
@ -80,6 +83,65 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
const MIN_THREADS_BANKING: u32 = 1;
/// A summary of what happened to transactions passed to the execution pipeline.
/// Transactions can end up in one of four states. They:
/// 1) Did not even make it to execution due to being filtered out by things like AccountInUse
/// lock conflicts or CostModel compute limits. These types of errors are retryable and
/// counted in `Self::retryable_transaction_indexes`.
/// 2) Did not execute due to some fatal error like too old, or duplicate signature. These
/// will be dropped from the transactions queue and not counted in `Self::retryable_transaction_indexes`
/// 3) Were executed and committed, captured by `committed_transactions_count` below.
/// 4) Were executed and failed commit, captured by `failed_commit_count` below.
struct ProcessTransactionsSummary {
    // True if we hit the end of the block/max PoH height for the block before
    // processing all the transactions in the batch.
    reached_max_poh_height: bool,

    // Total number of transactions that were passed as candidates for execution. See description
    // of struct above for possible outcomes for these transactions
    #[allow(dead_code)]
    transactions_attempted_execution_count: usize,

    // Total number of transactions that made it into the block
    #[allow(dead_code)]
    committed_transactions_count: usize,

    // Total number of transactions that made it into the block where the transaction
    // output from execution was success/no error.
    #[allow(dead_code)]
    committed_transactions_with_successful_result_count: usize,

    // All transactions that were executed but then failed record because the
    // slot ended
    #[allow(dead_code)]
    failed_commit_count: usize,

    // Indexes of transactions in the transactions slice that were not committed but are retryable
    retryable_transaction_indexes: Vec<usize>,

    // The number of transactions filtered out by the cost model
    #[allow(dead_code)]
    cost_model_throttled_transactions_count: usize,
}
/// Output of a single `execute_and_commit_transactions_locked()` call: counts of
/// attempted/executed transactions, retryable indexes, and the PoH commit result.
pub struct ExecuteAndCommitTransactionsOutput {
    // Total number of transactions that were passed as candidates for execution
    transactions_attempted_execution_count: usize,
    // The number of transactions that were executed. See description in `ProcessTransactionsSummary`
    // for possible outcomes of execution.
    executed_transactions_count: usize,
    // Total number of the executed transactions that returned success/not
    // an error.
    executed_with_successful_result_count: usize,
    // Transactions that either were not executed, or were executed and failed to be committed due
    // to the block ending.
    retryable_transaction_indexes: Vec<usize>,
    // A result that indicates whether transactions were successfully
    // committed into the Poh stream. If so, the result tells us
    // how many such transactions were committed
    commit_transactions_result: Result<(), PohRecorderError>,
}
#[derive(Debug, Default)]
pub struct BankingStageStats {
last_report: AtomicInterval,
@ -458,7 +520,7 @@ impl BankingStage {
cost_model: &Arc<RwLock<CostModel>>,
) {
let mut rebuffered_packet_count = 0;
let mut new_tx_count = 0;
let mut consumed_buffered_packets_count = 0;
let buffered_packet_batches_len = buffered_packet_batches.len();
let mut proc_start = Measure::start("consume_buffered_process");
let mut reached_end_of_slot = None;
@ -485,19 +547,23 @@ impl BankingStage {
} else {
let bank_start = poh_recorder.lock().unwrap().bank_start();
if let Some((bank, bank_creation_time)) = bank_start {
let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_packets_transactions(
&bank,
&bank_creation_time,
recorder,
packet_batch,
original_unprocessed_indexes.to_owned(),
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
cost_model,
);
if processed < verified_txs_len
let process_transactions_summary = Self::process_packets_transactions(
&bank,
&bank_creation_time,
recorder,
packet_batch,
original_unprocessed_indexes.to_owned(),
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
cost_model,
);
let ProcessTransactionsSummary {
reached_max_poh_height,
retryable_transaction_indexes,
..
} = process_transactions_summary;
if reached_max_poh_height
|| !Bank::should_bank_still_be_processing_txs(
&bank_creation_time,
max_tx_ingestion_ns,
@ -506,21 +572,30 @@ impl BankingStage {
reached_end_of_slot =
Some((poh_recorder.lock().unwrap().next_slot_leader(), bank));
}
new_tx_count += processed;
// The difference between all transactions passed to execution and the ones that
// are retryable were the ones that were either:
// 1) Committed into the block
// 2) Dropped without being committed because they had some fatal error (too old,
// duplicate signature, etc.)
//
// Note: This assumes that every packet deserializes into one transaction!
consumed_buffered_packets_count += original_unprocessed_indexes
.len()
.saturating_sub(retryable_transaction_indexes.len());
// Out of the buffered packets just retried, collect any still unprocessed
// transactions in this batch for forwarding
rebuffered_packet_count += new_unprocessed_indexes.len();
rebuffered_packet_count += retryable_transaction_indexes.len();
let has_more_unprocessed_transactions =
Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes,
new_unprocessed_indexes,
retryable_transaction_indexes,
);
if let Some(test_fn) = &test_fn {
test_fn();
}
has_more_unprocessed_transactions
} else {
rebuffered_packet_count += original_unprocessed_indexes.len();
// `original_unprocessed_indexes` must have remaining packets to process
// if not yet processed.
assert!(Self::packet_has_more_unprocessed_transactions(
@ -538,8 +613,8 @@ impl BankingStage {
timestamp(),
buffered_packet_batches_len,
proc_start.as_ms(),
new_tx_count,
(new_tx_count as f32) / (proc_start.as_s())
consumed_buffered_packets_count,
(consumed_buffered_packets_count as f32) / (proc_start.as_s())
);
banking_stage_stats
@ -550,7 +625,7 @@ impl BankingStage {
.fetch_add(rebuffered_packet_count, Ordering::Relaxed);
banking_stage_stats
.consumed_buffered_packets_count
.fetch_add(new_tx_count, Ordering::Relaxed);
.fetch_add(consumed_buffered_packets_count, Ordering::Relaxed);
}
fn consume_or_forward_packets(
@ -850,13 +925,13 @@ impl BankingStage {
(Ok(num_to_commit), vec![])
}
fn process_and_record_transactions_locked(
fn execute_and_commit_transactions_locked(
bank: &Arc<Bank>,
poh: &TransactionRecorder,
batch: &TransactionBatch,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
) -> ExecuteAndCommitTransactionsOutput {
let mut load_execute_time = Measure::start("load_execute_time");
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelihood of any single thread getting starved and processing old ids.
@ -877,15 +952,7 @@ impl BankingStage {
};
let mut execute_timings = ExecuteTimings::default();
let (
mut loaded_accounts,
results,
inner_instructions,
transaction_logs,
mut retryable_txs,
tx_count,
signature_count,
) = bank.load_and_execute_transactions(
let load_and_execute_transactions_output = bank.load_and_execute_transactions(
batch,
MAX_PROCESSING_AGE,
transaction_status_sender.is_some(),
@ -894,34 +961,60 @@ impl BankingStage {
);
load_execute_time.stop();
let LoadAndExecuteTransactionsOutput {
mut loaded_transactions,
execution_results,
inner_instructions,
transaction_log_messages,
mut retryable_transaction_indexes,
executed_transactions_count,
executed_with_successful_result_count,
signature_count,
..
} = load_and_execute_transactions_output;
let freeze_lock = bank.freeze_lock();
let mut record_time = Measure::start("record_time");
let (num_to_commit, retryable_record_txs) =
Self::record_transactions(bank.slot(), batch.transactions_iter(), &results, poh);
let (commit_transactions_result, retryable_record_transaction_indexes) =
Self::record_transactions(
bank.slot(),
batch.transactions_iter(),
&execution_results,
poh,
);
inc_new_counter_info!(
"banking_stage-record_transactions_num_to_commit",
*num_to_commit.as_ref().unwrap_or(&0)
*commit_transactions_result.as_ref().unwrap_or(&0)
);
inc_new_counter_info!(
"banking_stage-record_transactions_retryable_record_txs",
retryable_record_txs.len()
retryable_record_transaction_indexes.len()
);
retryable_txs.extend(retryable_record_txs);
if num_to_commit.is_err() {
return (num_to_commit, retryable_txs);
retryable_transaction_indexes.extend(retryable_record_transaction_indexes);
let transactions_attempted_execution_count = execution_results.len();
if let Err(e) = commit_transactions_result {
return ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
executed_with_successful_result_count,
retryable_transaction_indexes,
commit_transactions_result: Err(e),
};
}
record_time.stop();
let mut commit_time = Measure::start("commit_time");
let hashed_txs = batch.hashed_transactions();
let num_to_commit = num_to_commit.unwrap();
if num_to_commit != 0 {
let committed_transaction_count = commit_transactions_result.unwrap();
if committed_transaction_count != 0 {
let tx_results = bank.commit_transactions(
hashed_txs,
&mut loaded_accounts,
&results,
tx_count,
&mut loaded_transactions,
execution_results,
executed_transactions_count as u64,
executed_transactions_count.saturating_sub(executed_with_successful_result_count)
as u64,
signature_count,
&mut execute_timings,
);
@ -938,7 +1031,7 @@ impl BankingStage {
TransactionBalancesSet::new(pre_balances, post_balances),
TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances),
inner_instructions,
transaction_logs,
transaction_log_messages,
tx_results.rent_debits,
);
}
@ -957,11 +1050,17 @@ impl BankingStage {
);
debug!(
"process_and_record_transactions_locked: {:?}",
"execute_and_commit_transactions_locked: {:?}",
execute_timings
);
(Ok(num_to_commit), retryable_txs)
ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
executed_with_successful_result_count,
retryable_transaction_indexes,
commit_transactions_result: Ok(()),
}
}
pub fn process_and_record_transactions(
@ -971,21 +1070,33 @@ impl BankingStage {
chunk_offset: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
) -> ExecuteAndCommitTransactionsOutput {
let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let batch = bank.prepare_hashed_batch(txs);
lock_time.stop();
let (result, mut retryable_txs) = Self::process_and_record_transactions_locked(
bank,
poh,
&batch,
transaction_status_sender,
gossip_vote_sender,
);
retryable_txs.iter_mut().for_each(|x| *x += chunk_offset);
// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
// WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
// and WouldExceedMaxAccountDataCostLimit
let mut execute_and_commit_transactions_output =
Self::execute_and_commit_transactions_locked(
bank,
poh,
&batch,
transaction_status_sender,
gossip_vote_sender,
);
let ExecuteAndCommitTransactionsOutput {
ref mut retryable_transaction_indexes,
..
} = execute_and_commit_transactions_output;
retryable_transaction_indexes
.iter_mut()
.for_each(|x| *x += chunk_offset);
let mut unlock_time = Measure::start("unlock_time");
// Once the accounts are new transactions can enter the pipeline to process them
@ -1000,7 +1111,7 @@ impl BankingStage {
txs.len(),
);
(result, retryable_txs)
execute_and_commit_transactions_output
}
/// Sends transactions to the bank.
@ -1014,16 +1125,26 @@ impl BankingStage {
poh: &TransactionRecorder,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (usize, Vec<usize>) {
) -> ProcessTransactionsSummary {
let mut chunk_start = 0;
let mut unprocessed_txs = vec![];
let mut all_retryable_tx_indexes = vec![];
// All the transactions that attempted execution. See description of
// struct ProcessTransactionsSummary above for possible outcomes.
let mut total_transactions_attempted_execution_count: usize = 0;
// All transactions that were executed and committed
let mut total_committed_transactions_count: usize = 0;
// All transactions that were executed and committed with a successful result
let mut total_committed_transactions_with_successful_result_count: usize = 0;
// All transactions that were executed but then failed record because the
// slot ended
let mut total_failed_commit_count: usize = 0;
let mut reached_max_poh_height = false;
while chunk_start != transactions.len() {
let chunk_end = std::cmp::min(
transactions.len(),
chunk_start + MAX_NUM_TRANSACTIONS_PER_BATCH,
);
let (result, retryable_txs_in_chunk) = Self::process_and_record_transactions(
let execute_and_commit_transactions_output = Self::process_and_record_transactions(
bank,
&transactions[chunk_start..chunk_end],
poh,
@ -1031,17 +1152,48 @@ impl BankingStage {
transaction_status_sender.clone(),
gossip_vote_sender,
);
trace!("process_transactions result: {:?}", result);
let ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count: new_transactions_attempted_execution_count,
executed_transactions_count: new_executed_transactions_count,
executed_with_successful_result_count: new_executed_with_successful_result_count,
retryable_transaction_indexes: new_retryable_transaction_indexes,
commit_transactions_result: new_commit_transactions_result,
..
} = execute_and_commit_transactions_output;
total_transactions_attempted_execution_count =
total_transactions_attempted_execution_count
.saturating_add(new_transactions_attempted_execution_count);
trace!(
"process_transactions result: {:?}",
new_commit_transactions_result
);
if new_commit_transactions_result.is_ok() {
total_committed_transactions_count = total_committed_transactions_count
.saturating_add(new_executed_transactions_count);
total_committed_transactions_with_successful_result_count =
total_committed_transactions_with_successful_result_count
.saturating_add(new_executed_with_successful_result_count);
} else {
total_failed_commit_count =
total_failed_commit_count.saturating_add(new_executed_transactions_count);
}
// Add the retryable txs (transactions that errored in a way that warrants a retry)
// to the list of unprocessed txs.
unprocessed_txs.extend_from_slice(&retryable_txs_in_chunk);
all_retryable_tx_indexes.extend_from_slice(&new_retryable_transaction_indexes);
// If `bank_creation_time` is None, it's a test so ignore the option so
// allow processing
let should_bank_still_be_processing_txs =
Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot);
match (result, should_bank_still_be_processing_txs) {
match (
new_commit_transactions_result,
should_bank_still_be_processing_txs,
) {
(Err(PohRecorderError::MaxHeightReached), _) | (_, false) => {
info!(
"process transactions: max height reached slot: {} height: {}",
@ -1051,7 +1203,8 @@ impl BankingStage {
// process_and_record_transactions has returned all retryable errors in
// transactions[chunk_start..chunk_end], so we just need to push the remaining
// transactions into the unprocessed queue.
unprocessed_txs.extend(chunk_end..transactions.len());
all_retryable_tx_indexes.extend(chunk_end..transactions.len());
reached_max_poh_height = true;
break;
}
_ => (),
@ -1060,7 +1213,16 @@ impl BankingStage {
chunk_start = chunk_end;
}
(chunk_start, unprocessed_txs)
ProcessTransactionsSummary {
reached_max_poh_height,
transactions_attempted_execution_count: total_transactions_attempted_execution_count,
committed_transactions_count: total_committed_transactions_count,
committed_transactions_with_successful_result_count:
total_committed_transactions_with_successful_result_count,
failed_commit_count: total_failed_commit_count,
retryable_transaction_indexes: all_retryable_tx_indexes,
cost_model_throttled_transactions_count: 0,
}
}
// This function creates a filter of transaction results with Ok() for every pending
@ -1248,9 +1410,9 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
cost_model: &Arc<RwLock<CostModel>>,
) -> (usize, usize, Vec<usize>) {
) -> ProcessTransactionsSummary {
let mut packet_conversion_time = Measure::start("packet_conversion");
let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) =
let (transactions, transaction_to_packet_indexes, cost_model_throttled_packet_indexes) =
Self::transactions_from_packets(
packet_batch,
&packet_indexes,
@ -1262,22 +1424,22 @@ impl BankingStage {
cost_model,
);
packet_conversion_time.stop();
let cost_model_throttled_transactions_count = cost_model_throttled_packet_indexes.len();
inc_new_counter_info!("banking_stage-packet_conversion", 1);
banking_stage_stats
.cost_forced_retry_transactions_count
.fetch_add(retryable_packet_indexes.len(), Ordering::Relaxed);
.fetch_add(cost_model_throttled_packet_indexes.len(), Ordering::Relaxed);
debug!(
"bank: {} filtered transactions {} cost limited transactions {}",
bank.slot(),
transactions.len(),
retryable_packet_indexes.len()
cost_model_throttled_packet_indexes.len()
);
let tx_len = transactions.len();
let mut process_tx_time = Measure::start("process_tx_time");
let (processed, unprocessed_tx_indexes) = Self::process_transactions(
let mut process_transactions_summary = Self::process_transactions(
bank,
bank_creation_time,
&transactions,
@ -1286,16 +1448,18 @@ impl BankingStage {
gossip_vote_sender,
);
process_tx_time.stop();
let unprocessed_tx_count = unprocessed_tx_indexes.len();
inc_new_counter_info!(
"banking_stage-unprocessed_transactions",
unprocessed_tx_count
);
process_transactions_summary.cost_model_throttled_transactions_count =
cost_model_throttled_transactions_count;
let ProcessTransactionsSummary {
ref retryable_transaction_indexes,
..
} = process_transactions_summary;
// applying cost of processed transactions to shared cost_tracker
let mut cost_tracking_time = Measure::start("cost_tracking_time");
transactions.iter().enumerate().for_each(|(index, tx)| {
if unprocessed_tx_indexes.iter().all(|&i| i != index) {
if retryable_transaction_indexes.iter().all(|&i| i != index) {
bank.write_cost_tracker().unwrap().add_transaction_cost(
tx.transaction(),
&cost_model
@ -1308,23 +1472,26 @@ impl BankingStage {
cost_tracking_time.stop();
let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs(
let mut filtered_retryable_tx_indexes = Self::filter_pending_packets_from_pending_txs(
bank,
&transactions,
&transaction_to_packet_indexes,
&unprocessed_tx_indexes,
retryable_transaction_indexes,
);
filter_pending_packets_time.stop();
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len())
retryable_transaction_indexes
.len()
.saturating_sub(filtered_retryable_tx_indexes.len())
);
// combine cost-related unprocessed transactions with bank determined unprocessed for
// buffering
filtered_unprocessed_packet_indexes.extend(retryable_packet_indexes);
filtered_retryable_tx_indexes.extend(cost_model_throttled_packet_indexes);
// Increment timing-based metrics
banking_stage_stats
.packet_conversion_elapsed
.fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed);
@ -1338,7 +1505,8 @@ impl BankingStage {
.filter_pending_packets_elapsed
.fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed);
(processed, tx_len, filtered_unprocessed_packet_indexes)
process_transactions_summary.retryable_transaction_indexes = filtered_retryable_tx_indexes;
process_transactions_summary
}
fn filter_unprocessed_packets(
@ -1364,7 +1532,7 @@ impl BankingStage {
let (transactions, transaction_to_packet_indexes, retry_packet_indexes) =
Self::transactions_from_packets(
packet_batch,
&transaction_indexes,
transaction_indexes,
&bank.feature_set,
&bank.read_cost_tracker().unwrap(),
banking_stage_stats,
@ -1374,8 +1542,6 @@ impl BankingStage {
);
unprocessed_packet_conversion_time.stop();
let tx_count = transaction_to_packet_indexes.len();
let unprocessed_tx_indexes = (0..transactions.len()).collect_vec();
let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs(
bank,
@ -1388,7 +1554,9 @@ impl BankingStage {
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len())
unprocessed_tx_indexes
.len()
.saturating_sub(filtered_unprocessed_packet_indexes.len())
);
banking_stage_stats
.unprocessed_packet_conversion_elapsed
@ -1471,26 +1639,31 @@ impl BankingStage {
}
let (bank, bank_creation_time) = bank_start.unwrap();
let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_packets_transactions(
&bank,
&bank_creation_time,
recorder,
&packet_batch,
packet_indexes,
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
cost_model,
);
new_tx_count += packet_indexes.len();
new_tx_count += processed;
let process_transactions_summary = Self::process_packets_transactions(
&bank,
&bank_creation_time,
recorder,
&packet_batch,
packet_indexes,
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
cost_model,
);
let ProcessTransactionsSummary {
reached_max_poh_height,
retryable_transaction_indexes,
..
} = process_transactions_summary;
// Collect any unprocessed transactions in this batch for forwarding
Self::push_unprocessed(
buffered_packet_batches,
packet_batch,
unprocessed_indexes,
retryable_transaction_indexes,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
@ -1499,7 +1672,7 @@ impl BankingStage {
);
// If there were retryable transactions, add the unexpired ones to the buffered queue
if processed < verified_txs_len {
if reached_max_poh_height {
let mut handle_retryable_packets_time = Measure::start("handle_retryable_packets");
let next_leader = poh.lock().unwrap().next_slot_leader();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
@ -2357,16 +2530,26 @@ mod tests {
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
BankingStage::process_and_record_transactions(
let ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
executed_with_successful_result_count,
commit_transactions_result,
..
} = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&recorder,
0,
None,
&gossip_vote_sender,
)
.0
.unwrap();
);
assert_eq!(transactions_attempted_execution_count, 1);
assert_eq!(executed_transactions_count, 1);
assert_eq!(executed_with_successful_result_count, 1);
assert!(commit_transactions_result.is_ok());
poh_recorder.lock().unwrap().tick();
let mut done = false;
@ -2394,16 +2577,29 @@ mod tests {
)
.into()];
let ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
executed_with_successful_result_count,
retryable_transaction_indexes,
commit_transactions_result,
..
} = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&recorder,
0,
None,
&gossip_vote_sender,
);
assert_eq!(transactions_attempted_execution_count, 1);
// Transactions was still executed, just wasn't committed, so should be counted here.
assert_eq!(executed_transactions_count, 1);
assert_eq!(executed_with_successful_result_count, 1);
assert_eq!(retryable_transaction_indexes, vec![0]);
assert_matches!(
BankingStage::process_and_record_transactions(
&bank,
&transactions,
&recorder,
0,
None,
&gossip_vote_sender,
)
.0,
commit_transactions_result,
Err(PohRecorderError::MaxHeightReached)
);
@ -2489,14 +2685,15 @@ mod tests {
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let (result, unprocessed) = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&recorder,
0,
None,
&gossip_vote_sender,
);
let execute_and_commit_transactions_output =
BankingStage::process_and_record_transactions(
&bank,
&transactions,
&recorder,
0,
None,
&gossip_vote_sender,
);
poh_recorder
.lock()
@ -2505,8 +2702,18 @@ mod tests {
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
assert!(result.is_ok());
assert_eq!(unprocessed.len(), 1);
let ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
retryable_transaction_indexes,
commit_transactions_result,
..
} = execute_and_commit_transactions_output;
assert_eq!(transactions_attempted_execution_count, 2);
assert_eq!(executed_transactions_count, 1);
assert_eq!(retryable_transaction_indexes, vec![1],);
assert!(commit_transactions_result.is_ok());
}
Blockstore::destroy(&ledger_path).unwrap();
}
@ -2595,21 +2802,34 @@ mod tests {
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let (processed_transactions_count, mut retryable_txs) =
BankingStage::process_transactions(
&bank,
&Instant::now(),
&transactions,
&recorder,
None,
&gossip_vote_sender,
);
let process_transactions_summary = BankingStage::process_transactions(
&bank,
&Instant::now(),
&transactions,
&recorder,
None,
&gossip_vote_sender,
);
assert_eq!(processed_transactions_count, 0,);
let ProcessTransactionsSummary {
reached_max_poh_height,
transactions_attempted_execution_count,
committed_transactions_count,
committed_transactions_with_successful_result_count,
failed_commit_count,
mut retryable_transaction_indexes,
..
} = process_transactions_summary;
assert!(reached_max_poh_height);
assert_eq!(transactions_attempted_execution_count, 1);
assert_eq!(failed_commit_count, 1);
// MaxHeightReached error does not commit, should be zero here
assert_eq!(committed_transactions_count, 0);
assert_eq!(committed_transactions_with_successful_result_count, 0);
retryable_txs.sort_unstable();
retryable_transaction_indexes.sort_unstable();
let expected: Vec<usize> = (0..transactions.len()).collect();
assert_eq!(retryable_txs, expected);
assert_eq!(retryable_transaction_indexes, expected);
recorder.is_exited.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
@ -2618,6 +2838,167 @@ mod tests {
Blockstore::destroy(&ledger_path).unwrap();
}
// Test helper: runs `BankingStage::process_transactions` on `transactions` against
// `bank`, backed by a throwaway ledger and a simulated PoH service, and returns the
// resulting `ProcessTransactionsSummary`.
fn execute_transactions_with_dummy_poh_service(
    bank: Arc<Bank>,
    transactions: Vec<Transaction>,
) -> ProcessTransactionsSummary {
    // Convert to the hashed transaction form that `process_transactions` expects.
    let transactions: Vec<HashedTransaction> =
        transactions.into_iter().map(|tx| tx.into()).collect();
    // Stand up a temporary blockstore to back the PoH recorder.
    let ledger_path = get_tmp_ledger_path!();
    let blockstore =
        Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger");
    let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new(
        bank.tick_height(),
        bank.last_blockhash(),
        bank.slot(),
        Some((4, 4)),
        bank.ticks_per_slot(),
        &Pubkey::new_unique(),
        &Arc::new(blockstore),
        &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
        &Arc::new(PohConfig::default()),
        Arc::new(AtomicBool::default()),
    );
    let recorder = poh_recorder.recorder();
    let poh_recorder = Arc::new(Mutex::new(poh_recorder));

    // Give the recorder a working bank before processing begins.
    poh_recorder.lock().unwrap().set_bank(&bank);

    // Drive the recorder from a background thread so record() calls can complete.
    let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

    let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
    let process_transactions_summary = BankingStage::process_transactions(
        &bank,
        &Instant::now(),
        &transactions,
        &recorder,
        None,
        &gossip_vote_sender,
    );

    // Signal the simulated PoH thread to exit, then wait for it to finish.
    poh_recorder
        .lock()
        .unwrap()
        .is_exited
        .store(true, Ordering::Relaxed);
    let _ = poh_simulator.join();

    process_transactions_summary
}
#[test]
fn test_process_transactions_instruction_error() {
    solana_logger::setup();
    let lamports = 10_000;
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_slow_genesis_config(lamports);
    let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config));

    // Transfer more than the balance of the mint keypair, should cause a
    // InstructionError::InsufficientFunds that is then committed. Needs to be
    // MAX_NUM_TRANSACTIONS_PER_BATCH at least so it doesn't conflict on account locks
    // with the below transaction
    let mut transactions = vec![
        system_transaction::transfer(
            &mint_keypair,
            &Pubkey::new_unique(),
            lamports + 1,
            genesis_config.hash(),
        );
        MAX_NUM_TRANSACTIONS_PER_BATCH
    ];

    // Make one transaction that will succeed.
    transactions.push(system_transaction::transfer(
        &mint_keypair,
        &Pubkey::new_unique(),
        1,
        genesis_config.hash(),
    ));

    let transactions_count = transactions.len();
    let ProcessTransactionsSummary {
        reached_max_poh_height,
        transactions_attempted_execution_count,
        committed_transactions_count,
        committed_transactions_with_successful_result_count,
        failed_commit_count,
        retryable_transaction_indexes,
        ..
    } = execute_transactions_with_dummy_poh_service(bank, transactions);

    // All the transactions should have been replayed, but only 1 committed
    assert!(!reached_max_poh_height);
    assert_eq!(transactions_attempted_execution_count, transactions_count);
    // Both transactions should have been committed, even though one was an error,
    // because InstructionErrors are committed
    assert_eq!(committed_transactions_count, 2);
    assert_eq!(committed_transactions_with_successful_result_count, 1);
    assert_eq!(failed_commit_count, 0);
    // Everything in between the first (committed with an InstructionError) and
    // the last (committed successfully) should be retryable lock conflicts.
    assert_eq!(
        retryable_transaction_indexes,
        (1..transactions_count - 1).collect::<Vec<usize>>()
    );
}
#[test]
fn test_process_transactions_account_in_use() {
    solana_logger::setup();
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_slow_genesis_config(10_000);
    let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config));

    // Make all repetitive transactions that conflict on the `mint_keypair`, so only 1 should be executed
    let mut transactions = vec![
        system_transaction::transfer(
            &mint_keypair,
            &Pubkey::new_unique(),
            1,
            genesis_config.hash()
        );
        MAX_NUM_TRANSACTIONS_PER_BATCH
    ];

    // Make one more in separate batch that also conflicts, but because it's in a separate batch, it
    // should be executed
    transactions.push(system_transaction::transfer(
        &mint_keypair,
        &Pubkey::new_unique(),
        1,
        genesis_config.hash(),
    ));

    let transactions_count = transactions.len();
    let ProcessTransactionsSummary {
        reached_max_poh_height,
        transactions_attempted_execution_count,
        committed_transactions_count,
        committed_transactions_with_successful_result_count,
        failed_commit_count,
        retryable_transaction_indexes,
        ..
    } = execute_transactions_with_dummy_poh_service(bank, transactions);

    // All the transactions should have been replayed, but only 2 committed (first and last)
    assert!(!reached_max_poh_height);
    assert_eq!(transactions_attempted_execution_count, transactions_count);
    assert_eq!(committed_transactions_count, 2);
    assert_eq!(committed_transactions_with_successful_result_count, 2);
    assert_eq!(failed_commit_count, 0,);

    // Everything except first and last index of the transactions failed and are last retryable
    assert_eq!(
        retryable_transaction_indexes,
        (1..transactions_count - 1).collect::<Vec<usize>>()
    );
}
#[test]
fn test_write_persist_transaction_status() {
solana_logger::setup();

View File

@ -428,8 +428,9 @@ fn setup_fee_calculator(bank: Bank) -> Bank {
bank.commit_transactions(
&[], // transactions
&mut [], // loaded accounts
&[], // transaction execution results
0, // tx count
vec![], // transaction execution results
0, // executed tx count
0, // executed with failure output tx count
1, // signature count
&mut ExecuteTimings::default(),
);

View File

@ -651,6 +651,22 @@ impl TransactionBalancesSet {
}
pub type TransactionBalances = Vec<Vec<u64>>;
// Output of `Bank::load_and_execute_transactions`: one entry per transaction
// in the batch for the per-transaction vectors, plus aggregate counts.
pub struct LoadAndExecuteTransactionsOutput {
    // Per-transaction account load results, as produced by accounts.load_accounts().
    pub loaded_transactions: Vec<TransactionLoadResult>,
    // Vector of results indicating whether a transaction was executed or could not
    // be executed. Note executed transactions can still have failed!
    pub execution_results: Vec<TransactionExecutionResult>,
    // Per-transaction CPI inner instructions; presumably populated only when
    // recording was enabled for the batch — confirm against `enable_cpi_recording`.
    pub inner_instructions: Vec<Option<InnerInstructionsList>>,
    // Per-transaction log messages; presumably populated only when log
    // recording was enabled for the batch — confirm against `enable_log_recording`.
    pub transaction_log_messages: Vec<Option<TransactionLogMessages>>,
    // Indexes (into the batch) of transactions that were not executed but may
    // be retried later, e.g. after lock conflicts clear.
    pub retryable_transaction_indexes: Vec<usize>,
    // Total number of transactions that were executed
    pub executed_transactions_count: usize,
    // Total number of the executed transactions that returned success/not
    // an error.
    pub executed_with_successful_result_count: usize,
    // Aggregate signature count accumulated while processing the batch.
    pub signature_count: u64,
}
/// An ordered list of instructions that were invoked during a transaction instruction
pub type InnerInstructions = Vec<CompiledInstruction>;
@ -3197,15 +3213,12 @@ impl Bank {
let mut timings = ExecuteTimings::default();
let (
loaded_txs,
executed,
_inner_instructions,
log_messages,
_retryable_transactions,
_transaction_count,
_signature_count,
) = self.load_and_execute_transactions(
let LoadAndExecuteTransactionsOutput {
loaded_transactions,
execution_results,
transaction_log_messages,
..
} = self.load_and_execute_transactions(
&batch,
// After simulation, transactions will need to be forwarded to the leader
// for processing. During forwarding, the transaction could expire if the
@ -3216,9 +3229,13 @@ impl Bank {
&mut timings,
);
let transaction_result = executed[0].0.clone().map(|_| ());
let log_messages = log_messages.get(0).cloned().flatten().unwrap_or_default();
let post_transaction_accounts = loaded_txs
let transaction_result = execution_results[0].0.clone().map(|_| ());
let transaction_log_messages = transaction_log_messages
.get(0)
.cloned()
.flatten()
.unwrap_or_default();
let post_transaction_accounts = loaded_transactions
.into_iter()
.next()
.unwrap()
@ -3229,7 +3246,11 @@ impl Bank {
debug!("simulate_transaction: {:?}", timings);
(transaction_result, log_messages, post_transaction_accounts)
(
transaction_result,
transaction_log_messages,
post_transaction_accounts,
)
}
pub fn unlock_accounts(&self, batch: &mut TransactionBatch) {
@ -3372,21 +3393,21 @@ impl Bank {
hashed_txs: &[HashedTransaction],
lock_results: &[Result<()>],
max_age: usize,
mut error_counters: &mut ErrorCounters,
error_counters: &mut ErrorCounters,
) -> Vec<TransactionCheckResult> {
let age_results = self.check_age(
hashed_txs.as_transactions_iter(),
lock_results.to_vec(),
max_age,
&mut error_counters,
error_counters,
);
let cache_results = self.check_status_cache(hashed_txs, age_results, &mut error_counters);
let cache_results = self.check_status_cache(hashed_txs, age_results, error_counters);
if self.upgrade_epoch() {
// Reject all non-vote transactions
self.filter_by_vote_transactions(
hashed_txs.as_transactions_iter(),
cache_results,
&mut error_counters,
error_counters,
)
} else {
cache_results
@ -3623,21 +3644,13 @@ impl Bank {
enable_cpi_recording: bool,
enable_log_recording: bool,
timings: &mut ExecuteTimings,
) -> (
Vec<TransactionLoadResult>,
Vec<TransactionExecutionResult>,
Vec<Option<InnerInstructionsList>>,
Vec<Option<TransactionLogMessages>>,
Vec<usize>,
u64,
u64,
) {
) -> LoadAndExecuteTransactionsOutput {
let hashed_txs = batch.hashed_transactions();
debug!("processing transactions: {}", hashed_txs.len());
inc_new_counter_info!("bank-process_transactions", hashed_txs.len());
let mut error_counters = ErrorCounters::default();
let retryable_txs: Vec<_> = batch
let retryable_transaction_indexes: Vec<_> = batch
.lock_results()
.iter()
.enumerate()
@ -3661,7 +3674,7 @@ impl Bank {
check_time.stop();
let mut load_time = Measure::start("accounts_load");
let mut loaded_txs = self.rc.accounts.load_accounts(
let mut loaded_transactions = self.rc.accounts.load_accounts(
&self.ancestors,
hashed_txs.as_transactions_iter(),
check_results,
@ -3678,8 +3691,8 @@ impl Bank {
Vec::with_capacity(hashed_txs.len());
let mut transaction_log_messages: Vec<Option<Vec<String>>> =
Vec::with_capacity(hashed_txs.len());
let executed: Vec<TransactionExecutionResult> = loaded_txs
let mut executed_transactions_count: usize = 0;
let execution_results: Vec<TransactionExecutionResult> = loaded_transactions
.iter_mut()
.zip(hashed_txs.as_transactions_iter())
.map(|(accs, tx)| match accs {
@ -3714,6 +3727,8 @@ impl Bank {
Ok(())
};
executed_transactions_count += 1;
if process_result.is_ok() {
let mut get_executors_time = Measure::start("get_executors_time");
let executors = self.get_executors(
@ -3831,12 +3846,14 @@ impl Bank {
timings.load_us += load_time.as_us();
timings.execute_us += execution_time.as_us();
let mut tx_count: u64 = 0;
let mut executed_with_successful_result_count: usize = 0;
let err_count = &mut error_counters.total;
let transaction_log_collector_config =
self.transaction_log_collector_config.read().unwrap();
for (i, ((r, _nonce_rollback), hashed_tx)) in executed.iter().zip(hashed_txs).enumerate() {
for (i, ((r, _nonce_rollback), hashed_tx)) in
execution_results.iter().zip(hashed_txs).enumerate()
{
let tx = hashed_tx.transaction();
if let Some(debug_keys) = &self.transaction_debug_keys {
for key in &tx.message.account_keys {
@ -3901,7 +3918,7 @@ impl Bank {
}
if r.is_ok() {
tx_count += 1;
executed_with_successful_result_count += 1;
} else {
if *err_count == 0 {
debug!("tx error: {:?} {:?}", r, tx);
@ -3913,19 +3930,21 @@ impl Bank {
debug!(
"{} errors of {} txs",
*err_count,
*err_count as u64 + tx_count
*err_count + executed_with_successful_result_count
);
}
Self::update_error_counters(&error_counters);
(
loaded_txs,
executed,
LoadAndExecuteTransactionsOutput {
loaded_transactions,
execution_results,
inner_instructions,
transaction_log_messages,
retryable_txs,
tx_count,
retryable_transaction_indexes,
executed_transactions_count,
executed_with_successful_result_count,
signature_count,
)
}
}
fn filter_program_errors_and_collect_fee<'a>(
@ -3983,12 +4002,17 @@ impl Bank {
results
}
/// `committed_transactions_count` is the number of transactions out of `sanitized_txs`
/// that was executed. Of those, `committed_transactions_count`,
/// `committed_with_failure_result_count` is the number of executed transactions that returned
/// a failure result.
pub fn commit_transactions(
&self,
hashed_txs: &[HashedTransaction],
loaded_txs: &mut [TransactionLoadResult],
executed: &[TransactionExecutionResult],
tx_count: u64,
execution_results: Vec<TransactionExecutionResult>,
committed_transactions_count: u64,
committed_with_failure_result_count: u64,
signature_count: u64,
timings: &mut ExecuteTimings,
) -> TransactionResults {
@ -3997,34 +4021,42 @@ impl Bank {
"commit_transactions() working on a bank that is already frozen or is undergoing freezing!"
);
let tx_count = if self.bank_tranaction_count_fix_enabled() {
committed_transactions_count
} else {
committed_transactions_count.saturating_sub(committed_with_failure_result_count)
};
self.increment_transaction_count(tx_count);
self.increment_signature_count(signature_count);
inc_new_counter_info!("bank-process_transactions-txs", tx_count as usize);
inc_new_counter_info!(
"bank-process_transactions-txs",
committed_transactions_count as usize
);
inc_new_counter_info!("bank-process_transactions-sigs", signature_count as usize);
if !hashed_txs.is_empty() {
let processed_tx_count = hashed_txs.len() as u64;
let failed_tx_count = processed_tx_count.saturating_sub(tx_count);
if committed_with_failure_result_count > 0 {
self.transaction_error_count
.fetch_add(failed_tx_count, Relaxed);
self.transaction_entries_count.fetch_add(1, Relaxed);
self.transactions_per_entry_max
.fetch_max(processed_tx_count, Relaxed);
.fetch_add(committed_with_failure_result_count, Relaxed);
}
if executed
// Should be equivalent to checking `committed_transactions_count > 0`
if execution_results
.iter()
.any(|(res, _nonce_rollback)| Self::can_commit(res))
{
self.is_delta.store(true, Relaxed);
self.transaction_entries_count.fetch_add(1, Relaxed);
self.transactions_per_entry_max
.fetch_max(committed_transactions_count, Relaxed);
}
let mut write_time = Measure::start("write_time");
self.rc.accounts.store_cached(
self.slot(),
hashed_txs.as_transactions_iter(),
executed,
&execution_results,
loaded_txs,
&self.rent_collector,
&self.last_blockhash_with_fee_calculator(),
@ -4033,10 +4065,14 @@ impl Bank {
self.merge_nonce_error_into_system_error(),
self.demote_program_write_locks(),
);
let rent_debits = self.collect_rent(executed, loaded_txs);
let rent_debits = self.collect_rent(&execution_results, loaded_txs);
let mut update_stakes_cache_time = Measure::start("update_stakes_cache_time");
self.update_stakes_cache(hashed_txs.as_transactions_iter(), executed, loaded_txs);
self.update_stakes_cache(
hashed_txs.as_transactions_iter(),
&execution_results,
loaded_txs,
);
update_stakes_cache_time.stop();
// once committed there is no way to unroll
@ -4048,13 +4084,15 @@ impl Bank {
);
timings.store_us += write_time.as_us();
timings.update_stakes_cache_us += update_stakes_cache_time.as_us();
self.update_transaction_statuses(hashed_txs, executed);
let fee_collection_results =
self.filter_program_errors_and_collect_fee(hashed_txs.as_transactions_iter(), executed);
self.update_transaction_statuses(hashed_txs, &execution_results);
let fee_collection_results = self.filter_program_errors_and_collect_fee(
hashed_txs.as_transactions_iter(),
&execution_results,
);
TransactionResults {
fee_collection_results,
execution_results: executed.to_vec(),
execution_results,
rent_debits,
}
}
@ -4666,15 +4704,16 @@ impl Bank {
vec![]
};
let (
mut loaded_txs,
executed,
let LoadAndExecuteTransactionsOutput {
mut loaded_transactions,
execution_results,
inner_instructions,
transaction_logs,
_,
tx_count,
transaction_log_messages,
executed_transactions_count,
executed_with_successful_result_count,
signature_count,
) = self.load_and_execute_transactions(
..
} = self.load_and_execute_transactions(
batch,
max_age,
enable_cpi_recording,
@ -4684,9 +4723,11 @@ impl Bank {
let results = self.commit_transactions(
batch.hashed_transactions(),
&mut loaded_txs,
&executed,
tx_count,
&mut loaded_transactions,
execution_results,
executed_transactions_count as u64,
executed_transactions_count.saturating_sub(executed_with_successful_result_count)
as u64,
signature_count,
timings,
);
@ -4699,7 +4740,7 @@ impl Bank {
results,
TransactionBalancesSet::new(pre_balances, post_balances),
inner_instructions,
transaction_logs,
transaction_log_messages,
)
}
@ -5641,6 +5682,11 @@ impl Bank {
consumed_budget.saturating_sub(budget_recovery_delta)
}
/// Returns true if the `bank_tranaction_count_fix` feature is active for this
/// bank. When active, `commit_transactions` counts every committed transaction
/// toward the bank's transaction count, including ones that executed with a
/// failure result (instead of subtracting the failed ones).
/// NOTE: the "tranaction" spelling matches the feature's declared id and must
/// not be "corrected" here without renaming the feature module as well.
pub fn bank_tranaction_count_fix_enabled(&self) -> bool {
    self.feature_set
        .is_active(&feature_set::bank_tranaction_count_fix::id())
}
/// Delegates to the accounts database's `shrink_candidate_slots`; the returned
/// count comes straight from the accounts db.
/// NOTE(review): presumably the number of candidate slots shrunk — confirm
/// against `AccountsDb::shrink_candidate_slots`.
pub fn shrink_candidate_slots(&self) -> usize {
    self.rc.accounts.accounts_db.shrink_candidate_slots()
}

View File

@ -289,6 +289,10 @@ pub mod spl_associated_token_account_v1_0_4 {
solana_sdk::declare_id!("FaTa4SpiaSNH44PGC4z8bnGVTkSRYaWvrBs3KTu8XQQq");
}
// Feature gate: fixes Bank::transaction_count to include all committed
// transactions, not just successful ones (see the FEATURE_NAMES entry below).
// NOTE: the "tranaction" typo is part of the registered feature name; renaming
// the module would break `Bank::bank_tranaction_count_fix_enabled` and any
// other references, so keep the spelling as-is.
pub mod bank_tranaction_count_fix {
    solana_sdk::declare_id!("Vo5siZ442SaZBKPXNocthiXysNviW4UYPwRFggmbgAp");
}
lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@ -361,6 +365,7 @@ lazy_static! {
(reject_non_rent_exempt_vote_withdraws::id(), "fail vote withdraw instructions which leave the account non-rent-exempt"),
(evict_invalid_stakes_cache_entries::id(), "evict invalid stakes cache entries on epoch boundaries"),
(spl_associated_token_account_v1_0_4::id(), "SPL Associated Token Account Program release version 1.0.4, tied to token 3.3.0 #22648"),
(bank_tranaction_count_fix::id(), "Fixes Bank::transaction_count to include all committed transactions, not just successful ones"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()