improved error handling and atomic transactions

This commit is contained in:
Robert Kelly
2018-05-01 16:22:33 -04:00
parent 6af3680f99
commit ccb478c1f6
2 changed files with 46 additions and 16 deletions

View File

@ -14,6 +14,7 @@ use rayon::prelude::*;
use signature::{KeyPair, PublicKey, Signature}; use signature::{KeyPair, PublicKey, Signature};
use std::collections::hash_map::Entry::Occupied; use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicIsize, Ordering};
use std::result; use std::result;
use std::sync::RwLock; use std::sync::RwLock;
use transaction::Transaction; use transaction::Transaction;
@ -23,6 +24,7 @@ pub const MAX_ENTRY_IDS: usize = 1024 * 4;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum AccountingError { pub enum AccountingError {
AccountNotFound, AccountNotFound,
BalanceUpdatedBeforeTransactionCompleted,
InsufficientFunds, InsufficientFunds,
InvalidTransferSignature, InvalidTransferSignature,
} }
@ -30,18 +32,18 @@ pub enum AccountingError {
pub type Result<T> = result::Result<T, AccountingError>; pub type Result<T> = result::Result<T, AccountingError>;
/// Commit funds to the 'to' party. /// Commit funds to the 'to' party.
fn apply_payment(balances: &RwLock<HashMap<PublicKey, RwLock<i64>>>, payment: &Payment) { fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &Payment) {
if balances.read().unwrap().contains_key(&payment.to) { if balances.read().unwrap().contains_key(&payment.to) {
let bals = balances.read().unwrap(); let bals = balances.read().unwrap();
*bals[&payment.to].write().unwrap() += payment.tokens; bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
} else { } else {
let mut bals = balances.write().unwrap(); let mut bals = balances.write().unwrap();
bals.insert(payment.to, RwLock::new(payment.tokens)); bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize));
} }
} }
pub struct Accountant { pub struct Accountant {
balances: RwLock<HashMap<PublicKey, RwLock<i64>>>, balances: RwLock<HashMap<PublicKey, AtomicIsize>>,
pending: RwLock<HashMap<Signature, Plan>>, pending: RwLock<HashMap<Signature, Plan>>,
last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>, last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>,
time_sources: RwLock<HashSet<PublicKey>>, time_sources: RwLock<HashSet<PublicKey>>,
@ -131,23 +133,34 @@ impl Accountant {
// Hold a write lock before the condition check, so that a debit can't occur // Hold a write lock before the condition check, so that a debit can't occur
// between checking the balance and the withdraw. // between checking the balance and the withdraw.
let option = bals.get(&tr.from); let option = bals.get(&tr.from);
if option.is_none() { if option.is_none() {
return Err(AccountingError::AccountNotFound); return Err(AccountingError::AccountNotFound);
} }
let mut bal = option.unwrap().write().unwrap();
if !self.reserve_signature_with_last_id(&tr.sig, &tr.data.last_id) { if !self.reserve_signature_with_last_id(&tr.sig, &tr.data.last_id) {
return Err(AccountingError::InvalidTransferSignature); return Err(AccountingError::InvalidTransferSignature);
} }
if *bal < tr.data.tokens { let bal = option.unwrap();
let current = bal.load(Ordering::Relaxed) as i64;
if current < tr.data.tokens {
self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id); self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id);
return Err(AccountingError::InsufficientFunds); return Err(AccountingError::InsufficientFunds);
} }
*bal -= tr.data.tokens; let result = bal.compare_exchange(
current as isize,
(current - tr.data.tokens) as isize,
Ordering::Relaxed,
Ordering::Relaxed,
);
Ok(()) return match result {
Ok(_) => Ok(()),
Err(_) => Err(AccountingError::BalanceUpdatedBeforeTransactionCompleted),
}
} }
pub fn process_verified_transaction_credits(&self, tr: &Transaction) { pub fn process_verified_transaction_credits(&self, tr: &Transaction) {
@ -164,9 +177,15 @@ impl Accountant {
/// Process a Transaction that has already been verified. /// Process a Transaction that has already been verified.
pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> { pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> {
self.process_verified_transaction_debits(tr)?; return match self.process_verified_transaction_debits(tr) {
self.process_verified_transaction_credits(tr); Ok(_) => {
Ok(()) self.process_verified_transaction_credits(tr);
Ok(())
},
Err(err) => {
Err(err)
}
};
} }
/// Process a batch of verified transactions. /// Process a batch of verified transactions.
@ -174,7 +193,11 @@ impl Accountant {
// Run all debits first to filter out any transactions that can't be processed // Run all debits first to filter out any transactions that can't be processed
// in parallel deterministically. // in parallel deterministically.
let results: Vec<_> = trs.into_par_iter() let results: Vec<_> = trs.into_par_iter()
.map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) .filter_map(|tr| match self.process_verified_transaction_debits(&tr) {
Ok(_x) => Some(Ok(tr)),
Err(_e) => None,
})
// .flat_map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr))
.collect(); // Calling collect() here forces all debits to complete before moving on. .collect(); // Calling collect() here forces all debits to complete before moving on.
results results
@ -300,7 +323,7 @@ impl Accountant {
pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> { pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
let bals = self.balances.read().unwrap(); let bals = self.balances.read().unwrap();
bals.get(pubkey).map(|x| *x.read().unwrap()) bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64)
} }
} }

View File

@ -757,6 +757,7 @@ mod bench {
// Create transactions between unrelated parties. // Create transactions between unrelated parties.
let txs = 100_000; let txs = 100_000;
let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new()); let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
let errors: Mutex<usize> = Mutex::new(0);
let transactions: Vec<_> = (0..txs) let transactions: Vec<_> = (0..txs)
.into_par_iter() .into_par_iter()
.map(|i| { .map(|i| {
@ -774,11 +775,17 @@ mod bench {
// Seed the 'from' account. // Seed the 'from' account.
let rando0 = KeyPair::new(); let rando0 = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
acc.process_verified_transaction(&tr).unwrap(); // some of these will fail because balance updates before transaction completes
match acc.process_verified_transaction(&tr) {
Ok(_) => (),
Err(_) => *errors.lock().unwrap() += 1,
};
let rando1 = KeyPair::new(); let rando1 = KeyPair::new();
let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
acc.process_verified_transaction(&tr).unwrap(); // these will fail if the prior transaction does not go through
// but won't typically fail otherwise since the addresses are randomly generated
let _ = acc.process_verified_transaction(&tr);
// Finally, return a transaction that's unique // Finally, return a transaction that's unique
Transaction::new(&rando0, rando1.pubkey(), 1, last_id) Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
@ -803,7 +810,7 @@ mod bench {
drop(skel.historian.sender); drop(skel.historian.sender);
let entries: Vec<Entry> = skel.historian.receiver.iter().collect(); let entries: Vec<Entry> = skel.historian.receiver.iter().collect();
assert_eq!(entries.len(), 1); assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize); assert_eq!(entries[0].events.len() + *errors.lock().unwrap(), txs as usize);
println!("{} tps", tps); println!("{} tps", tps);
} }