diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 8df2c23dab..37d501a9f1 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -422,25 +422,21 @@ impl BankingStage { results: &[transaction::Result<()>], poh: &Arc>, ) -> (Result<()>, Vec) { - let mut ok_txs = vec![]; let mut processed_generation = Measure::start("record::process_generation"); - let processed_transactions: Vec<_> = results + let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results .iter() .zip(txs.iter()) .enumerate() .filter_map(|(i, (r, x))| { - if r.is_ok() { - ok_txs.push(i); - } if Bank::can_commit(r) { - Some(x.clone()) + Some((x.clone(), i)) } else { None } }) - .collect(); - processed_generation.stop(); + .unzip(); + processed_generation.stop(); debug!("processed: {} ", processed_transactions.len()); // unlock all the accounts with errors which are filtered by the above `filter_map` if !processed_transactions.is_empty() { @@ -463,9 +459,9 @@ impl BankingStage { match res { Ok(()) => (), Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => { - // If record errors, return all the ok transactions as retryable, filter out - // all the transactions we know were errors - return (res, ok_txs); + // If record errors, add all the committable transactions (the ones + // we just attempted to record) as retryable + return (res, processed_transactions_indexes); } Err(_) => panic!("Poh recorder returned unexpected error"), } @@ -595,14 +591,10 @@ impl BankingStage { // process_and_record_transactions has returned all retryable errors in // transactions[chunk_start..chunk_end], so we just need to push the remaining // transactions into the unprocessed queue. - let range: Vec = (chunk_end..transactions.len()).collect(); - unprocessed_txs.extend_from_slice(&range); - unprocessed_txs.sort_unstable(); - unprocessed_txs.dedup(); + unprocessed_txs.extend(chunk_end..transactions.len()); break; } // Don't exit early on any other type of error, continue processing... - chunk_start = chunk_end; } @@ -671,11 +663,15 @@ impl BankingStage { Self::filter_transaction_indexes(transactions, &transaction_indexes) } - // This function filters pending transactions that are still valid - fn filter_pending_transactions( + /// This function filters pending packets that are still valid + /// # Arguments + /// * `transactions` - a batch of transactions deserialized from packets + /// * `transaction_to_packet_indexes` - maps each transaction to a packet index + /// * `pending_indexes` - identifies which indexes in the `transactions` list are still pending + fn filter_pending_packets_from_pending_txs( bank: &Arc, transactions: &[Transaction], - transaction_indexes: &[usize], + transaction_to_packet_indexes: &[usize], pending_indexes: &[usize], ) -> Vec { let filter = Self::prepare_filter_for_pending_transactions(transactions, pending_indexes); @@ -698,17 +694,17 @@ impl BankingStage { &mut error_counters, ); - Self::filter_valid_transaction_indexes(&result, transaction_indexes) + Self::filter_valid_transaction_indexes(&result, transaction_to_packet_indexes) } fn process_received_packets( bank: &Arc, poh: &Arc>, msgs: &Packets, - transaction_indexes: Vec, + packet_indexes: Vec, ) -> (usize, usize, Vec) { - let (transactions, transaction_indexes) = - Self::transactions_from_packets(msgs, &transaction_indexes); + let (transactions, transaction_to_packet_indexes) = + Self::transactions_from_packets(msgs, &packet_indexes); debug!( "bank: {} filtered transactions {}", bank.slot(), @@ -722,18 +718,18 @@ impl BankingStage { let unprocessed_tx_count = unprocessed_tx_indexes.len(); - let filtered_unprocessed_tx_indexes = Self::filter_pending_transactions( + let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( bank, &transactions, - &transaction_indexes, + &transaction_to_packet_indexes, &unprocessed_tx_indexes, ); inc_new_counter_info!( "banking_stage-dropped_tx_before_forwarding", - unprocessed_tx_count.saturating_sub(filtered_unprocessed_tx_indexes.len()) + unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) ); - (processed, tx_len, filtered_unprocessed_tx_indexes) + (processed, tx_len, filtered_unprocessed_packet_indexes) } fn filter_unprocessed_packets( @@ -752,25 +748,25 @@ impl BankingStage { } } - let (transactions, transaction_indexes) = + let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(msgs, &transaction_indexes); - let tx_count = transaction_indexes.len(); + let tx_count = transaction_to_packet_indexes.len(); let unprocessed_tx_indexes = (0..transactions.len()).collect_vec(); - let filtered_unprocessed_tx_indexes = Self::filter_pending_transactions( + let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( bank, &transactions, - &transaction_indexes, + &transaction_to_packet_indexes, &unprocessed_tx_indexes, ); inc_new_counter_info!( "banking_stage-dropped_tx_before_forwarding", - tx_count.saturating_sub(filtered_unprocessed_tx_indexes.len()) + tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) ); - filtered_unprocessed_tx_indexes + filtered_unprocessed_packet_indexes } fn generate_packet_indexes(vers: Vec) -> Vec {