diff --git a/src/accountant.rs b/src/accountant.rs index 7f8a4de0d3..7c8a5e5c49 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -10,6 +10,7 @@ use event::Event; use hash::Hash; use mint::Mint; use plan::{Payment, Plan, Witness}; +use rayon::prelude::*; use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; @@ -21,6 +22,7 @@ const MAX_ENTRY_IDS: usize = 1024 * 4; #[derive(Debug, PartialEq, Eq)] pub enum AccountingError { + AccountNotFound, InsufficientFunds, InvalidTransferSignature, } @@ -104,9 +106,20 @@ impl Accountant { last_ids.push_back((*last_id, RwLock::new(HashSet::new()))); } - /// Process a Transaction that has already been verified. - pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> { - if self.get_balance(&tr.from).unwrap_or(0) < tr.data.tokens { + /// Deduct tokens from the 'from' address the account has sufficient + /// funds and isn't a duplicate. + pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> { + let bals = self.balances.read().unwrap(); + + // Hold a write lock before the condition check, so that a debit can't occur + // between checking the balance and the withdraw. + let option = bals.get(&tr.from); + if option.is_none() { + return Err(AccountingError::AccountNotFound); + } + let mut bal = option.unwrap().write().unwrap(); + + if *bal < tr.data.tokens { return Err(AccountingError::InsufficientFunds); } @@ -114,22 +127,40 @@ impl Accountant { return Err(AccountingError::InvalidTransferSignature); } - if let Some(x) = self.balances.read().unwrap().get(&tr.from) { - *x.write().unwrap() -= tr.data.tokens; - } + *bal -= tr.data.tokens; + Ok(()) + } + + pub fn process_verified_transaction_credits(&self, tr: &Transaction) { let mut plan = tr.data.plan.clone(); plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); } else { - self.pending.write().unwrap().insert(tr.sig, plan); + let mut pending = self.pending.write().unwrap(); + pending.insert(tr.sig, plan); } + } + /// Process a Transaction that has already been verified. + pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> { + self.process_verified_transaction_debits(tr)?; + self.process_verified_transaction_credits(tr); Ok(()) } + /// Process a batch of verified transactions. + pub fn process_verified_transactions(&self, trs: &[Transaction]) -> Vec> { + // Run all debits first to filter out any transactions that can't be processed + // in parallel deterministically. + trs.par_iter() + .map(|tr| self.process_verified_transaction_debits(tr).map(|_| tr)) + .map(|result| result.map(|tr| self.process_verified_transaction_credits(tr))) + .collect() + } + /// Process a Witness Signature that has already been verified. fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { @@ -228,9 +259,9 @@ impl Accountant { #[cfg(test)] mod tests { use super::*; - use signature::KeyPairUtil; - use hash::hash; use bincode::serialize; + use hash::hash; + use signature::KeyPairUtil; #[test] fn test_accountant() { @@ -246,6 +277,16 @@ mod tests { assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); } + #[test] + fn test_account_not_found() { + let mint = Mint::new(1); + let acc = Accountant::new(&mint); + assert_eq!( + acc.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()), + Err(AccountingError::AccountNotFound) + ); + } + #[test] fn test_invalid_transfer() { let alice = Mint::new(11_000); @@ -371,10 +412,9 @@ mod bench { extern crate test; use self::test::Bencher; use accountant::*; - use rayon::prelude::*; - use signature::KeyPairUtil; - use hash::hash; use bincode::serialize; + use hash::hash; + use signature::KeyPairUtil; #[bench] fn process_verified_event_bench(bencher: &mut Bencher) { @@ -407,9 +447,11 @@ mod bench { sigs.write().unwrap().clear(); } - transactions.par_iter().for_each(|tr| { - acc.process_verified_transaction(tr).unwrap(); - }); + assert!( + acc.process_verified_transactions(&transactions) + .iter() + .all(|x| x.is_ok()) + ); }); } } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index d827b2d5ad..e45efb83e1 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -4,9 +4,9 @@ use accountant::Accountant; use bincode::{deserialize, serialize}; +use ecdsa; use entry::Entry; use event::Event; -use ecdsa; use hash::Hash; use historian::Historian; use packet; diff --git a/src/ecdsa.rs b/src/ecdsa.rs index f3ede709b9..9594d01676 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -8,9 +8,9 @@ use std::mem::size_of; #[cfg(not(feature = "cuda"))] use rayon::prelude::*; #[cfg(not(feature = "cuda"))] -use untrusted; -#[cfg(not(feature = "cuda"))] use ring::signature; +#[cfg(not(feature = "cuda"))] +use untrusted; // Shared imports use packet::{Packet, SharedPackets}; diff --git a/src/lib.rs b/src/lib.rs index 1c8ed2ebad..706e61fb87 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,9 +2,9 @@ pub mod accountant; pub mod accountant_skel; pub mod accountant_stub; +pub mod ecdsa; pub mod entry; pub mod event; -pub mod ecdsa; pub mod hash; pub mod historian; pub mod ledger;