Merge pull request #106 from garious/parallelize-accountant
Parallelize accountant
This commit is contained in:
@ -10,8 +10,9 @@ use mint::Mint;
|
|||||||
use plan::{Payment, Plan, Witness};
|
use plan::{Payment, Plan, Witness};
|
||||||
use signature::{KeyPair, PublicKey, Signature};
|
use signature::{KeyPair, PublicKey, Signature};
|
||||||
use std::collections::hash_map::Entry::Occupied;
|
use std::collections::hash_map::Entry::Occupied;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet, VecDeque};
|
||||||
use std::result;
|
use std::result;
|
||||||
|
use std::sync::RwLock;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
@ -23,29 +24,35 @@ pub enum AccountingError {
|
|||||||
pub type Result<T> = result::Result<T, AccountingError>;
|
pub type Result<T> = result::Result<T, AccountingError>;
|
||||||
|
|
||||||
/// Commit funds to the 'to' party.
|
/// Commit funds to the 'to' party.
|
||||||
fn apply_payment(balances: &mut HashMap<PublicKey, i64>, payment: &Payment) {
|
fn apply_payment(balances: &RwLock<HashMap<PublicKey, RwLock<i64>>>, payment: &Payment) {
|
||||||
*balances.entry(payment.to).or_insert(0) += payment.tokens;
|
if balances.read().unwrap().contains_key(&payment.to) {
|
||||||
|
let bals = balances.read().unwrap();
|
||||||
|
*bals[&payment.to].write().unwrap() += payment.tokens;
|
||||||
|
} else {
|
||||||
|
let mut bals = balances.write().unwrap();
|
||||||
|
bals.insert(payment.to, RwLock::new(payment.tokens));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Accountant {
|
pub struct Accountant {
|
||||||
balances: HashMap<PublicKey, i64>,
|
balances: RwLock<HashMap<PublicKey, RwLock<i64>>>,
|
||||||
pending: HashMap<Signature, Plan>,
|
pending: RwLock<HashMap<Signature, Plan>>,
|
||||||
signatures: HashSet<Signature>,
|
last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>,
|
||||||
time_sources: HashSet<PublicKey>,
|
time_sources: RwLock<HashSet<PublicKey>>,
|
||||||
last_time: DateTime<Utc>,
|
last_time: RwLock<DateTime<Utc>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Accountant {
|
impl Accountant {
|
||||||
/// Create an Accountant using a deposit.
|
/// Create an Accountant using a deposit.
|
||||||
pub fn new_from_deposit(deposit: &Payment) -> Self {
|
pub fn new_from_deposit(deposit: &Payment) -> Self {
|
||||||
let mut balances = HashMap::new();
|
let balances = RwLock::new(HashMap::new());
|
||||||
apply_payment(&mut balances, deposit);
|
apply_payment(&balances, deposit);
|
||||||
Accountant {
|
Accountant {
|
||||||
balances,
|
balances,
|
||||||
pending: HashMap::new(),
|
pending: RwLock::new(HashMap::new()),
|
||||||
signatures: HashSet::new(),
|
last_ids: RwLock::new(VecDeque::new()),
|
||||||
time_sources: HashSet::new(),
|
time_sources: RwLock::new(HashSet::new()),
|
||||||
last_time: Utc.timestamp(0, 0),
|
last_time: RwLock::new(Utc.timestamp(0, 0)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,46 +65,62 @@ impl Accountant {
|
|||||||
Self::new_from_deposit(&deposit)
|
Self::new_from_deposit(&deposit)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reserve_signature(&mut self, sig: &Signature) -> bool {
|
fn reserve_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
|
||||||
if self.signatures.contains(sig) {
|
if signatures.read().unwrap().contains(sig) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
self.signatures.insert(*sig);
|
signatures.write().unwrap().insert(*sig);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool {
|
||||||
|
if let Some(entry) = self.last_ids
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.find(|x| x.0 == *last_id)
|
||||||
|
{
|
||||||
|
return Self::reserve_signature(&entry.1, sig);
|
||||||
|
}
|
||||||
|
let sigs = RwLock::new(HashSet::new());
|
||||||
|
Self::reserve_signature(&sigs, sig);
|
||||||
|
self.last_ids.write().unwrap().push_back((*last_id, sigs));
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process a Transaction that has already been verified.
|
/// Process a Transaction that has already been verified.
|
||||||
pub fn process_verified_transaction(&mut self, tr: &Transaction) -> Result<()> {
|
pub fn process_verified_transaction(&self, tr: &Transaction) -> Result<()> {
|
||||||
if self.get_balance(&tr.from).unwrap_or(0) < tr.tokens {
|
if self.get_balance(&tr.from).unwrap_or(0) < tr.tokens {
|
||||||
return Err(AccountingError::InsufficientFunds);
|
return Err(AccountingError::InsufficientFunds);
|
||||||
}
|
}
|
||||||
|
|
||||||
if !self.reserve_signature(&tr.sig) {
|
if !self.reserve_signature_with_last_id(&tr.sig, &tr.last_id) {
|
||||||
return Err(AccountingError::InvalidTransferSignature);
|
return Err(AccountingError::InvalidTransferSignature);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(x) = self.balances.get_mut(&tr.from) {
|
if let Some(x) = self.balances.read().unwrap().get(&tr.from) {
|
||||||
*x -= tr.tokens;
|
*x.write().unwrap() -= tr.tokens;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut plan = tr.plan.clone();
|
let mut plan = tr.plan.clone();
|
||||||
plan.apply_witness(&Witness::Timestamp(self.last_time));
|
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap()));
|
||||||
|
|
||||||
if let Some(ref payment) = plan.final_payment() {
|
if let Some(ref payment) = plan.final_payment() {
|
||||||
apply_payment(&mut self.balances, payment);
|
apply_payment(&self.balances, payment);
|
||||||
} else {
|
} else {
|
||||||
self.pending.insert(tr.sig, plan);
|
self.pending.write().unwrap().insert(tr.sig, plan);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process a Witness Signature that has already been verified.
|
/// Process a Witness Signature that has already been verified.
|
||||||
fn process_verified_sig(&mut self, from: PublicKey, tx_sig: Signature) -> Result<()> {
|
fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
|
||||||
if let Occupied(mut e) = self.pending.entry(tx_sig) {
|
if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) {
|
||||||
e.get_mut().apply_witness(&Witness::Signature(from));
|
e.get_mut().apply_witness(&Witness::Signature(from));
|
||||||
if let Some(ref payment) = e.get().final_payment() {
|
if let Some(ref payment) = e.get().final_payment() {
|
||||||
apply_payment(&mut self.balances, payment);
|
apply_payment(&self.balances, payment);
|
||||||
e.remove_entry();
|
e.remove_entry();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -106,16 +129,16 @@ impl Accountant {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Process a Witness Timestamp that has already been verified.
|
/// Process a Witness Timestamp that has already been verified.
|
||||||
fn process_verified_timestamp(&mut self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> {
|
fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> {
|
||||||
// If this is the first timestamp we've seen, it probably came from the genesis block,
|
// If this is the first timestamp we've seen, it probably came from the genesis block,
|
||||||
// so we'll trust it.
|
// so we'll trust it.
|
||||||
if self.last_time == Utc.timestamp(0, 0) {
|
if *self.last_time.read().unwrap() == Utc.timestamp(0, 0) {
|
||||||
self.time_sources.insert(from);
|
self.time_sources.write().unwrap().insert(from);
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.time_sources.contains(&from) {
|
if self.time_sources.read().unwrap().contains(&from) {
|
||||||
if dt > self.last_time {
|
if dt > *self.last_time.read().unwrap() {
|
||||||
self.last_time = dt;
|
*self.last_time.write().unwrap() = dt;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@ -123,23 +146,27 @@ impl Accountant {
|
|||||||
|
|
||||||
// Check to see if any timelocked transactions can be completed.
|
// Check to see if any timelocked transactions can be completed.
|
||||||
let mut completed = vec![];
|
let mut completed = vec![];
|
||||||
for (key, plan) in &mut self.pending {
|
|
||||||
plan.apply_witness(&Witness::Timestamp(self.last_time));
|
// Hold 'pending' write lock until the end of this function. Otherwise another thread can
|
||||||
|
// double-spend if it enters before the modified plan is removed from 'pending'.
|
||||||
|
let mut pending = self.pending.write().unwrap();
|
||||||
|
for (key, plan) in pending.iter_mut() {
|
||||||
|
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap()));
|
||||||
if let Some(ref payment) = plan.final_payment() {
|
if let Some(ref payment) = plan.final_payment() {
|
||||||
apply_payment(&mut self.balances, payment);
|
apply_payment(&self.balances, payment);
|
||||||
completed.push(key.clone());
|
completed.push(key.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for key in completed {
|
for key in completed {
|
||||||
self.pending.remove(&key);
|
pending.remove(&key);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process an Transaction or Witness that has already been verified.
|
/// Process an Transaction or Witness that has already been verified.
|
||||||
pub fn process_verified_event(&mut self, event: &Event) -> Result<()> {
|
pub fn process_verified_event(&self, event: &Event) -> Result<()> {
|
||||||
match *event {
|
match *event {
|
||||||
Event::Transaction(ref tr) => self.process_verified_transaction(tr),
|
Event::Transaction(ref tr) => self.process_verified_transaction(tr),
|
||||||
Event::Signature { from, tx_sig, .. } => self.process_verified_sig(from, tx_sig),
|
Event::Signature { from, tx_sig, .. } => self.process_verified_sig(from, tx_sig),
|
||||||
@ -150,7 +177,7 @@ impl Accountant {
|
|||||||
/// Create, sign, and process a Transaction from `keypair` to `to` of
|
/// Create, sign, and process a Transaction from `keypair` to `to` of
|
||||||
/// `n` tokens where `last_id` is the last Entry ID observed by the client.
|
/// `n` tokens where `last_id` is the last Entry ID observed by the client.
|
||||||
pub fn transfer(
|
pub fn transfer(
|
||||||
&mut self,
|
&self,
|
||||||
n: i64,
|
n: i64,
|
||||||
keypair: &KeyPair,
|
keypair: &KeyPair,
|
||||||
to: PublicKey,
|
to: PublicKey,
|
||||||
@ -165,7 +192,7 @@ impl Accountant {
|
|||||||
/// to `to` of `n` tokens on `dt` where `last_id` is the last Entry ID
|
/// to `to` of `n` tokens on `dt` where `last_id` is the last Entry ID
|
||||||
/// observed by the client.
|
/// observed by the client.
|
||||||
pub fn transfer_on_date(
|
pub fn transfer_on_date(
|
||||||
&mut self,
|
&self,
|
||||||
n: i64,
|
n: i64,
|
||||||
keypair: &KeyPair,
|
keypair: &KeyPair,
|
||||||
to: PublicKey,
|
to: PublicKey,
|
||||||
@ -178,7 +205,8 @@ impl Accountant {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
|
pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
|
||||||
self.balances.get(pubkey).cloned()
|
let bals = self.balances.read().unwrap();
|
||||||
|
bals.get(pubkey).map(|x| *x.read().unwrap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,7 +219,7 @@ mod tests {
|
|||||||
fn test_accountant() {
|
fn test_accountant() {
|
||||||
let alice = Mint::new(10_000);
|
let alice = Mint::new(10_000);
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let mut acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
|
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000);
|
||||||
@ -204,7 +232,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_invalid_transfer() {
|
fn test_invalid_transfer() {
|
||||||
let alice = Mint::new(11_000);
|
let alice = Mint::new(11_000);
|
||||||
let mut acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@ -221,7 +249,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_transfer_to_newb() {
|
fn test_transfer_to_newb() {
|
||||||
let alice = Mint::new(10_000);
|
let alice = Mint::new(10_000);
|
||||||
let mut acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let alice_keypair = alice.keypair();
|
let alice_keypair = alice.keypair();
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
acc.transfer(500, &alice_keypair, bob_pubkey, alice.last_id())
|
acc.transfer(500, &alice_keypair, bob_pubkey, alice.last_id())
|
||||||
@ -232,7 +260,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_transfer_on_date() {
|
fn test_transfer_on_date() {
|
||||||
let alice = Mint::new(1);
|
let alice = Mint::new(1);
|
||||||
let mut acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let alice_keypair = alice.keypair();
|
let alice_keypair = alice.keypair();
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let dt = Utc::now();
|
let dt = Utc::now();
|
||||||
@ -258,7 +286,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_transfer_after_date() {
|
fn test_transfer_after_date() {
|
||||||
let alice = Mint::new(1);
|
let alice = Mint::new(1);
|
||||||
let mut acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let alice_keypair = alice.keypair();
|
let alice_keypair = alice.keypair();
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let dt = Utc::now();
|
let dt = Utc::now();
|
||||||
@ -275,7 +303,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_cancel_transfer() {
|
fn test_cancel_transfer() {
|
||||||
let alice = Mint::new(1);
|
let alice = Mint::new(1);
|
||||||
let mut acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let alice_keypair = alice.keypair();
|
let alice_keypair = alice.keypair();
|
||||||
let bob_pubkey = KeyPair::new().pubkey();
|
let bob_pubkey = KeyPair::new().pubkey();
|
||||||
let dt = Utc::now();
|
let dt = Utc::now();
|
||||||
@ -301,9 +329,56 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_duplicate_event_signature() {
|
fn test_duplicate_event_signature() {
|
||||||
let alice = Mint::new(1);
|
let alice = Mint::new(1);
|
||||||
let mut acc = Accountant::new(&alice);
|
let acc = Accountant::new(&alice);
|
||||||
let sig = Signature::default();
|
let sig = Signature::default();
|
||||||
assert!(acc.reserve_signature(&sig));
|
let last_id = Hash::default();
|
||||||
assert!(!acc.reserve_signature(&sig));
|
assert!(acc.reserve_signature_with_last_id(&sig, &last_id));
|
||||||
|
assert!(!acc.reserve_signature_with_last_id(&sig, &last_id));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(feature = "unstable", test))]
|
||||||
|
mod bench {
|
||||||
|
extern crate test;
|
||||||
|
use self::test::Bencher;
|
||||||
|
use accountant::*;
|
||||||
|
use rayon::prelude::*;
|
||||||
|
use signature::KeyPairUtil;
|
||||||
|
use hash::hash;
|
||||||
|
use bincode::serialize;
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
fn process_verified_event_bench(bencher: &mut Bencher) {
|
||||||
|
let mint = Mint::new(100_000_000);
|
||||||
|
let acc = Accountant::new(&mint);
|
||||||
|
// Create transactions between unrelated parties.
|
||||||
|
let transactions: Vec<_> = (0..4096)
|
||||||
|
.into_par_iter()
|
||||||
|
.map(|i| {
|
||||||
|
// Seed the 'from' account.
|
||||||
|
let rando0 = KeyPair::new();
|
||||||
|
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id());
|
||||||
|
acc.process_verified_transaction(&tr).unwrap();
|
||||||
|
|
||||||
|
// Seed the 'to' account and a cell for its signature.
|
||||||
|
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
|
||||||
|
let rando1 = KeyPair::new();
|
||||||
|
let tr = Transaction::new(&rando0, rando1.pubkey(), 1, last_id);
|
||||||
|
acc.process_verified_transaction(&tr).unwrap();
|
||||||
|
|
||||||
|
// Finally, return a transaction that's unique
|
||||||
|
Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
bencher.iter(|| {
|
||||||
|
// Since benchmarker runs this multiple times, we need to clear the signatures.
|
||||||
|
for (_, sigs) in acc.last_ids.read().unwrap().iter() {
|
||||||
|
sigs.write().unwrap().clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
transactions.par_iter().for_each(|tr| {
|
||||||
|
acc.process_verified_transaction(tr).unwrap();
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,7 @@ fn main() {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut acc = Accountant::new_from_deposit(&deposit.unwrap());
|
let acc = Accountant::new_from_deposit(&deposit.unwrap());
|
||||||
|
|
||||||
let mut last_id = entry1.id;
|
let mut last_id = entry1.id;
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
|
Reference in New Issue
Block a user