Support parallelization of arbitrary transactions

Still assumes witnesses are processed serially afterward.
This commit is contained in:
Greg Fitzgerald
2018-04-05 22:13:54 -06:00
parent c12da50f9b
commit 491a530d90

View File

@ -10,6 +10,7 @@ use event::Event;
use hash::Hash; use hash::Hash;
use mint::Mint; use mint::Mint;
use plan::{Payment, Plan, Witness}; use plan::{Payment, Plan, Witness};
use rayon::prelude::*;
use signature::{KeyPair, PublicKey, Signature}; use signature::{KeyPair, PublicKey, Signature};
use std::collections::hash_map::Entry::Occupied; use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
@ -150,6 +151,16 @@ impl Accountant {
Ok(()) Ok(())
} }
/// Process a batch of verified transactions.
pub fn process_verified_transactions(&self, trs: &[Transaction]) -> Vec<Result<()>> {
// Run all debits first to filter out any transactions that can't be processed
// in parallel deterministically.
trs.par_iter()
.map(|tr| self.process_verified_transaction_debits(tr).map(|_| tr))
.map(|result| result.map(|tr| self.process_verified_transaction_credits(tr)))
.collect()
}
/// Process a Witness Signature that has already been verified. /// Process a Witness Signature that has already been verified.
fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) {
@ -401,7 +412,6 @@ mod bench {
extern crate test; extern crate test;
use self::test::Bencher; use self::test::Bencher;
use accountant::*; use accountant::*;
use rayon::prelude::*;
use signature::KeyPairUtil; use signature::KeyPairUtil;
use hash::hash; use hash::hash;
use bincode::serialize; use bincode::serialize;
@ -437,9 +447,11 @@ mod bench {
sigs.write().unwrap().clear(); sigs.write().unwrap().clear();
} }
transactions.par_iter().for_each(|tr| { assert!(
acc.process_verified_transaction(tr).unwrap(); acc.process_verified_transactions(&transactions)
}); .iter()
.all(|x| x.is_ok())
);
}); });
} }
} }