diff --git a/src/accountant.rs b/src/accountant.rs index 683474efa7..472cf2f19c 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -14,6 +14,7 @@ use rayon::prelude::*; use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::atomic::{AtomicIsize, Ordering}; use std::result; use std::sync::RwLock; use transaction::Transaction; @@ -23,6 +24,7 @@ pub const MAX_ENTRY_IDS: usize = 1024 * 4; #[derive(Debug, PartialEq, Eq)] pub enum AccountingError { AccountNotFound, + BalanceUpdatedBeforeTransactionCompleted, InsufficientFunds, InvalidTransferSignature, } @@ -30,18 +32,18 @@ pub enum AccountingError { pub type Result = result::Result; /// Commit funds to the 'to' party. -fn apply_payment(balances: &RwLock>>, payment: &Payment) { +fn apply_payment(balances: &RwLock>, payment: &Payment) { if balances.read().unwrap().contains_key(&payment.to) { let bals = balances.read().unwrap(); - *bals[&payment.to].write().unwrap() += payment.tokens; + bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); } else { let mut bals = balances.write().unwrap(); - bals.insert(payment.to, RwLock::new(payment.tokens)); + bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize)); } } pub struct Accountant { - balances: RwLock>>, + balances: RwLock>, pending: RwLock>, last_ids: RwLock>)>>, time_sources: RwLock>, @@ -131,23 +133,34 @@ impl Accountant { // Hold a write lock before the condition check, so that a debit can't occur // between checking the balance and the withdraw. let option = bals.get(&tr.from); + if option.is_none() { return Err(AccountingError::AccountNotFound); } - let mut bal = option.unwrap().write().unwrap(); if !self.reserve_signature_with_last_id(&tr.sig, &tr.data.last_id) { return Err(AccountingError::InvalidTransferSignature); } - if *bal < tr.data.tokens { + let bal = option.unwrap(); + let current = bal.load(Ordering::Relaxed) as i64; + + if current < tr.data.tokens { self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id); return Err(AccountingError::InsufficientFunds); } - *bal -= tr.data.tokens; + let result = bal.compare_exchange( + current as isize, + (current - tr.data.tokens) as isize, + Ordering::Relaxed, + Ordering::Relaxed, + ); - Ok(()) + return match result { + Ok(_) => Ok(()), + Err(_) => Err(AccountingError::BalanceUpdatedBeforeTransactionCompleted), + } } pub fn process_verified_transaction_credits(&self, tr: &Transaction) { @@ -164,9 +177,15 @@ impl Accountant { /// Process a Transaction that has already been verified. pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> { - self.process_verified_transaction_debits(tr)?; - self.process_verified_transaction_credits(tr); - Ok(()) + return match self.process_verified_transaction_debits(tr) { + Ok(_) => { + self.process_verified_transaction_credits(tr); + Ok(()) + }, + Err(err) => { + Err(err) + } + }; } /// Process a batch of verified transactions. @@ -174,7 +193,11 @@ impl Accountant { // Run all debits first to filter out any transactions that can't be processed // in parallel deterministically. let results: Vec<_> = trs.into_par_iter() - .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) + .filter_map(|tr| match self.process_verified_transaction_debits(&tr) { + Ok(_x) => Some(Ok(tr)), + Err(_e) => None, + }) + // .flat_map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) .collect(); // Calling collect() here forces all debits to complete before moving on. results @@ -300,7 +323,7 @@ impl Accountant { pub fn get_balance(&self, pubkey: &PublicKey) -> Option { let bals = self.balances.read().unwrap(); - bals.get(pubkey).map(|x| *x.read().unwrap()) + bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) } } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 9712c6c031..598cc54e58 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -757,6 +757,7 @@ mod bench { // Create transactions between unrelated parties. let txs = 100_000; let last_ids: Mutex> = Mutex::new(HashSet::new()); + let errors: Mutex = Mutex::new(0); let transactions: Vec<_> = (0..txs) .into_par_iter() .map(|i| { @@ -774,11 +775,17 @@ mod bench { // Seed the 'from' account. let rando0 = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); - acc.process_verified_transaction(&tr).unwrap(); + // some of these will fail because balance updates before transaction completes + match acc.process_verified_transaction(&tr) { + Ok(_) => (), + Err(_) => *errors.lock().unwrap() += 1, + }; let rando1 = KeyPair::new(); let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); - acc.process_verified_transaction(&tr).unwrap(); + // these will fail if the prior transaction does not go through + // but won't typically fail otherwise since the addresses are randomly generated + let _ = acc.process_verified_transaction(&tr); // Finally, return a transaction that's unique Transaction::new(&rando0, rando1.pubkey(), 1, last_id) @@ -803,7 +810,7 @@ mod bench { drop(skel.historian.sender); let entries: Vec = skel.historian.receiver.iter().collect(); assert_eq!(entries.len(), 1); - assert_eq!(entries[0].events.len(), txs as usize); + assert_eq!(entries[0].events.len() + *errors.lock().unwrap(), txs as usize); println!("{} tps", tps); }