count commitable in banking_stage (#5477)

This commit is contained in:
Rob Walker
2019-08-09 21:14:20 -07:00
committed by GitHub
parent 9ee5f36068
commit 5884469d11

View File

@ -429,7 +429,7 @@ impl BankingStage {
txs: &[Transaction], txs: &[Transaction],
results: &[transaction::Result<()>], results: &[transaction::Result<()>],
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
) -> (Result<()>, Vec<usize>) { ) -> (Result<usize>, Vec<usize>) {
let mut processed_generation = Measure::start("record::process_generation"); let mut processed_generation = Measure::start("record::process_generation");
let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results
.iter() .iter()
@ -445,13 +445,11 @@ impl BankingStage {
.unzip(); .unzip();
processed_generation.stop(); processed_generation.stop();
debug!("processed: {} ", processed_transactions.len()); let num_to_commit = processed_transactions.len();
debug!("num_to_commit: {} ", num_to_commit);
// unlock all the accounts with errors which are filtered by the above `filter_map` // unlock all the accounts with errors which are filtered by the above `filter_map`
if !processed_transactions.is_empty() { if !processed_transactions.is_empty() {
inc_new_counter_warn!( inc_new_counter_warn!("banking_stage-record_transactions", num_to_commit);
"banking_stage-record_transactions",
processed_transactions.len()
);
let mut hash_time = Measure::start("record::hash"); let mut hash_time = Measure::start("record::hash");
let hash = hash_transactions(&processed_transactions[..]); let hash = hash_transactions(&processed_transactions[..]);
@ -469,13 +467,16 @@ impl BankingStage {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => { Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => {
// If record errors, add all the committable transactions (the ones // If record errors, add all the committable transactions (the ones
// we just attempted to record) as retryable // we just attempted to record) as retryable
return (res, processed_transactions_indexes); return (
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)),
processed_transactions_indexes,
);
} }
Err(e) => panic!(format!("Poh recorder returned unexpected error: {:?}", e)), Err(e) => panic!(format!("Poh recorder returned unexpected error: {:?}", e)),
} }
poh_record.stop(); poh_record.stop();
} }
(Ok(()), vec![]) (Ok(num_to_commit), vec![])
} }
fn process_and_record_transactions_locked( fn process_and_record_transactions_locked(
@ -483,7 +484,7 @@ impl BankingStage {
txs: &[Transaction], txs: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
lock_results: &LockedAccountsResults, lock_results: &LockedAccountsResults,
) -> (Result<()>, Vec<usize>) { ) -> (Result<usize>, Vec<usize>) {
let mut load_execute_time = Measure::start("load_execute_time"); let mut load_execute_time = Measure::start("load_execute_time");
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce // Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelihood of any single thread getting starved and processing old ids. // the likelihood of any single thread getting starved and processing old ids.
@ -495,20 +496,20 @@ impl BankingStage {
let freeze_lock = bank.freeze_lock(); let freeze_lock = bank.freeze_lock();
let record_time = {
let mut record_time = Measure::start("record_time"); let mut record_time = Measure::start("record_time");
let (res, retryable_record_txs) = let (num_to_commit, retryable_record_txs) =
Self::record_transactions(bank.slot(), txs, &results, poh); Self::record_transactions(bank.slot(), txs, &results, poh);
retryable_txs.extend(retryable_record_txs); retryable_txs.extend(retryable_record_txs);
if res.is_err() { if num_to_commit.is_err() {
return (res, retryable_txs); return (num_to_commit, retryable_txs);
} }
record_time.stop(); record_time.stop();
record_time
};
let commit_time = {
let mut commit_time = Measure::start("commit_time"); let mut commit_time = Measure::start("commit_time");
let num_to_commit = num_to_commit.unwrap();
if num_to_commit != 0 {
bank.commit_transactions( bank.commit_transactions(
txs, txs,
&mut loaded_accounts, &mut loaded_accounts,
@ -516,14 +517,13 @@ impl BankingStage {
tx_count, tx_count,
signature_count, signature_count,
); );
}
commit_time.stop(); commit_time.stop();
commit_time
};
drop(freeze_lock); drop(freeze_lock);
debug!( debug!(
"bank: {} load_execute: {}us record: {}us commit: {}us txs_len: {}", "bank: {} process_and_record_locked: {}us record: {}us commit: {}us txs_len: {}",
bank.slot(), bank.slot(),
load_execute_time.as_us(), load_execute_time.as_us(),
record_time.as_us(), record_time.as_us(),
@ -531,7 +531,7 @@ impl BankingStage {
txs.len(), txs.len(),
); );
(Ok(()), retryable_txs) (Ok(num_to_commit), retryable_txs)
} }
pub fn process_and_record_transactions( pub fn process_and_record_transactions(
@ -539,7 +539,7 @@ impl BankingStage {
txs: &[Transaction], txs: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
chunk_offset: usize, chunk_offset: usize,
) -> (Result<()>, Vec<usize>) { ) -> (Result<usize>, Vec<usize>) {
let mut lock_time = Measure::start("lock_time"); let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the // Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state // same account state