Use OrderedIterator in TransactionStatusService (#11149) (#11150)

* Split out get-first-err for unit testing

* Add failing test

* Add missing ordering

(cherry picked from commit 6c38369042)

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
This commit is contained in:
mergify[bot]
2020-07-21 17:49:38 -06:00
committed by GitHub
parent c73e40a351
commit 4bd018e68b
4 changed files with 99 additions and 27 deletions

View File

@ -549,6 +549,7 @@ impl BankingStage {
send_transaction_status_batch( send_transaction_status_batch(
bank.clone(), bank.clone(),
batch.transactions(), batch.transactions(),
batch.iteration_order_vec(),
transaction_statuses, transaction_statuses,
TransactionBalancesSet::new(pre_balances, post_balances), TransactionBalancesSet::new(pre_balances, post_balances),
sender, sender,

View File

@ -3,6 +3,7 @@ use solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionSta
use solana_runtime::{ use solana_runtime::{
bank::{Bank, HashAgeKind}, bank::{Bank, HashAgeKind},
nonce_utils, nonce_utils,
transaction_utils::OrderedIterator,
}; };
use solana_transaction_status::TransactionStatusMeta; use solana_transaction_status::TransactionStatusMeta;
use std::{ use std::{
@ -50,13 +51,14 @@ impl TransactionStatusService {
let TransactionStatusBatch { let TransactionStatusBatch {
bank, bank,
transactions, transactions,
iteration_order,
statuses, statuses,
balances, balances,
} = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?; } = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?;
let slot = bank.slot(); let slot = bank.slot();
for (((transaction, (status, hash_age_kind)), pre_balances), post_balances) in transactions for (((transaction, (status, hash_age_kind)), pre_balances), post_balances) in
.iter() OrderedIterator::new(&transactions, iteration_order.as_deref())
.zip(statuses) .zip(statuses)
.zip(balances.pre_balances) .zip(balances.pre_balances)
.zip(balances.post_balances) .zip(balances.post_balances)

View File

@ -18,13 +18,14 @@ use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::{ use solana_runtime::{
bank::{Bank, TransactionBalancesSet, TransactionProcessResult, TransactionResults}, bank::{Bank, TransactionBalancesSet, TransactionProcessResult, TransactionResults},
transaction_batch::TransactionBatch, transaction_batch::TransactionBatch,
transaction_utils::OrderedIterator,
}; };
use solana_sdk::{ use solana_sdk::{
clock::{Slot, MAX_PROCESSING_AGE}, clock::{Slot, MAX_PROCESSING_AGE},
genesis_config::GenesisConfig, genesis_config::GenesisConfig,
hash::Hash, hash::Hash,
pubkey::Pubkey, pubkey::Pubkey,
signature::Keypair, signature::{Keypair, Signature},
timing::duration_as_ms, timing::duration_as_ms,
transaction::{Result, Transaction, TransactionError}, transaction::{Result, Transaction, TransactionError},
}; };
@ -57,6 +58,37 @@ fn first_err(results: &[Result<()>]) -> Result<()> {
Ok(()) Ok(())
} }
// Includes transaction signature for unit-testing
fn get_first_error(
batch: &TransactionBatch,
fee_collection_results: Vec<Result<()>>,
) -> Option<(Result<()>, Signature)> {
let mut first_err = None;
for (result, transaction) in fee_collection_results.iter().zip(OrderedIterator::new(
batch.transactions(),
batch.iteration_order(),
)) {
if let Err(ref err) = result {
if first_err.is_none() {
first_err = Some((result.clone(), transaction.signatures[0]));
}
warn!(
"Unexpected validator error: {:?}, transaction: {:?}",
err, transaction
);
datapoint_error!(
"validator_process_entry_error",
(
"error",
format!("error: {:?}, transaction: {:?}", err, transaction),
String
)
);
}
}
first_err
}
fn execute_batch( fn execute_batch(
batch: &TransactionBatch, batch: &TransactionBatch,
bank: &Arc<Bank>, bank: &Arc<Bank>,
@ -78,33 +110,15 @@ fn execute_batch(
send_transaction_status_batch( send_transaction_status_batch(
bank.clone(), bank.clone(),
batch.transactions(), batch.transactions(),
batch.iteration_order_vec(),
processing_results, processing_results,
balances, balances,
sender, sender,
); );
} }
let mut first_err = None; let first_err = get_first_error(batch, fee_collection_results);
for (result, transaction) in fee_collection_results.iter().zip(batch.transactions()) { first_err.map(|(result, _)| result).unwrap_or(Ok(()))
if let Err(ref err) = result {
if first_err.is_none() {
first_err = Some(result.clone());
}
warn!(
"Unexpected validator error: {:?}, transaction: {:?}",
err, transaction
);
datapoint_error!(
"validator_process_entry_error",
(
"error",
format!("error: {:?}, transaction: {:?}", err, transaction),
String
)
);
}
}
first_err.unwrap_or(Ok(()))
} }
fn execute_batches( fn execute_batches(
@ -824,6 +838,7 @@ fn process_single_slot(
pub struct TransactionStatusBatch { pub struct TransactionStatusBatch {
pub bank: Arc<Bank>, pub bank: Arc<Bank>,
pub transactions: Vec<Transaction>, pub transactions: Vec<Transaction>,
pub iteration_order: Option<Vec<usize>>,
pub statuses: Vec<TransactionProcessResult>, pub statuses: Vec<TransactionProcessResult>,
pub balances: TransactionBalancesSet, pub balances: TransactionBalancesSet,
} }
@ -832,6 +847,7 @@ pub type TransactionStatusSender = Sender<TransactionStatusBatch>;
pub fn send_transaction_status_batch( pub fn send_transaction_status_batch(
bank: Arc<Bank>, bank: Arc<Bank>,
transactions: &[Transaction], transactions: &[Transaction],
iteration_order: Option<Vec<usize>>,
statuses: Vec<TransactionProcessResult>, statuses: Vec<TransactionProcessResult>,
balances: TransactionBalancesSet, balances: TransactionBalancesSet,
transaction_status_sender: TransactionStatusSender, transaction_status_sender: TransactionStatusSender,
@ -840,6 +856,7 @@ pub fn send_transaction_status_batch(
if let Err(e) = transaction_status_sender.send(TransactionStatusBatch { if let Err(e) = transaction_status_sender.send(TransactionStatusBatch {
bank, bank,
transactions: transactions.to_vec(), transactions: transactions.to_vec(),
iteration_order,
statuses, statuses,
balances, balances,
}) { }) {
@ -2665,4 +2682,51 @@ pub mod tests {
} }
} }
} }
#[test]
fn test_get_first_error() {
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(1_000_000_000);
let bank = Arc::new(Bank::new(&genesis_config));
let present_account_key = Keypair::new();
let present_account = Account::new(1, 10, &Pubkey::default());
bank.store_account(&present_account_key.pubkey(), &present_account);
let keypair = Keypair::new();
// Create array of two transactions which throw different errors
let account_not_found_tx =
system_transaction::transfer(&keypair, &Pubkey::new_rand(), 42, bank.last_blockhash());
let account_not_found_sig = account_not_found_tx.signatures[0];
let mut account_loaded_twice = system_transaction::transfer(
&mint_keypair,
&Pubkey::new_rand(),
42,
bank.last_blockhash(),
);
account_loaded_twice.message.account_keys[1] = mint_keypair.pubkey();
let transactions = [account_loaded_twice, account_not_found_tx];
// Use inverted iteration_order
let iteration_order: Vec<usize> = vec![1, 0];
let batch = bank.prepare_batch(&transactions, Some(iteration_order));
let (
TransactionResults {
fee_collection_results,
processing_results: _,
},
_balances,
) = batch
.bank()
.load_execute_and_commit_transactions(&batch, MAX_PROCESSING_AGE, false);
let (err, signature) = get_first_error(&batch, fee_collection_results).unwrap();
// First error found should be for the 2nd transaction, due to iteration_order
assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound);
assert_eq!(signature, account_not_found_sig);
}
} }

View File

@ -41,6 +41,11 @@ impl<'a, 'b> TransactionBatch<'a, 'b> {
pub fn iteration_order(&self) -> Option<&[usize]> { pub fn iteration_order(&self) -> Option<&[usize]> {
self.iteration_order.as_deref() self.iteration_order.as_deref()
} }
pub fn iteration_order_vec(&self) -> Option<Vec<usize>> {
self.iteration_order.clone()
}
pub fn bank(&self) -> &Bank { pub fn bank(&self) -> &Bank {
self.bank self.bank
} }