randomize tx ordering (#4978)

Summary of Changes:
This change adds functionality to randomize tx execution for every entry. It does this by implementing OrderedIterator that iterates tx slice as per the order specified. The order is generated randomly for every entry.
This commit is contained in:
Parth
2019-08-28 21:08:32 +05:30
committed by GitHub
parent 1609765740
commit 7dfb735db9
12 changed files with 451 additions and 99 deletions

View File

@ -489,7 +489,7 @@ impl BankingStage {
// TODO: Banking stage threads should be prioritized to complete faster then this queue
// expires.
let (mut loaded_accounts, results, mut retryable_txs, tx_count, signature_count) =
bank.load_and_execute_transactions(txs, lock_results, MAX_PROCESSING_AGE);
bank.load_and_execute_transactions(txs, None, lock_results, MAX_PROCESSING_AGE);
load_execute_time.stop();
let freeze_lock = bank.freeze_lock();
@ -510,6 +510,7 @@ impl BankingStage {
if num_to_commit != 0 {
bank.commit_transactions(
txs,
None,
&mut loaded_accounts,
&results,
tx_count,
@ -541,7 +542,7 @@ impl BankingStage {
let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let lock_results = bank.lock_accounts(txs);
let lock_results = bank.lock_accounts(txs, None);
lock_time.stop();
let (result, mut retryable_txs) =
@ -696,6 +697,7 @@ impl BankingStage {
// Drop the transaction if it will expire by the time the next node receives and processes it
let result = bank.check_transactions(
transactions,
None,
&filter,
(MAX_PROCESSING_AGE)
.saturating_sub(MAX_TRANSACTION_FORWARDING_DELAY)

View File

@ -15,6 +15,9 @@ use std::result;
use std::sync::Arc;
use std::time::{Duration, Instant};
use rand::seq::SliceRandom;
use rand::thread_rng;
pub const NUM_THREADS: u32 = 10;
use std::cell::RefCell;
@ -32,35 +35,46 @@ fn first_err(results: &[Result<()>]) -> Result<()> {
Ok(())
}
fn par_execute_entries(bank: &Bank, entries: &[(&Entry, LockedAccountsResults)]) -> Result<()> {
fn par_execute_entries(
bank: &Bank,
entries: &[(&Entry, LockedAccountsResults, bool, Vec<usize>)],
) -> Result<()> {
inc_new_counter_debug!("bank-par_execute_entries-count", entries.len());
let results: Vec<Result<()>> = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
entries
.into_par_iter()
.map(|(e, locked_accounts)| {
let results = bank.load_execute_and_commit_transactions(
&e.transactions,
locked_accounts,
MAX_RECENT_BLOCKHASHES,
);
let mut first_err = None;
for (r, tx) in results.iter().zip(e.transactions.iter()) {
if let Err(ref e) = r {
if first_err.is_none() {
first_err = Some(r.clone());
}
if !Bank::can_commit(&r) {
warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx);
datapoint_error!(
"validator_process_entry_error",
("error", format!("error: {:?}, tx: {:?}", e, tx), String)
);
.map(
|(e, locked_accounts, randomize_tx_order, random_txs_execution_order)| {
let tx_execution_order: Option<&[usize]> = if *randomize_tx_order {
Some(random_txs_execution_order)
} else {
None
};
let results = bank.load_execute_and_commit_transactions(
&e.transactions,
tx_execution_order,
locked_accounts,
MAX_RECENT_BLOCKHASHES,
);
let mut first_err = None;
for (r, tx) in results.iter().zip(e.transactions.iter()) {
if let Err(ref e) = r {
if first_err.is_none() {
first_err = Some(r.clone());
}
if !Bank::can_commit(&r) {
warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx);
datapoint_error!(
"validator_process_entry_error",
("error", format!("error: {:?}, tx: {:?}", e, tx), String)
);
}
}
}
}
first_err.unwrap_or(Ok(()))
})
first_err.unwrap_or(Ok(()))
},
)
.collect()
})
});
@ -73,7 +87,11 @@ fn par_execute_entries(bank: &Bank, entries: &[(&Entry, LockedAccountsResults)])
/// 2. Process the locked group in parallel
/// 3. Register the `Tick` if it's available
/// 4. Update the leader scheduler, goto 1
pub fn process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> {
pub fn process_entries(
bank: &Bank,
entries: &[Entry],
randomize_tx_execution_order: bool,
) -> Result<()> {
// accumulator for entries that can be processed in parallel
let mut mt_group = vec![];
for entry in entries {
@ -86,15 +104,34 @@ pub fn process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> {
}
// else loop on processing the entry
loop {
// random_txs_execution_order need to be seperately defined apart from txs_execution_order,
// to satisfy borrow checker.
let mut random_txs_execution_order: Vec<usize> = vec![];
if randomize_tx_execution_order {
random_txs_execution_order = (0..entry.transactions.len()).collect();
random_txs_execution_order.shuffle(&mut thread_rng());
}
let txs_execution_order: Option<&[usize]> = if randomize_tx_execution_order {
Some(&random_txs_execution_order)
} else {
None
};
// try to lock the accounts
let lock_results = bank.lock_accounts(&entry.transactions);
let lock_results = bank.lock_accounts(&entry.transactions, txs_execution_order);
let first_lock_err = first_err(lock_results.locked_accounts_results());
// if locking worked
if first_lock_err.is_ok() {
// push the entry to the mt_group
mt_group.push((entry, lock_results));
mt_group.push((
entry,
lock_results,
randomize_tx_execution_order,
random_txs_execution_order,
));
// done with this entry
break;
}
@ -225,7 +262,7 @@ fn verify_and_process_entries(
return Err(BlocktreeProcessorError::LedgerVerificationFailed);
}
process_entries(&bank, &entries).map_err(|err| {
process_entries(&bank, &entries, true).map_err(|err| {
warn!(
"Failed to process entries for slot {}: {:?}",
bank.slot(),
@ -417,6 +454,7 @@ pub mod tests {
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
use solana_sdk::transaction::Transaction;
use solana_sdk::transaction::TransactionError;
pub fn fill_blocktree_slot_with_ticks(
@ -771,7 +809,7 @@ pub mod tests {
);
// Now ensure the TX is accepted despite pointing to the ID of an empty entry.
process_entries(&bank, &slot_entries).unwrap();
process_entries(&bank, &slot_entries, true).unwrap();
assert_eq!(bank.process_transaction(&tx), Ok(()));
}
@ -868,7 +906,7 @@ pub mod tests {
// ensure bank can process a tick
assert_eq!(bank.tick_height(), 0);
let tick = next_entry(&genesis_block.hash(), 1, vec![]);
assert_eq!(process_entries(&bank, &[tick.clone()]), Ok(()));
assert_eq!(process_entries(&bank, &[tick.clone()], true), Ok(()));
assert_eq!(bank.tick_height(), 1);
}
@ -900,7 +938,7 @@ pub mod tests {
bank.last_blockhash(),
);
let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]);
assert_eq!(process_entries(&bank, &[entry_1, entry_2]), Ok(()));
assert_eq!(process_entries(&bank, &[entry_1, entry_2], true), Ok(()));
assert_eq!(bank.get_balance(&keypair1.pubkey()), 2);
assert_eq!(bank.get_balance(&keypair2.pubkey()), 2);
assert_eq!(bank.last_blockhash(), blockhash);
@ -954,7 +992,7 @@ pub mod tests {
);
assert_eq!(
process_entries(&bank, &[entry_1_to_mint, entry_2_to_3_mint_to_1]),
process_entries(&bank, &[entry_1_to_mint, entry_2_to_3_mint_to_1], false),
Ok(())
);
@ -1022,7 +1060,8 @@ pub mod tests {
assert!(process_entries(
&bank,
&[entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()]
&[entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()],
false
)
.is_err());
@ -1033,13 +1072,13 @@ pub mod tests {
// Check all accounts are unlocked
let txs1 = &entry_1_to_mint.transactions[..];
let txs2 = &entry_2_to_3_mint_to_1.transactions[..];
let locked_accounts1 = bank.lock_accounts(txs1);
let locked_accounts1 = bank.lock_accounts(txs1, None);
for result in locked_accounts1.locked_accounts_results() {
assert!(result.is_ok());
}
// txs1 and txs2 have accounts that conflict, so we must drop txs1 first
drop(locked_accounts1);
let locked_accounts2 = bank.lock_accounts(txs2);
let locked_accounts2 = bank.lock_accounts(txs2, None);
for result in locked_accounts2.locked_accounts_results() {
assert!(result.is_ok());
}
@ -1131,7 +1170,8 @@ pub mod tests {
entry_1_to_mint.clone(),
entry_2_to_3_and_1_to_mint.clone(),
entry_conflict_itself.clone()
]
],
false
)
.is_err());
@ -1186,12 +1226,88 @@ pub mod tests {
bank.last_blockhash(),
);
let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]);
assert_eq!(process_entries(&bank, &[entry_1, entry_2]), Ok(()));
assert_eq!(process_entries(&bank, &[entry_1, entry_2], true), Ok(()));
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
assert_eq!(bank.get_balance(&keypair4.pubkey()), 1);
assert_eq!(bank.last_blockhash(), blockhash);
}
#[test]
fn test_process_entry_tx_random_execution_no_error() {
// entropy multiplier should be big enough to provide sufficient entropy
// but small enough to not take too much time while executing the test.
let entropy_multiplier: usize = 25;
let initial_lamports = 100;
// number of accounts need to be in multiple of 4 for correct
// execution of the test.
let num_accounts = entropy_multiplier * 4;
let GenesisBlockInfo {
genesis_block,
mint_keypair,
..
} = create_genesis_block((num_accounts + 1) as u64 * initial_lamports);
let bank = Bank::new(&genesis_block);
let mut keypairs: Vec<Keypair> = vec![];
for _ in 0..num_accounts {
let keypair = Keypair::new();
let create_account_tx = system_transaction::create_user_account(
&mint_keypair,
&keypair.pubkey(),
0,
bank.last_blockhash(),
);
assert_eq!(bank.process_transaction(&create_account_tx), Ok(()));
assert_matches!(
bank.transfer(initial_lamports, &mint_keypair, &keypair.pubkey()),
Ok(_)
);
keypairs.push(keypair);
}
let mut tx_vector: Vec<Transaction> = vec![];
for i in (0..num_accounts).step_by(4) {
tx_vector.append(&mut vec![
system_transaction::transfer(
&keypairs[i + 1],
&keypairs[i].pubkey(),
initial_lamports,
bank.last_blockhash(),
),
system_transaction::transfer(
&keypairs[i + 3],
&keypairs[i + 2].pubkey(),
initial_lamports,
bank.last_blockhash(),
),
]);
}
// Transfer lamports to each other
let entry = next_entry(&bank.last_blockhash(), 1, tx_vector);
assert_eq!(process_entries(&bank, &vec![entry], true), Ok(()));
bank.squash();
// Even number keypair should have balance of 2 * initial_lamports and
// odd number keypair should have balance of 0, which proves
// that even in case of random order of execution, overall state remains
// consistent.
for i in 0..num_accounts {
if i % 2 == 0 {
assert_eq!(
bank.get_balance(&keypairs[i].pubkey()),
2 * initial_lamports
);
} else {
assert_eq!(bank.get_balance(&keypairs[i].pubkey()), 0);
}
}
}
#[test]
fn test_process_entries_2_entries_tick() {
let GenesisBlockInfo {
@ -1239,7 +1355,11 @@ pub mod tests {
);
let entry_2 = next_entry(&tick.hash, 1, vec![tx]);
assert_eq!(
process_entries(&bank, &[entry_1.clone(), tick.clone(), entry_2.clone()]),
process_entries(
&bank,
&[entry_1.clone(), tick.clone(), entry_2.clone()],
true
),
Ok(())
);
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
@ -1254,7 +1374,7 @@ pub mod tests {
);
let entry_3 = next_entry(&entry_2.hash, 1, vec![tx]);
assert_eq!(
process_entries(&bank, &[entry_3]),
process_entries(&bank, &[entry_3], true),
Err(TransactionError::AccountNotFound)
);
}
@ -1335,7 +1455,7 @@ pub mod tests {
);
assert_eq!(
process_entries(&bank, &[entry_1_to_mint]),
process_entries(&bank, &[entry_1_to_mint], false),
Err(TransactionError::AccountInUse)
);
@ -1461,7 +1581,7 @@ pub mod tests {
})
.collect();
info!("paying iteration {}", i);
process_entries(&bank, &entries).expect("paying failed");
process_entries(&bank, &entries, true).expect("paying failed");
let entries: Vec<_> = (0..NUM_TRANSFERS)
.map(|i| {
@ -1479,7 +1599,7 @@ pub mod tests {
.collect();
info!("refunding iteration {}", i);
process_entries(&bank, &entries).expect("refunding failed");
process_entries(&bank, &entries, true).expect("refunding failed");
// advance to next block
process_entries(
@ -1487,6 +1607,7 @@ pub mod tests {
&(0..bank.ticks_per_slot())
.map(|_| next_entry_mut(&mut hash, 1, vec![]))
.collect::<Vec<_>>(),
true,
)
.expect("process ticks failed");

View File

@ -189,7 +189,7 @@ pub fn hash_transactions(transactions: &[Transaction]) -> Hash {
/// a signature, the final hash will be a hash of both the previous ID and
/// the signature. If num_hashes is zero and there's no transaction data,
/// start_hash is returned.
fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -> Hash {
pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -> Hash {
if num_hashes == 0 && transactions.is_empty() {
return *start_hash;
}

View File

@ -726,7 +726,7 @@ impl ReplayStage {
);
return Err(Error::BlobError(BlobError::VerificationFailed));
}
blocktree_processor::process_entries(bank, entries)?;
blocktree_processor::process_entries(bank, entries, true)?;
Ok(())
}

View File

@ -743,6 +743,7 @@ mod tests {
blocktree_processor::process_entries(
&bank,
&entry::create_ticks(64, bank.last_blockhash()),
true,
)
.expect("failed process entries");
last_bank = Arc::new(bank);
@ -863,6 +864,7 @@ mod tests {
DEFAULT_TICKS_PER_SLOT * next_bank.slots_per_segment() + 1,
bank.last_blockhash(),
),
true,
)
.unwrap();
let message = Message::new_with_payer(vec![mining_proof_ix], Some(&mint_keypair.pubkey()));