committed by
GitHub
parent
d09889b1dd
commit
2bd877528f
181
src/bank.rs
181
src/bank.rs
@ -19,6 +19,7 @@ use log::Level;
|
|||||||
use mint::Mint;
|
use mint::Mint;
|
||||||
use payment_plan::Payment;
|
use payment_plan::Payment;
|
||||||
use poh_recorder::PohRecorder;
|
use poh_recorder::PohRecorder;
|
||||||
|
use rayon::prelude::*;
|
||||||
use rpc::RpcSignatureStatus;
|
use rpc::RpcSignatureStatus;
|
||||||
use signature::Keypair;
|
use signature::Keypair;
|
||||||
use signature::Signature;
|
use signature::Signature;
|
||||||
@ -931,9 +932,62 @@ impl Bank {
|
|||||||
|
|
||||||
/// Process an ordered list of entries.
|
/// Process an ordered list of entries.
|
||||||
pub fn process_entries(&self, entries: &[Entry]) -> Result<()> {
|
pub fn process_entries(&self, entries: &[Entry]) -> Result<()> {
|
||||||
for entry in entries {
|
self.par_process_entries(entries)
|
||||||
self.process_entry(&entry)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn first_err(results: &[Result<()>]) -> Result<()> {
|
||||||
|
for r in results {
|
||||||
|
r.clone()?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub fn par_execute_entries(&self, entries: &[(&Entry, Vec<Result<()>>)]) -> Result<()> {
|
||||||
|
inc_new_counter_info!("bank-par_execute_entries-count", entries.len());
|
||||||
|
let results: Vec<Result<()>> = entries
|
||||||
|
.into_par_iter()
|
||||||
|
.map(|(e, locks)| {
|
||||||
|
let results = self.execute_and_commit_transactions(
|
||||||
|
&e.transactions,
|
||||||
|
locks.to_vec(),
|
||||||
|
MAX_ENTRY_IDS,
|
||||||
|
);
|
||||||
|
self.unlock_accounts(&e.transactions, &results);
|
||||||
|
Self::first_err(&results)
|
||||||
|
}).collect();
|
||||||
|
Self::first_err(&results)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// process entries in parallel
|
||||||
|
/// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry
|
||||||
|
/// 2. Process the locked group in parallel
|
||||||
|
/// 3. Register the `Tick` if it's available, goto 1
|
||||||
|
pub fn par_process_entries(&self, entries: &[Entry]) -> Result<()> {
|
||||||
|
// accumulator for entries that can be processed in parallel
|
||||||
|
let mut mt_group = vec![];
|
||||||
|
for entry in entries {
|
||||||
|
if entry.transactions.is_empty() {
|
||||||
|
// if its a tick, execute the group and register the tick
|
||||||
|
self.par_execute_entries(&mt_group)?;
|
||||||
|
self.register_entry_id(&entry.id);
|
||||||
|
mt_group = vec![];
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// try to lock the accounts
|
||||||
|
let locked = self.lock_accounts(&entry.transactions);
|
||||||
|
// if any of the locks error out
|
||||||
|
// execute the current group
|
||||||
|
if Self::first_err(&locked).is_err() {
|
||||||
|
self.par_execute_entries(&mt_group)?;
|
||||||
|
mt_group = vec![];
|
||||||
|
//reset the lock and push the entry
|
||||||
|
let locked = self.lock_accounts(&entry.transactions);
|
||||||
|
mt_group.push((entry, locked));
|
||||||
|
} else {
|
||||||
|
// push the entry to the mt_group
|
||||||
|
mt_group.push((entry, locked));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.par_execute_entries(&mt_group)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1906,4 +1960,127 @@ mod tests {
|
|||||||
// hash3 is not in the q
|
// hash3 is not in the q
|
||||||
assert_eq!(Bank::check_entry_id_age(&q, hash3, 3), false);
|
assert_eq!(Bank::check_entry_id_age(&q, hash3, 3), false);
|
||||||
}
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_first_err() {
|
||||||
|
assert_eq!(Bank::first_err(&[Ok(())]), Ok(()));
|
||||||
|
assert_eq!(
|
||||||
|
Bank::first_err(&[Ok(()), Err(BankError::DuplicateSignature)]),
|
||||||
|
Err(BankError::DuplicateSignature)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
Bank::first_err(&[
|
||||||
|
Ok(()),
|
||||||
|
Err(BankError::DuplicateSignature),
|
||||||
|
Err(BankError::AccountInUse)
|
||||||
|
]),
|
||||||
|
Err(BankError::DuplicateSignature)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
Bank::first_err(&[
|
||||||
|
Ok(()),
|
||||||
|
Err(BankError::AccountInUse),
|
||||||
|
Err(BankError::DuplicateSignature)
|
||||||
|
]),
|
||||||
|
Err(BankError::AccountInUse)
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
Bank::first_err(&[
|
||||||
|
Err(BankError::AccountInUse),
|
||||||
|
Ok(()),
|
||||||
|
Err(BankError::DuplicateSignature)
|
||||||
|
]),
|
||||||
|
Err(BankError::AccountInUse)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_par_process_entries_tick() {
|
||||||
|
let mint = Mint::new(1000);
|
||||||
|
let bank = Bank::new(&mint);
|
||||||
|
|
||||||
|
// ensure bank can process a tick
|
||||||
|
let tick = next_entry(&mint.last_id(), 1, vec![]);
|
||||||
|
assert_eq!(bank.par_process_entries(&[tick.clone()]), Ok(()));
|
||||||
|
assert_eq!(bank.last_id(), tick.id);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_par_process_entries_2_entries_collision() {
|
||||||
|
let mint = Mint::new(1000);
|
||||||
|
let bank = Bank::new(&mint);
|
||||||
|
let keypair1 = Keypair::new();
|
||||||
|
let keypair2 = Keypair::new();
|
||||||
|
|
||||||
|
let last_id = bank.last_id();
|
||||||
|
|
||||||
|
// ensure bank can process 2 entries that have a common account and no tick is registered
|
||||||
|
let tx = Transaction::system_new(&mint.keypair(), keypair1.pubkey(), 2, bank.last_id());
|
||||||
|
let entry_1 = next_entry(&last_id, 1, vec![tx]);
|
||||||
|
let tx = Transaction::system_new(&mint.keypair(), keypair2.pubkey(), 2, bank.last_id());
|
||||||
|
let entry_2 = next_entry(&entry_1.id, 1, vec![tx]);
|
||||||
|
assert_eq!(bank.par_process_entries(&[entry_1, entry_2]), Ok(()));
|
||||||
|
assert_eq!(bank.get_balance(&keypair1.pubkey()), 2);
|
||||||
|
assert_eq!(bank.get_balance(&keypair2.pubkey()), 2);
|
||||||
|
assert_eq!(bank.last_id(), last_id);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_par_process_entries_2_entries_par() {
|
||||||
|
let mint = Mint::new(1000);
|
||||||
|
let bank = Bank::new(&mint);
|
||||||
|
let keypair1 = Keypair::new();
|
||||||
|
let keypair2 = Keypair::new();
|
||||||
|
let keypair3 = Keypair::new();
|
||||||
|
let keypair4 = Keypair::new();
|
||||||
|
|
||||||
|
//load accounts
|
||||||
|
let tx = Transaction::system_new(&mint.keypair(), keypair1.pubkey(), 1, bank.last_id());
|
||||||
|
assert_eq!(bank.process_transaction(&tx), Ok(()));
|
||||||
|
let tx = Transaction::system_new(&mint.keypair(), keypair2.pubkey(), 1, bank.last_id());
|
||||||
|
assert_eq!(bank.process_transaction(&tx), Ok(()));
|
||||||
|
|
||||||
|
// ensure bank can process 2 entries that do not have a common account and no tick is registered
|
||||||
|
let last_id = bank.last_id();
|
||||||
|
let tx = Transaction::system_new(&keypair1, keypair3.pubkey(), 1, bank.last_id());
|
||||||
|
let entry_1 = next_entry(&last_id, 1, vec![tx]);
|
||||||
|
let tx = Transaction::system_new(&keypair2, keypair4.pubkey(), 1, bank.last_id());
|
||||||
|
let entry_2 = next_entry(&entry_1.id, 1, vec![tx]);
|
||||||
|
assert_eq!(bank.par_process_entries(&[entry_1, entry_2]), Ok(()));
|
||||||
|
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
|
||||||
|
assert_eq!(bank.get_balance(&keypair4.pubkey()), 1);
|
||||||
|
assert_eq!(bank.last_id(), last_id);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_par_process_entries_2_entries_tick() {
|
||||||
|
let mint = Mint::new(1000);
|
||||||
|
let bank = Bank::new(&mint);
|
||||||
|
let keypair1 = Keypair::new();
|
||||||
|
let keypair2 = Keypair::new();
|
||||||
|
let keypair3 = Keypair::new();
|
||||||
|
let keypair4 = Keypair::new();
|
||||||
|
|
||||||
|
//load accounts
|
||||||
|
let tx = Transaction::system_new(&mint.keypair(), keypair1.pubkey(), 1, bank.last_id());
|
||||||
|
assert_eq!(bank.process_transaction(&tx), Ok(()));
|
||||||
|
let tx = Transaction::system_new(&mint.keypair(), keypair2.pubkey(), 1, bank.last_id());
|
||||||
|
assert_eq!(bank.process_transaction(&tx), Ok(()));
|
||||||
|
|
||||||
|
let last_id = bank.last_id();
|
||||||
|
|
||||||
|
// ensure bank can process 2 entries that do not have a common account and tick is registered
|
||||||
|
let tx = Transaction::system_new(&keypair2, keypair3.pubkey(), 1, bank.last_id());
|
||||||
|
let entry_1 = next_entry(&last_id, 1, vec![tx]);
|
||||||
|
let new_tick = next_entry(&entry_1.id, 1, vec![]);
|
||||||
|
let tx = Transaction::system_new(&keypair1, keypair4.pubkey(), 1, new_tick.id);
|
||||||
|
let entry_2 = next_entry(&new_tick.id, 1, vec![tx]);
|
||||||
|
assert_eq!(
|
||||||
|
bank.par_process_entries(&[entry_1.clone(), new_tick.clone(), entry_2]),
|
||||||
|
Ok(())
|
||||||
|
);
|
||||||
|
assert_eq!(bank.get_balance(&keypair3.pubkey()), 1);
|
||||||
|
assert_eq!(bank.get_balance(&keypair4.pubkey()), 1);
|
||||||
|
assert_eq!(bank.last_id(), new_tick.id);
|
||||||
|
// ensure that errors are returned
|
||||||
|
assert_eq!(
|
||||||
|
bank.par_process_entries(&[entry_1]),
|
||||||
|
Err(BankError::AccountNotFound)
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user