Per-cell locking

This allows us to use read-locks for balances most of the time. We
only lock the full table if we need to add one.
This commit is contained in:
Greg Fitzgerald
2018-04-04 16:31:11 -06:00
parent dc2ec925d7
commit 76679ffb92

View File

@ -24,12 +24,18 @@ pub enum AccountingError {
pub type Result<T> = result::Result<T, AccountingError>; pub type Result<T> = result::Result<T, AccountingError>;
/// Commit funds to the 'to' party. /// Commit funds to the 'to' party.
fn apply_payment(balances: &mut HashMap<PublicKey, i64>, payment: &Payment) { fn apply_payment(balances: &RwLock<HashMap<PublicKey, RwLock<i64>>>, payment: &Payment) {
*balances.entry(payment.to).or_insert(0) += payment.tokens; if balances.read().unwrap().contains_key(&payment.to) {
let bals = balances.read().unwrap();
*bals[&payment.to].write().unwrap() += payment.tokens;
} else {
let mut bals = balances.write().unwrap();
bals.insert(payment.to, RwLock::new(payment.tokens));
}
} }
pub struct Accountant { pub struct Accountant {
balances: RwLock<HashMap<PublicKey, i64>>, balances: RwLock<HashMap<PublicKey, RwLock<i64>>>,
pending: RwLock<HashMap<Signature, Plan>>, pending: RwLock<HashMap<Signature, Plan>>,
signatures: RwLock<HashSet<Signature>>, signatures: RwLock<HashSet<Signature>>,
time_sources: RwLock<HashSet<PublicKey>>, time_sources: RwLock<HashSet<PublicKey>>,
@ -39,10 +45,10 @@ pub struct Accountant {
impl Accountant { impl Accountant {
/// Create an Accountant using a deposit. /// Create an Accountant using a deposit.
pub fn new_from_deposit(deposit: &Payment) -> Self { pub fn new_from_deposit(deposit: &Payment) -> Self {
let mut balances = HashMap::new(); let balances = RwLock::new(HashMap::new());
apply_payment(&mut balances, deposit); apply_payment(&balances, deposit);
Accountant { Accountant {
balances: RwLock::new(balances), balances,
pending: RwLock::new(HashMap::new()), pending: RwLock::new(HashMap::new()),
signatures: RwLock::new(HashSet::new()), signatures: RwLock::new(HashSet::new()),
time_sources: RwLock::new(HashSet::new()), time_sources: RwLock::new(HashSet::new()),
@ -77,15 +83,15 @@ impl Accountant {
return Err(AccountingError::InvalidTransferSignature); return Err(AccountingError::InvalidTransferSignature);
} }
if let Some(x) = self.balances.write().unwrap().get_mut(&tr.from) { if let Some(x) = self.balances.read().unwrap().get(&tr.from) {
*x -= tr.tokens; *x.write().unwrap() -= tr.tokens;
} }
let mut plan = tr.plan.clone(); let mut plan = tr.plan.clone();
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap()));
if let Some(ref payment) = plan.final_payment() { if let Some(ref payment) = plan.final_payment() {
apply_payment(&mut self.balances.write().unwrap(), payment); apply_payment(&self.balances, payment);
} else { } else {
self.pending.write().unwrap().insert(tr.sig, plan); self.pending.write().unwrap().insert(tr.sig, plan);
} }
@ -98,7 +104,7 @@ impl Accountant {
if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) {
e.get_mut().apply_witness(&Witness::Signature(from)); e.get_mut().apply_witness(&Witness::Signature(from));
if let Some(ref payment) = e.get().final_payment() { if let Some(ref payment) = e.get().final_payment() {
apply_payment(&mut self.balances.write().unwrap(), payment); apply_payment(&self.balances, payment);
e.remove_entry(); e.remove_entry();
} }
}; };
@ -131,7 +137,7 @@ impl Accountant {
for (key, plan) in pending.iter_mut() { for (key, plan) in pending.iter_mut() {
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap()));
if let Some(ref payment) = plan.final_payment() { if let Some(ref payment) = plan.final_payment() {
apply_payment(&mut self.balances.write().unwrap(), payment); apply_payment(&self.balances, payment);
completed.push(key.clone()); completed.push(key.clone());
} }
} }
@ -183,7 +189,8 @@ impl Accountant {
} }
pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> { pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
self.balances.read().unwrap().get(pubkey).cloned() let bals = self.balances.read().unwrap();
bals.get(pubkey).map(|x| *x.read().unwrap())
} }
} }
@ -329,15 +336,24 @@ mod bench {
let transactions: Vec<_> = (0..4096) let transactions: Vec<_> = (0..4096)
.into_par_iter() .into_par_iter()
.map(|_| { .map(|_| {
// Seed the 'from' account.
let rando0 = KeyPair::new(); let rando0 = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id()); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id());
acc.process_verified_transaction(&tr).unwrap(); acc.process_verified_transaction(&tr).unwrap();
// Seed the 'to' account.
let rando1 = KeyPair::new(); let rando1 = KeyPair::new();
let tr = Transaction::new(&rando0, rando1.pubkey(), 1, mint.last_id());
acc.process_verified_transaction(&tr).unwrap();
// Finally, return a transaction that's unique
Transaction::new(&rando0, rando1.pubkey(), 1, mint.last_id()) Transaction::new(&rando0, rando1.pubkey(), 1, mint.last_id())
}) })
.collect(); .collect();
bencher.iter(|| { bencher.iter(|| {
// Since benchmarker runs this multiple times, we need to clear the signatures.
acc.signatures.write().unwrap().clear(); acc.signatures.write().unwrap().clear();
transactions.par_iter().for_each(|tr| { transactions.par_iter().for_each(|tr| {
acc.process_verified_transaction(tr).unwrap(); acc.process_verified_transaction(tr).unwrap();
}); });