diff --git a/src/bank.rs b/src/bank.rs index 2f90b8033c..ffd9271273 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -10,17 +10,15 @@ use entry::Entry; use hash::Hash; use mint::Mint; use payment_plan::{Payment, PaymentPlan, Witness}; -use rayon::prelude::*; use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; use std::result; -use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::RwLock; -use transaction::{Instruction, Plan, Transaction}; use std::time::Instant; -use std::cell::Cell; use timing::duration_as_us; +use transaction::{Instruction, Plan, Transaction}; /// The number of most recent `last_id` values that the bank will track the signatures /// of. Once the bank discards a `last_id`, it will reject any transactions that use @@ -61,9 +59,6 @@ pub type Result = result::Result; pub struct Bank { /// A map of account public keys to the balance in that account. balances: RwLock>, - //balances: RwLock>>, - //balances: RwLock>, - /// A map of smart contract transaction signatures to what remains of its payment /// plan. Each transaction that targets the plan should cause it to be reduced. @@ -71,12 +66,13 @@ pub struct Bank { pending: RwLock>, /// A FIFO queue of `last_id` items, where each item is a set of signatures - /// that have been processed using that `last_id`. The bank uses this data to - /// reject transactions with signatures its seen before as well as `last_id` - /// values that are so old that its `last_id` has been pulled out of the queue. + /// that have been processed using that `last_id`. Rejected `last_id` + /// values are so old that the `last_id` has been pulled out of the queue. last_ids: RwLock>, - last_ids_sigs: RwLock>>>, + // Mapping of hashes to signature sets. The bank uses this data to + /// reject transactions with signatures its seen before + last_ids_sigs: RwLock>>, /// The set of trusted timekeepers. A Timestamp transaction from a `PublicKey` /// outside this set will be discarded. Note that if validators do not have the @@ -128,37 +124,6 @@ impl Bank { } } - /// Commit funds to the 'to' party. - /*fn apply_payment(&self, payment: &Payment) { - // First we check balances with a read lock to maximize potential parallelization. - if self.balances - .read() - .expect("'balances' read lock in apply_payment") - .contains_key(&payment.to) - { - let mut bals = self.balances.write().expect("'balances' read lock"); - //bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); - // *bals[&payment.to].write().unwrap() += payment.tokens; - let x = bals.get_mut(&payment.to).unwrap(); - *x += payment.tokens; - //trace!("updated balance to {}", bals[&payment.to].load(Ordering::Relaxed)); - } else { - // Now we know the key wasn't present a nanosecond ago, but it might be there - // by the time we aquire a write lock, so we'll have to check again. - let mut bals = self.balances.write().expect("'balances' write lock"); - if bals.contains_key(&payment.to) { - let x= bals.get_mut(&payment.to).unwrap(); - *x += payment.tokens; - // *bals[&payment.to].write().unwrap() += payment.tokens; - //bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); - } else { - //bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize)); - //bals.insert(payment.to, RwLock::new(payment.tokens)); - bals.insert(payment.to, payment.tokens); - } - } - }*/ - /// Return the last entry ID registered. pub fn last_id(&self) -> Hash { let last_ids = self.last_ids.read().expect("'last_ids' read lock"); @@ -167,42 +132,25 @@ impl Bank { } /// Store the given signature. The bank will reject any transaction with the same signature. - fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> Result<()> { - let mut sigs_l = signatures.write().unwrap(); - if let Some(sig) = sigs_l.get(sig) { + fn reserve_signature(signatures: &mut HashSet, sig: &Signature) -> Result<()> { + if let Some(sig) = signatures.get(sig) { return Err(BankError::DuplicateSiganture(*sig)); } - { - sigs_l.insert(*sig); - } - /*if signatures - .read() - .expect("'signatures' read lock") - .contains(sig) - { - return Err(BankError::DuplicateSiganture(*sig)); - } - signatures - .write() - .expect("'signatures' write lock") - .insert(*sig);*/ + signatures.insert(*sig); Ok(()) } /// Forget the given `signature` because its transaction was rejected. - fn forget_signature(signatures: &RwLock>, signature: &Signature) { - signatures - .write() - .expect("'signatures' write lock in forget_signature") - .remove(signature); + fn forget_signature(signatures: &mut HashSet, signature: &Signature) { + signatures.remove(signature); } /// Forget the given `signature` with `last_id` because the transaction was rejected. fn forget_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) { if let Some(entry) = self.last_ids_sigs - .read() + .write() .expect("'last_ids' read lock in forget_signature_with_last_id") - .get(last_id) + .get_mut(last_id) { Self::forget_signature(entry, signature); } @@ -210,12 +158,11 @@ impl Bank { fn reserve_signature_with_last_id(&self, signature: &Signature, last_id: &Hash) -> Result<()> { if let Some(entry) = self.last_ids_sigs - .read() + .write() .expect("'last_ids' read lock in reserve_signature_with_last_id") - .get(last_id) + .get_mut(last_id) { - return Self::reserve_signature(&entry, signature); - //return Ok(()); + return Self::reserve_signature(entry, signature); } Err(BankError::LastIdNotFound(*last_id)) } @@ -235,22 +182,16 @@ impl Bank { let id = last_ids.pop_front().unwrap(); last_ids_sigs.remove(&id); } - last_ids_sigs.insert(*last_id, RwLock::new(HashSet::new())); + last_ids_sigs.insert(*last_id, HashSet::new()); last_ids.push_back(*last_id); } pub fn apply_debits(&self, tr: &Transaction, bals: &mut HashMap) -> Result<()> { - - //let mut bals = self.balances.write().unwrap(); - - // Hold a write lock before the condition check, so that a debit can't occur - // between checking the balance and the withdraw. - let mut option = bals.get_mut(&tr.from); + let option = bals.get_mut(&tr.from); if option.is_none() { return Err(BankError::AccountNotFound(tr.from)); } - //let mut bal = option.unwrap().write().unwrap(); - let mut bal = option.unwrap(); + let bal = option.unwrap(); self.reserve_signature_with_last_id(&tr.sig, &tr.last_id)?; @@ -269,56 +210,6 @@ impl Bank { Ok(()) } - /// Deduct tokens from the 'from' address the account has sufficient - /// funds and isn't a duplicate. - /*fn apply_debits(&self, tx: &Transaction) -> Result<()> { - if let Instruction::NewContract(contract) = &tx.instruction { - trace!("Transaction {}", contract.tokens); - if contract.tokens < 0 { - return Err(BankError::NegativeTokens); - } - } - let bals = self.balances - .read() - .expect("'balances' read lock in apply_debits"); - let option = bals.get(&tx.from); - - if option.is_none() { - return Err(BankError::AccountNotFound(tx.from)); - } - - self.reserve_signature_with_last_id(&tx.sig, &tx.last_id)?; - - loop { - let result = if let Instruction::NewContract(contract) = &tx.instruction { - let bal = option.expect("assignment of option to bal"); - let current = bal.load(Ordering::Relaxed) as i64; - - if current < contract.tokens { - self.forget_signature_with_last_id(&tx.sig, &tx.last_id); - return Err(BankError::InsufficientFunds(tx.from)); - } - - bal.compare_exchange( - current as isize, - (current - contract.tokens) as isize, - Ordering::Relaxed, - Ordering::Relaxed, - ) - } else { - Ok(0) - }; - - match result { - Ok(_) => { - self.transaction_count.fetch_add(1, Ordering::Relaxed); - return Ok(()); - } - Err(_) => continue, - }; - } - }*/ - fn apply_credits(&self, tx: &Transaction, balances: &mut HashMap) { match &tx.instruction { Instruction::NewContract(contract) => { @@ -355,11 +246,8 @@ impl Bank { Ok(()) } - /// Process a batch of transactions. It runs all debits first to filter out any - /// transactions that can't be processed in parallel deterministically. + /// Process a batch of transactions. pub fn process_transactions(&self, txs: Vec) -> Vec> { - // Run all debits first to filter out any transactions that can't be processed - // in parallel deterministically. let bals = &mut self.balances.write().unwrap(); debug!("processing Transactions {}", txs.len()); let txs_len = txs.len(); @@ -380,8 +268,14 @@ impl Bank { }) }) .collect(); - info!("debits: {} us credits: {:?} us tx: {}", - duration_as_us(&debits), duration_as_us(&now.elapsed()), txs_len); + + debug!( + "debits: {} us credits: {:?} us tx: {}", + duration_as_us(&debits), + duration_as_us(&now.elapsed()), + txs_len + ); + let mut tr_count = 0; for r in &res { if r.is_ok() { @@ -390,10 +284,8 @@ impl Bank { info!("tx error: {:?}", r); } } - //let tr_count = txs.len(); - self.transaction_count.fetch_add(tr_count, Ordering::Relaxed); - //debug!("credits: {:?}", now.elapsed()); - //let res = txs.into_iter().map(|tx| Ok(tx)).collect(); + self.transaction_count + .fetch_add(tr_count, Ordering::Relaxed); res } @@ -518,8 +410,6 @@ impl Bank { let bals = self.balances .read() .expect("'balances' read lock in get_balance"); - //bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) - //bals.get(pubkey).map(|x| *x.read().unwrap()) bals.get(pubkey).map(|x| *x) } @@ -772,6 +662,7 @@ mod bench { use bank::*; use bincode::serialize; use hash::hash; + use rayon::prelude::*; use signature::KeyPairUtil; #[bench] diff --git a/src/banking_stage.rs b/src/banking_stage.rs index ddf47dbb57..223d201732 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -274,17 +274,15 @@ mod bench { use logger; use mint::Mint; use packet::{to_packets_chunked, PacketRecycler}; + use rayon::prelude::*; use record_stage::Signal; use signature::{KeyPair, KeyPairUtil}; use std::iter; use std::sync::mpsc::{channel, Receiver}; use std::sync::Arc; use transaction::Transaction; - use rayon::prelude::*; - fn check_txs(batches: usize, - receiver: &Receiver, - ref_tx_count: usize) { + fn check_txs(batches: usize, receiver: &Receiver, ref_tx_count: usize) { let mut total = 0; for _ in 0..batches { let signal = receiver.recv().unwrap(); @@ -306,8 +304,10 @@ mod bench { let num_dst_accounts = 8 * 1024; let num_src_accounts = 8 * 1024; - let srckeys: Vec<_> = (0..num_src_accounts).map(|_| { KeyPair::new() }).collect(); - let dstkeys: Vec<_> = (0..num_dst_accounts).map(|_| { KeyPair::new().pubkey() }).collect(); + let srckeys: Vec<_> = (0..num_src_accounts).map(|_| KeyPair::new()).collect(); + let dstkeys: Vec<_> = (0..num_dst_accounts) + .map(|_| KeyPair::new().pubkey()) + .collect(); info!("created keys src: {} dst: {}", srckeys.len(), dstkeys.len());