Cleanup some of banking stage (#4878)

* Add committable transactions that cause errors like InstructionErrors back to retryable list on MaxHeightReached

* Remove unnecessary logic

* Add comments/renaming for clarity
This commit is contained in:
carllin 2019-07-01 12:14:40 -07:00 committed by GitHub
parent 19ea5fe0c0
commit c1953dca8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -422,25 +422,21 @@ impl BankingStage {
results: &[transaction::Result<()>],
poh: &Arc<Mutex<PohRecorder>>,
) -> (Result<()>, Vec<usize>) {
let mut ok_txs = vec![];
let mut processed_generation = Measure::start("record::process_generation");
let processed_transactions: Vec<_> = results
let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results
.iter()
.zip(txs.iter())
.enumerate()
.filter_map(|(i, (r, x))| {
if r.is_ok() {
ok_txs.push(i);
}
if Bank::can_commit(r) {
Some(x.clone())
Some((x.clone(), i))
} else {
None
}
})
.collect();
processed_generation.stop();
.unzip();
processed_generation.stop();
debug!("processed: {} ", processed_transactions.len());
// unlock all the accounts with errors which are filtered by the above `filter_map`
if !processed_transactions.is_empty() {
@ -463,9 +459,9 @@ impl BankingStage {
match res {
Ok(()) => (),
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => {
// If record errors, return all the ok transactions as retryable, filter out
// all the transactions we know were errors
return (res, ok_txs);
// If record errors, add all the committable transactions (the ones
// we just attempted to record) as retryable
return (res, processed_transactions_indexes);
}
Err(_) => panic!("Poh recorder returned unexpected error"),
}
@ -595,14 +591,10 @@ impl BankingStage {
// process_and_record_transactions has returned all retryable errors in
// transactions[chunk_start..chunk_end], so we just need to push the remaining
// transactions into the unprocessed queue.
let range: Vec<usize> = (chunk_end..transactions.len()).collect();
unprocessed_txs.extend_from_slice(&range);
unprocessed_txs.sort_unstable();
unprocessed_txs.dedup();
unprocessed_txs.extend(chunk_end..transactions.len());
break;
}
// Don't exit early on any other type of error, continue processing...
chunk_start = chunk_end;
}
@ -671,11 +663,15 @@ impl BankingStage {
Self::filter_transaction_indexes(transactions, &transaction_indexes)
}
// This function filters pending transactions that are still valid
fn filter_pending_transactions(
/// This function filters pending packets that are still valid
/// # Arguments
/// * `transactions` - a batch of transactions deserialized from packets
/// * `transaction_to_packet_indexes` - maps each transaction to a packet index
/// * `pending_indexes` - identifies which indexes in the `transactions` list are still pending
fn filter_pending_packets_from_pending_txs(
bank: &Arc<Bank>,
transactions: &[Transaction],
transaction_indexes: &[usize],
transaction_to_packet_indexes: &[usize],
pending_indexes: &[usize],
) -> Vec<usize> {
let filter = Self::prepare_filter_for_pending_transactions(transactions, pending_indexes);
@ -698,17 +694,17 @@ impl BankingStage {
&mut error_counters,
);
Self::filter_valid_transaction_indexes(&result, transaction_indexes)
Self::filter_valid_transaction_indexes(&result, transaction_to_packet_indexes)
}
fn process_received_packets(
bank: &Arc<Bank>,
poh: &Arc<Mutex<PohRecorder>>,
msgs: &Packets,
transaction_indexes: Vec<usize>,
packet_indexes: Vec<usize>,
) -> (usize, usize, Vec<usize>) {
let (transactions, transaction_indexes) =
Self::transactions_from_packets(msgs, &transaction_indexes);
let (transactions, transaction_to_packet_indexes) =
Self::transactions_from_packets(msgs, &packet_indexes);
debug!(
"bank: {} filtered transactions {}",
bank.slot(),
@ -722,18 +718,18 @@ impl BankingStage {
let unprocessed_tx_count = unprocessed_tx_indexes.len();
let filtered_unprocessed_tx_indexes = Self::filter_pending_transactions(
let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs(
bank,
&transactions,
&transaction_indexes,
&transaction_to_packet_indexes,
&unprocessed_tx_indexes,
);
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
unprocessed_tx_count.saturating_sub(filtered_unprocessed_tx_indexes.len())
unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len())
);
(processed, tx_len, filtered_unprocessed_tx_indexes)
(processed, tx_len, filtered_unprocessed_packet_indexes)
}
fn filter_unprocessed_packets(
@ -752,25 +748,25 @@ impl BankingStage {
}
}
let (transactions, transaction_indexes) =
let (transactions, transaction_to_packet_indexes) =
Self::transactions_from_packets(msgs, &transaction_indexes);
let tx_count = transaction_indexes.len();
let tx_count = transaction_to_packet_indexes.len();
let unprocessed_tx_indexes = (0..transactions.len()).collect_vec();
let filtered_unprocessed_tx_indexes = Self::filter_pending_transactions(
let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs(
bank,
&transactions,
&transaction_indexes,
&transaction_to_packet_indexes,
&unprocessed_tx_indexes,
);
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
tx_count.saturating_sub(filtered_unprocessed_tx_indexes.len())
tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len())
);
filtered_unprocessed_tx_indexes
filtered_unprocessed_packet_indexes
}
fn generate_packet_indexes(vers: Vec<u8>) -> Vec<usize> {