Merge pull request #217 from garious/add-historian-stage

Add record_stage to pipeline
This commit is contained in:
Greg Fitzgerald 2018-05-14 16:01:45 -06:00 committed by GitHub
commit a604dcb4c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 399 additions and 446 deletions

View File

@ -1,4 +1,4 @@
//! The `accountant` module tracks client balances, and the progress of pending //! The `bank` module tracks client balances, and the progress of pending
//! transactions. It offers a high-level public API that signs transactions //! transactions. It offers a high-level public API that signs transactions
//! on behalf of the caller, and a private low-level API for when they have //! on behalf of the caller, and a private low-level API for when they have
//! already been signed and verified. //! already been signed and verified.
@ -23,13 +23,13 @@ use transaction::Transaction;
pub const MAX_ENTRY_IDS: usize = 1024 * 4; pub const MAX_ENTRY_IDS: usize = 1024 * 4;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum AccountingError { pub enum BankError {
AccountNotFound, AccountNotFound,
InsufficientFunds, InsufficientFunds,
InvalidTransferSignature, InvalidTransferSignature,
} }
pub type Result<T> = result::Result<T, AccountingError>; pub type Result<T> = result::Result<T, BankError>;
/// Commit funds to the 'to' party. /// Commit funds to the 'to' party.
fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &Payment) { fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &Payment) {
@ -53,7 +53,7 @@ fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &P
} }
} }
pub struct Accountant { pub struct Bank {
balances: RwLock<HashMap<PublicKey, AtomicIsize>>, balances: RwLock<HashMap<PublicKey, AtomicIsize>>,
pending: RwLock<HashMap<Signature, Plan>>, pending: RwLock<HashMap<Signature, Plan>>,
last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>, last_ids: RwLock<VecDeque<(Hash, RwLock<HashSet<Signature>>)>>,
@ -62,12 +62,12 @@ pub struct Accountant {
transaction_count: AtomicUsize, transaction_count: AtomicUsize,
} }
impl Accountant { impl Bank {
/// Create an Accountant using a deposit. /// Create an Bank using a deposit.
pub fn new_from_deposit(deposit: &Payment) -> Self { pub fn new_from_deposit(deposit: &Payment) -> Self {
let balances = RwLock::new(HashMap::new()); let balances = RwLock::new(HashMap::new());
apply_payment(&balances, deposit); apply_payment(&balances, deposit);
Accountant { Bank {
balances, balances,
pending: RwLock::new(HashMap::new()), pending: RwLock::new(HashMap::new()),
last_ids: RwLock::new(VecDeque::new()), last_ids: RwLock::new(VecDeque::new()),
@ -77,15 +77,15 @@ impl Accountant {
} }
} }
/// Create an Accountant with only a Mint. Typically used by unit tests. /// Create an Bank with only a Mint. Typically used by unit tests.
pub fn new(mint: &Mint) -> Self { pub fn new(mint: &Mint) -> Self {
let deposit = Payment { let deposit = Payment {
to: mint.pubkey(), to: mint.pubkey(),
tokens: mint.tokens, tokens: mint.tokens,
}; };
let accountant = Self::new_from_deposit(&deposit); let bank = Self::new_from_deposit(&deposit);
accountant.register_entry_id(&mint.last_id()); bank.register_entry_id(&mint.last_id());
accountant bank
} }
/// Return the last entry ID registered /// Return the last entry ID registered
@ -143,10 +143,10 @@ impl Accountant {
false false
} }
/// Tell the accountant which Entry IDs exist on the ledger. This function /// Tell the bank which Entry IDs exist on the ledger. This function
/// assumes subsequent calls correspond to later entries, and will boot /// assumes subsequent calls correspond to later entries, and will boot
/// the oldest ones once its internal cache is full. Once boot, the /// the oldest ones once its internal cache is full. Once boot, the
/// accountant will reject transactions using that `last_id`. /// bank will reject transactions using that `last_id`.
pub fn register_entry_id(&self, last_id: &Hash) { pub fn register_entry_id(&self, last_id: &Hash) {
let mut last_ids = self.last_ids let mut last_ids = self.last_ids
.write() .write()
@ -166,11 +166,11 @@ impl Accountant {
let option = bals.get(&tr.from); let option = bals.get(&tr.from);
if option.is_none() { if option.is_none() {
return Err(AccountingError::AccountNotFound); return Err(BankError::AccountNotFound);
} }
if !self.reserve_signature_with_last_id(&tr.sig, &tr.data.last_id) { if !self.reserve_signature_with_last_id(&tr.sig, &tr.data.last_id) {
return Err(AccountingError::InvalidTransferSignature); return Err(BankError::InvalidTransferSignature);
} }
loop { loop {
@ -179,7 +179,7 @@ impl Accountant {
if current < tr.data.tokens { if current < tr.data.tokens {
self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id); self.forget_signature_with_last_id(&tr.sig, &tr.data.last_id);
return Err(AccountingError::InsufficientFunds); return Err(BankError::InsufficientFunds);
} }
let result = bal.compare_exchange( let result = bal.compare_exchange(
@ -406,207 +406,186 @@ mod tests {
use signature::KeyPairUtil; use signature::KeyPairUtil;
#[test] #[test]
fn test_accountant() { fn test_bank() {
let alice = Mint::new(10_000); let mint = Mint::new(10_000);
let bob_pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
let accountant = Accountant::new(&alice); let bank = Bank::new(&mint);
assert_eq!(accountant.last_id(), alice.last_id()); assert_eq!(bank.last_id(), mint.last_id());
accountant bank.transfer(1_000, &mint.keypair(), pubkey, mint.last_id())
.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
.unwrap(); .unwrap();
assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_000); assert_eq!(bank.get_balance(&pubkey).unwrap(), 1_000);
accountant bank.transfer(500, &mint.keypair(), pubkey, mint.last_id())
.transfer(500, &alice.keypair(), bob_pubkey, alice.last_id())
.unwrap(); .unwrap();
assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_500); assert_eq!(bank.get_balance(&pubkey).unwrap(), 1_500);
assert_eq!(accountant.transaction_count(), 2); assert_eq!(bank.transaction_count(), 2);
} }
#[test] #[test]
fn test_account_not_found() { fn test_account_not_found() {
let mint = Mint::new(1); let mint = Mint::new(1);
let accountant = Accountant::new(&mint); let bank = Bank::new(&mint);
assert_eq!( assert_eq!(
accountant.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()), bank.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()),
Err(AccountingError::AccountNotFound) Err(BankError::AccountNotFound)
); );
assert_eq!(accountant.transaction_count(), 0); assert_eq!(bank.transaction_count(), 0);
} }
#[test] #[test]
fn test_invalid_transfer() { fn test_invalid_transfer() {
let alice = Mint::new(11_000); let mint = Mint::new(11_000);
let accountant = Accountant::new(&alice); let bank = Bank::new(&mint);
let bob_pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
accountant bank.transfer(1_000, &mint.keypair(), pubkey, mint.last_id())
.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
.unwrap(); .unwrap();
assert_eq!(accountant.transaction_count(), 1); assert_eq!(bank.transaction_count(), 1);
assert_eq!( assert_eq!(
accountant.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()), bank.transfer(10_001, &mint.keypair(), pubkey, mint.last_id()),
Err(AccountingError::InsufficientFunds) Err(BankError::InsufficientFunds)
); );
assert_eq!(accountant.transaction_count(), 1); assert_eq!(bank.transaction_count(), 1);
let alice_pubkey = alice.keypair().pubkey(); let mint_pubkey = mint.keypair().pubkey();
assert_eq!(accountant.get_balance(&alice_pubkey).unwrap(), 10_000); assert_eq!(bank.get_balance(&mint_pubkey).unwrap(), 10_000);
assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_000); assert_eq!(bank.get_balance(&pubkey).unwrap(), 1_000);
} }
#[test] #[test]
fn test_transfer_to_newb() { fn test_transfer_to_newb() {
let alice = Mint::new(10_000); let mint = Mint::new(10_000);
let accountant = Accountant::new(&alice); let bank = Bank::new(&mint);
let alice_keypair = alice.keypair(); let pubkey = KeyPair::new().pubkey();
let bob_pubkey = KeyPair::new().pubkey(); bank.transfer(500, &mint.keypair(), pubkey, mint.last_id())
accountant
.transfer(500, &alice_keypair, bob_pubkey, alice.last_id())
.unwrap(); .unwrap();
assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 500); assert_eq!(bank.get_balance(&pubkey).unwrap(), 500);
} }
#[test] #[test]
fn test_transfer_on_date() { fn test_transfer_on_date() {
let alice = Mint::new(1); let mint = Mint::new(1);
let accountant = Accountant::new(&alice); let bank = Bank::new(&mint);
let alice_keypair = alice.keypair(); let pubkey = KeyPair::new().pubkey();
let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
accountant bank.transfer_on_date(1, &mint.keypair(), pubkey, dt, mint.last_id())
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
.unwrap(); .unwrap();
// Alice's balance will be zero because all funds are locked up. // Mint's balance will be zero because all funds are locked up.
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0)); assert_eq!(bank.get_balance(&mint.pubkey()), Some(0));
// tx count is 1, because debits were applied. // tx count is 1, because debits were applied.
assert_eq!(accountant.transaction_count(), 1); assert_eq!(bank.transaction_count(), 1);
// Bob's balance will be None because the funds have not been // pubkey's balance will be None because the funds have not been
// sent. // sent.
assert_eq!(accountant.get_balance(&bob_pubkey), None); assert_eq!(bank.get_balance(&pubkey), None);
// Now, acknowledge the time in the condition occurred and // Now, acknowledge the time in the condition occurred and
// that bob's funds are now available. // that pubkey's funds are now available.
accountant bank.process_verified_timestamp(mint.pubkey(), dt).unwrap();
.process_verified_timestamp(alice.pubkey(), dt) assert_eq!(bank.get_balance(&pubkey), Some(1));
.unwrap();
assert_eq!(accountant.get_balance(&bob_pubkey), Some(1));
// tx count is still 1, because we chose not to count timestamp events // tx count is still 1, because we chose not to count timestamp events
// tx count. // tx count.
assert_eq!(accountant.transaction_count(), 1); assert_eq!(bank.transaction_count(), 1);
accountant bank.process_verified_timestamp(mint.pubkey(), dt).unwrap(); // <-- Attack! Attempt to process completed transaction.
.process_verified_timestamp(alice.pubkey(), dt) assert_ne!(bank.get_balance(&pubkey), Some(2));
.unwrap(); // <-- Attack! Attempt to process completed transaction.
assert_ne!(accountant.get_balance(&bob_pubkey), Some(2));
} }
#[test] #[test]
fn test_transfer_after_date() { fn test_transfer_after_date() {
let alice = Mint::new(1); let mint = Mint::new(1);
let accountant = Accountant::new(&alice); let bank = Bank::new(&mint);
let alice_keypair = alice.keypair(); let pubkey = KeyPair::new().pubkey();
let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
accountant bank.process_verified_timestamp(mint.pubkey(), dt).unwrap();
.process_verified_timestamp(alice.pubkey(), dt)
.unwrap();
// It's now past now, so this transfer should be processed immediately. // It's now past now, so this transfer should be processed immediately.
accountant bank.transfer_on_date(1, &mint.keypair(), pubkey, dt, mint.last_id())
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
.unwrap(); .unwrap();
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0)); assert_eq!(bank.get_balance(&mint.pubkey()), Some(0));
assert_eq!(accountant.get_balance(&bob_pubkey), Some(1)); assert_eq!(bank.get_balance(&pubkey), Some(1));
} }
#[test] #[test]
fn test_cancel_transfer() { fn test_cancel_transfer() {
let alice = Mint::new(1); let mint = Mint::new(1);
let accountant = Accountant::new(&alice); let bank = Bank::new(&mint);
let alice_keypair = alice.keypair(); let pubkey = KeyPair::new().pubkey();
let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
let sig = accountant let sig = bank.transfer_on_date(1, &mint.keypair(), pubkey, dt, mint.last_id())
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
.unwrap(); .unwrap();
// Assert the debit counts as a transaction. // Assert the debit counts as a transaction.
assert_eq!(accountant.transaction_count(), 1); assert_eq!(bank.transaction_count(), 1);
// Alice's balance will be zero because all funds are locked up. // Mint's balance will be zero because all funds are locked up.
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0)); assert_eq!(bank.get_balance(&mint.pubkey()), Some(0));
// Bob's balance will be None because the funds have not been // pubkey's balance will be None because the funds have not been
// sent. // sent.
assert_eq!(accountant.get_balance(&bob_pubkey), None); assert_eq!(bank.get_balance(&pubkey), None);
// Now, cancel the trancaction. Alice gets her funds back, Bob never sees them. // Now, cancel the trancaction. Mint gets her funds back, pubkey never sees them.
accountant bank.process_verified_sig(mint.pubkey(), sig).unwrap();
.process_verified_sig(alice.pubkey(), sig) assert_eq!(bank.get_balance(&mint.pubkey()), Some(1));
.unwrap(); assert_eq!(bank.get_balance(&pubkey), None);
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1));
assert_eq!(accountant.get_balance(&bob_pubkey), None);
// Assert cancel doesn't cause count to go backward. // Assert cancel doesn't cause count to go backward.
assert_eq!(accountant.transaction_count(), 1); assert_eq!(bank.transaction_count(), 1);
accountant bank.process_verified_sig(mint.pubkey(), sig).unwrap(); // <-- Attack! Attempt to cancel completed transaction.
.process_verified_sig(alice.pubkey(), sig) assert_ne!(bank.get_balance(&mint.pubkey()), Some(2));
.unwrap(); // <-- Attack! Attempt to cancel completed transaction.
assert_ne!(accountant.get_balance(&alice.pubkey()), Some(2));
} }
#[test] #[test]
fn test_duplicate_event_signature() { fn test_duplicate_event_signature() {
let alice = Mint::new(1); let mint = Mint::new(1);
let accountant = Accountant::new(&alice); let bank = Bank::new(&mint);
let sig = Signature::default(); let sig = Signature::default();
assert!(accountant.reserve_signature_with_last_id(&sig, &alice.last_id())); assert!(bank.reserve_signature_with_last_id(&sig, &mint.last_id()));
assert!(!accountant.reserve_signature_with_last_id(&sig, &alice.last_id())); assert!(!bank.reserve_signature_with_last_id(&sig, &mint.last_id()));
} }
#[test] #[test]
fn test_forget_signature() { fn test_forget_signature() {
let alice = Mint::new(1); let mint = Mint::new(1);
let accountant = Accountant::new(&alice); let bank = Bank::new(&mint);
let sig = Signature::default(); let sig = Signature::default();
accountant.reserve_signature_with_last_id(&sig, &alice.last_id()); bank.reserve_signature_with_last_id(&sig, &mint.last_id());
assert!(accountant.forget_signature_with_last_id(&sig, &alice.last_id())); assert!(bank.forget_signature_with_last_id(&sig, &mint.last_id()));
assert!(!accountant.forget_signature_with_last_id(&sig, &alice.last_id())); assert!(!bank.forget_signature_with_last_id(&sig, &mint.last_id()));
} }
#[test] #[test]
fn test_max_entry_ids() { fn test_max_entry_ids() {
let alice = Mint::new(1); let mint = Mint::new(1);
let accountant = Accountant::new(&alice); let bank = Bank::new(&mint);
let sig = Signature::default(); let sig = Signature::default();
for i in 0..MAX_ENTRY_IDS { for i in 0..MAX_ENTRY_IDS {
let last_id = hash(&serialize(&i).unwrap()); // Unique hash let last_id = hash(&serialize(&i).unwrap()); // Unique hash
accountant.register_entry_id(&last_id); bank.register_entry_id(&last_id);
} }
// Assert we're no longer able to use the oldest entry ID. // Assert we're no longer able to use the oldest entry ID.
assert!(!accountant.reserve_signature_with_last_id(&sig, &alice.last_id())); assert!(!bank.reserve_signature_with_last_id(&sig, &mint.last_id()));
} }
#[test] #[test]
fn test_debits_before_credits() { fn test_debits_before_credits() {
let mint = Mint::new(2); let mint = Mint::new(2);
let accountant = Accountant::new(&mint); let bank = Bank::new(&mint);
let alice = KeyPair::new(); let keypair = KeyPair::new();
let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); let tr0 = Transaction::new(&mint.keypair(), keypair.pubkey(), 2, mint.last_id());
let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); let tr1 = Transaction::new(&keypair, mint.pubkey(), 1, mint.last_id());
let trs = vec![tr0, tr1]; let trs = vec![tr0, tr1];
let results = accountant.process_verified_transactions(trs); let results = bank.process_verified_transactions(trs);
assert!(results[1].is_err()); assert!(results[1].is_err());
// Assert bad transactions aren't counted. // Assert bad transactions aren't counted.
assert_eq!(accountant.transaction_count(), 1); assert_eq!(bank.transaction_count(), 1);
} }
} }
@ -614,7 +593,7 @@ mod tests {
mod bench { mod bench {
extern crate test; extern crate test;
use self::test::Bencher; use self::test::Bencher;
use accountant::*; use bank::*;
use bincode::serialize; use bincode::serialize;
use hash::hash; use hash::hash;
use signature::KeyPairUtil; use signature::KeyPairUtil;
@ -622,7 +601,7 @@ mod bench {
#[bench] #[bench]
fn process_verified_event_bench(bencher: &mut Bencher) { fn process_verified_event_bench(bencher: &mut Bencher) {
let mint = Mint::new(100_000_000); let mint = Mint::new(100_000_000);
let accountant = Accountant::new(&mint); let bank = Bank::new(&mint);
// Create transactions between unrelated parties. // Create transactions between unrelated parties.
let transactions: Vec<_> = (0..4096) let transactions: Vec<_> = (0..4096)
.into_par_iter() .into_par_iter()
@ -630,15 +609,15 @@ mod bench {
// Seed the 'from' account. // Seed the 'from' account.
let rando0 = KeyPair::new(); let rando0 = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id()); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id());
accountant.process_verified_transaction(&tr).unwrap(); bank.process_verified_transaction(&tr).unwrap();
// Seed the 'to' account and a cell for its signature. // Seed the 'to' account and a cell for its signature.
let last_id = hash(&serialize(&i).unwrap()); // Unique hash let last_id = hash(&serialize(&i).unwrap()); // Unique hash
accountant.register_entry_id(&last_id); bank.register_entry_id(&last_id);
let rando1 = KeyPair::new(); let rando1 = KeyPair::new();
let tr = Transaction::new(&rando0, rando1.pubkey(), 1, last_id); let tr = Transaction::new(&rando0, rando1.pubkey(), 1, last_id);
accountant.process_verified_transaction(&tr).unwrap(); bank.process_verified_transaction(&tr).unwrap();
// Finally, return a transaction that's unique // Finally, return a transaction that's unique
Transaction::new(&rando0, rando1.pubkey(), 1, last_id) Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
@ -646,13 +625,12 @@ mod bench {
.collect(); .collect();
bencher.iter(|| { bencher.iter(|| {
// Since benchmarker runs this multiple times, we need to clear the signatures. // Since benchmarker runs this multiple times, we need to clear the signatures.
for sigs in accountant.last_ids.read().unwrap().iter() { for sigs in bank.last_ids.read().unwrap().iter() {
sigs.1.write().unwrap().clear(); sigs.1.write().unwrap().clear();
} }
assert!( assert!(
accountant bank.process_verified_transactions(transactions.clone())
.process_verified_transactions(transactions.clone())
.iter() .iter()
.all(|x| x.is_ok()) .all(|x| x.is_ok())
); );

View File

@ -7,7 +7,7 @@ extern crate untrusted;
use isatty::stdin_isatty; use isatty::stdin_isatty;
use rayon::prelude::*; use rayon::prelude::*;
use solana::accountant::MAX_ENTRY_IDS; use solana::bank::MAX_ENTRY_IDS;
use solana::entry::{create_entry, next_entry}; use solana::entry::{create_entry, next_entry};
use solana::event::Event; use solana::event::Event;
use solana::mint::MintDemo; use solana::mint::MintDemo;

View File

@ -6,11 +6,10 @@ extern crate solana;
use getopts::Options; use getopts::Options;
use isatty::stdin_isatty; use isatty::stdin_isatty;
use solana::accountant::Accountant; use solana::bank::Bank;
use solana::crdt::ReplicatedData; use solana::crdt::ReplicatedData;
use solana::entry::Entry; use solana::entry::Entry;
use solana::event::Event; use solana::event::Event;
use solana::event_processor::EventProcessor;
use solana::rpu::Rpu; use solana::rpu::Rpu;
use solana::signature::{KeyPair, KeyPairUtil}; use solana::signature::{KeyPair, KeyPairUtil};
use std::env; use std::env;
@ -19,6 +18,7 @@ use std::net::UdpSocket;
use std::process::exit; use std::process::exit;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::time::Duration;
fn print_usage(program: &str, opts: Options) { fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program); let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
@ -92,32 +92,31 @@ fn main() {
None None
}; };
eprintln!("creating accountant..."); eprintln!("creating bank...");
let accountant = Accountant::new_from_deposit(&deposit.unwrap()); let bank = Bank::new_from_deposit(&deposit.unwrap());
accountant.register_entry_id(&entry0.id); bank.register_entry_id(&entry0.id);
accountant.register_entry_id(&entry1.id); bank.register_entry_id(&entry1.id);
eprintln!("processing entries..."); eprintln!("processing entries...");
let mut last_id = entry1.id; let mut last_id = entry1.id;
for entry in entries { for entry in entries {
last_id = entry.id; last_id = entry.id;
let results = accountant.process_verified_events(entry.events); let results = bank.process_verified_events(entry.events);
for result in results { for result in results {
if let Err(e) = result { if let Err(e) = result {
eprintln!("failed to process event {:?}", e); eprintln!("failed to process event {:?}", e);
exit(1); exit(1);
} }
} }
accountant.register_entry_id(&last_id); bank.register_entry_id(&last_id);
} }
eprintln!("creating networking stack..."); eprintln!("creating networking stack...");
let event_processor = EventProcessor::new(accountant, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let rpu = Rpu::new(event_processor); let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000)));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();

View File

@ -11,7 +11,7 @@
//! * layer 1 - As many nodes as we can fit //! * layer 1 - As many nodes as we can fit
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//! //!
//! Accountant needs to provide an interface for us to query the stake weight //! Bank needs to provide an interface for us to query the stake weight
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt}; use byteorder::{LittleEndian, ReadBytesExt};

View File

@ -1,6 +1,6 @@
//! The `entry_writer` module helps implement the TPU's write stage. //! The `entry_writer` module helps implement the TPU's write stage.
use accountant::Accountant; use bank::Bank;
use entry::Entry; use entry::Entry;
use ledger; use ledger;
use packet; use packet;
@ -15,18 +15,18 @@ use std::time::Duration;
use streamer; use streamer;
pub struct EntryWriter<'a> { pub struct EntryWriter<'a> {
accountant: &'a Accountant, bank: &'a Bank,
} }
impl<'a> EntryWriter<'a> { impl<'a> EntryWriter<'a> {
/// Create a new Tpu that wraps the given Accountant. /// Create a new Tpu that wraps the given Bank.
pub fn new(accountant: &'a Accountant) -> Self { pub fn new(bank: &'a Bank) -> Self {
EntryWriter { accountant } EntryWriter { bank }
} }
fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) { fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
trace!("write_entry entry"); trace!("write_entry entry");
self.accountant.register_entry_id(&entry.id); self.bank.register_entry_id(&entry.id);
writeln!( writeln!(
writer.lock().expect("'writer' lock in fn fn write_entry"), writer.lock().expect("'writer' lock in fn fn write_entry"),
"{}", "{}",

View File

@ -1,167 +0,0 @@
//! The `event_processor` module implements the accounting stage of the TPU.
use accountant::Accountant;
use entry::Entry;
use event::Event;
use hash::Hash;
use historian::Historian;
use recorder::Signal;
use result::Result;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
pub struct EventProcessor {
pub accountant: Arc<Accountant>,
historian_input: Mutex<Sender<Signal>>,
historian: Mutex<Historian>,
}
impl EventProcessor {
/// Create a new stage of the TPU for event and transaction processing
pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
let (historian_input, event_receiver) = channel();
let historian = Historian::new(event_receiver, start_hash, ms_per_tick);
EventProcessor {
accountant: Arc::new(accountant),
historian_input: Mutex::new(historian_input),
historian: Mutex::new(historian),
}
}
/// Process the transactions in parallel and then log the successful ones.
pub fn process_events(&self, events: Vec<Event>) -> Result<Entry> {
let historian = self.historian.lock().unwrap();
let results = self.accountant.process_verified_events(events);
let events = results.into_iter().filter_map(|x| x.ok()).collect();
let sender = self.historian_input.lock().unwrap();
sender.send(Signal::Events(events))?;
// Wait for the historian to tag our Events with an ID and then register it.
let entry = historian.entry_receiver.lock().unwrap().recv()?;
self.accountant.register_entry_id(&entry.id);
Ok(entry)
}
}
#[cfg(test)]
mod tests {
use accountant::Accountant;
use event::Event;
use event_processor::EventProcessor;
use mint::Mint;
use signature::{KeyPair, KeyPairUtil};
use transaction::Transaction;
#[test]
// TODO: Move this test accounting_stage. Calling process_events() directly
// defeats the purpose of this test.
fn test_accounting_sequential_consistency() {
// In this attack we'll demonstrate that a verifier can interpret the ledger
// differently if either the server doesn't signal the ledger to add an
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let accountant = Accountant::new(&mint);
let event_processor = EventProcessor::new(accountant, &mint.last_id(), None);
// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let events = vec![Event::Transaction(tr)];
let entry0 = event_processor.process_events(events).unwrap();
// Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let events = vec![Event::Transaction(tr)];
let entry1 = event_processor.process_events(events).unwrap();
// Collect the ledger and feed it to a new accountant.
let entries = vec![entry0, entry1];
// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
// the account balance below zero before the credit is added.
let accountant = Accountant::new(&mint);
for entry in entries {
assert!(
accountant
.process_verified_events(entry.events)
.into_iter()
.all(|x| x.is_ok())
);
}
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1));
}
}
#[cfg(all(feature = "unstable", test))]
mod bench {
extern crate test;
use self::test::Bencher;
use accountant::{Accountant, MAX_ENTRY_IDS};
use bincode::serialize;
use event_processor::*;
use hash::hash;
use mint::Mint;
use rayon::prelude::*;
use signature::{KeyPair, KeyPairUtil};
use std::collections::HashSet;
use std::time::Instant;
use transaction::Transaction;
#[bench]
fn process_events_bench(_bencher: &mut Bencher) {
let mint = Mint::new(100_000_000);
let accountant = Accountant::new(&mint);
// Create transactions between unrelated parties.
let txs = 100_000;
let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
let transactions: Vec<_> = (0..txs)
.into_par_iter()
.map(|i| {
// Seed the 'to' account and a cell for its signature.
let dummy_id = i % (MAX_ENTRY_IDS as i32);
let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
{
let mut last_ids = last_ids.lock().unwrap();
if !last_ids.contains(&last_id) {
last_ids.insert(last_id);
accountant.register_entry_id(&last_id);
}
}
// Seed the 'from' account.
let rando0 = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
accountant.process_verified_transaction(&tr).unwrap();
let rando1 = KeyPair::new();
let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
accountant.process_verified_transaction(&tr).unwrap();
// Finally, return a transaction that's unique
Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
})
.collect();
let events: Vec<_> = transactions
.into_iter()
.map(|tr| Event::Transaction(tr))
.collect();
let event_processor = EventProcessor::new(accountant, &mint.last_id(), None);
let now = Instant::now();
assert!(event_processor.process_events(events).is_ok());
let duration = now.elapsed();
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
let tps = txs as f64 / sec;
// Ensure that all transactions were successfully logged.
drop(event_processor.historian_input);
let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].events.len(), txs as usize);
println!("{} tps", tps);
}
}

View File

@ -1,5 +1,5 @@
#![cfg_attr(feature = "unstable", feature(test))] #![cfg_attr(feature = "unstable", feature(test))]
pub mod accountant; pub mod bank;
pub mod crdt; pub mod crdt;
pub mod ecdsa; pub mod ecdsa;
pub mod entry; pub mod entry;
@ -7,14 +7,13 @@ pub mod entry_writer;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
pub mod erasure; pub mod erasure;
pub mod event; pub mod event;
pub mod event_processor;
pub mod hash; pub mod hash;
pub mod historian;
pub mod ledger; pub mod ledger;
pub mod logger; pub mod logger;
pub mod mint; pub mod mint;
pub mod packet; pub mod packet;
pub mod plan; pub mod plan;
pub mod record_stage;
pub mod recorder; pub mod recorder;
pub mod request; pub mod request;
pub mod request_processor; pub mod request_processor;

View File

@ -1,30 +1,29 @@
//! The `historian` module provides a microservice for generating a Proof of History. //! The `record_stage` implements the Record stage of the TPU.
//! It manages a thread containing a Proof of History Recorder. //! It manages a thread containing a Proof of History Recorder.
use entry::Entry; use entry::Entry;
use hash::Hash; use hash::Hash;
use recorder::{ExitReason, Recorder, Signal}; use recorder::{ExitReason, Recorder, Signal};
use std::sync::Mutex;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Instant; use std::time::{Duration, Instant};
pub struct Historian { pub struct RecordStage {
pub entry_receiver: Mutex<Receiver<Entry>>, pub entry_receiver: Receiver<Entry>,
pub thread_hdl: JoinHandle<ExitReason>, pub thread_hdl: JoinHandle<ExitReason>,
} }
impl Historian { impl RecordStage {
pub fn new( pub fn new(
event_receiver: Receiver<Signal>, event_receiver: Receiver<Signal>,
start_hash: &Hash, start_hash: &Hash,
ms_per_tick: Option<u64>, tick_duration: Option<Duration>,
) -> Self { ) -> Self {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let thread_hdl = let thread_hdl =
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); Self::create_recorder(*start_hash, tick_duration, event_receiver, entry_sender);
Historian { RecordStage {
entry_receiver: Mutex::new(entry_receiver), entry_receiver,
thread_hdl, thread_hdl,
} }
} }
@ -33,18 +32,18 @@ impl Historian {
/// sending back Entry messages until either the receiver or sender channel is closed. /// sending back Entry messages until either the receiver or sender channel is closed.
fn create_recorder( fn create_recorder(
start_hash: Hash, start_hash: Hash,
ms_per_tick: Option<u64>, tick_duration: Option<Duration>,
receiver: Receiver<Signal>, receiver: Receiver<Signal>,
sender: Sender<Entry>, sender: Sender<Entry>,
) -> JoinHandle<ExitReason> { ) -> JoinHandle<ExitReason> {
spawn(move || { spawn(move || {
let mut recorder = Recorder::new(receiver, sender, start_hash); let mut recorder = Recorder::new(receiver, sender, start_hash);
let now = Instant::now(); let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
loop { loop {
if let Err(err) = recorder.process_events(now, ms_per_tick) { if let Err(err) = recorder.process_events(duration_data) {
return err; return err;
} }
if ms_per_tick.is_some() { if duration_data.is_some() {
recorder.hash(); recorder.hash();
} }
} }
@ -52,10 +51,7 @@ impl Historian {
} }
pub fn receive(self: &Self) -> Result<Entry, TryRecvError> { pub fn receive(self: &Self) -> Result<Entry, TryRecvError> {
self.entry_receiver self.entry_receiver.try_recv()
.lock()
.expect("'entry_receiver' lock in pub fn receive")
.try_recv()
} }
} }
@ -64,13 +60,12 @@ mod tests {
use super::*; use super::*;
use ledger::Block; use ledger::Block;
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration;
#[test] #[test]
fn test_historian() { fn test_historian() {
let (input, event_receiver) = channel(); let (input, event_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(event_receiver, &zero, None); let record_stage = RecordStage::new(event_receiver, &zero, None);
input.send(Signal::Tick).unwrap(); input.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
@ -78,9 +73,9 @@ mod tests {
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
input.send(Signal::Tick).unwrap(); input.send(Signal::Tick).unwrap();
let entry0 = hist.entry_receiver.lock().unwrap().recv().unwrap(); let entry0 = record_stage.entry_receiver.recv().unwrap();
let entry1 = hist.entry_receiver.lock().unwrap().recv().unwrap(); let entry1 = record_stage.entry_receiver.recv().unwrap();
let entry2 = hist.entry_receiver.lock().unwrap().recv().unwrap(); let entry2 = record_stage.entry_receiver.recv().unwrap();
assert_eq!(entry0.num_hashes, 0); assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0); assert_eq!(entry1.num_hashes, 0);
@ -88,7 +83,7 @@ mod tests {
drop(input); drop(input);
assert_eq!( assert_eq!(
hist.thread_hdl.join().unwrap(), record_stage.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected ExitReason::RecvDisconnected
); );
@ -99,11 +94,11 @@ mod tests {
fn test_historian_closed_sender() { fn test_historian_closed_sender() {
let (input, event_receiver) = channel(); let (input, event_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(event_receiver, &zero, None); let record_stage = RecordStage::new(event_receiver, &zero, None);
drop(hist.entry_receiver); drop(record_stage.entry_receiver);
input.send(Signal::Tick).unwrap(); input.send(Signal::Tick).unwrap();
assert_eq!( assert_eq!(
hist.thread_hdl.join().unwrap(), record_stage.thread_hdl.join().unwrap(),
ExitReason::SendDisconnected ExitReason::SendDisconnected
); );
} }
@ -113,11 +108,11 @@ mod tests {
fn test_ticking_historian() { fn test_ticking_historian() {
let (input, event_receiver) = channel(); let (input, event_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(event_receiver, &zero, Some(20)); let record_stage = RecordStage::new(event_receiver, &zero, Some(Duration::from_millis(20)));
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
input.send(Signal::Tick).unwrap(); input.send(Signal::Tick).unwrap();
drop(input); drop(input);
let entries: Vec<Entry> = hist.entry_receiver.lock().unwrap().iter().collect(); let entries: Vec<Entry> = record_stage.entry_receiver.iter().collect();
assert!(entries.len() > 1); assert!(entries.len() > 1);
// Ensure the ID is not the seed. // Ensure the ID is not the seed.

View File

@ -28,7 +28,7 @@ pub struct Recorder {
receiver: Receiver<Signal>, receiver: Receiver<Signal>,
last_hash: Hash, last_hash: Hash,
num_hashes: u64, num_hashes: u64,
num_ticks: u64, num_ticks: u32,
} }
impl Recorder { impl Recorder {
@ -57,13 +57,13 @@ impl Recorder {
pub fn process_events( pub fn process_events(
&mut self, &mut self,
epoch: Instant, duration_data: Option<(Instant, Duration)>,
ms_per_tick: Option<u64>,
) -> Result<(), ExitReason> { ) -> Result<(), ExitReason> {
loop { loop {
if let Some(ms) = ms_per_tick { if let Some((start_time, tick_duration)) = duration_data {
if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) { if start_time.elapsed() > tick_duration * (self.num_ticks + 1) {
self.record_entry(vec![])?; self.record_entry(vec![])?;
// TODO: don't let this overflow u32
self.num_ticks += 1; self.num_ticks += 1;
} }
} }
@ -104,7 +104,7 @@ mod tests {
signal_sender signal_sender
.send(Signal::Events(vec![event0, event1])) .send(Signal::Events(vec![event0, event1]))
.unwrap(); .unwrap();
recorder.process_events(Instant::now(), None).unwrap(); recorder.process_events(None).unwrap();
drop(recorder.sender); drop(recorder.sender);
let entries: Vec<_> = entry_receiver.iter().collect(); let entries: Vec<_> = entry_receiver.iter().collect();

View File

@ -1,13 +1,12 @@
//! The `request_stage` processes thin client Request messages. //! The `request_stage` processes thin client Request messages.
use accountant::Accountant; use bank::Bank;
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use entry::Entry;
use event::Event; use event::Event;
use event_processor::EventProcessor;
use packet; use packet;
use packet::SharedPackets; use packet::SharedPackets;
use rayon::prelude::*; use rayon::prelude::*;
use recorder::Signal;
use request::{Request, Response}; use request::{Request, Response};
use result::Result; use result::Result;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -20,13 +19,13 @@ use streamer;
use timing; use timing;
pub struct RequestProcessor { pub struct RequestProcessor {
accountant: Arc<Accountant>, bank: Arc<Bank>,
} }
impl RequestProcessor { impl RequestProcessor {
/// Create a new Tpu that wraps the given Accountant. /// Create a new Tpu that wraps the given Bank.
pub fn new(accountant: Arc<Accountant>) -> Self { pub fn new(bank: Arc<Bank>) -> Self {
RequestProcessor { accountant } RequestProcessor { bank }
} }
/// Process Request items sent by clients. /// Process Request items sent by clients.
@ -37,19 +36,19 @@ impl RequestProcessor {
) -> Option<(Response, SocketAddr)> { ) -> Option<(Response, SocketAddr)> {
match msg { match msg {
Request::GetBalance { key } => { Request::GetBalance { key } => {
let val = self.accountant.get_balance(&key); let val = self.bank.get_balance(&key);
let rsp = (Response::Balance { key, val }, rsp_addr); let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp); info!("Response::Balance {:?}", rsp);
Some(rsp) Some(rsp)
} }
Request::GetLastId => { Request::GetLastId => {
let id = self.accountant.last_id(); let id = self.bank.last_id();
let rsp = (Response::LastId { id }, rsp_addr); let rsp = (Response::LastId { id }, rsp_addr);
info!("Response::LastId {:?}", rsp); info!("Response::LastId {:?}", rsp);
Some(rsp) Some(rsp)
} }
Request::GetTransactionCount => { Request::GetTransactionCount => {
let transaction_count = self.accountant.transaction_count() as u64; let transaction_count = self.bank.transaction_count() as u64;
let rsp = (Response::TransactionCount { transaction_count }, rsp_addr); let rsp = (Response::TransactionCount { transaction_count }, rsp_addr);
info!("Response::TransactionCount {:?}", rsp); info!("Response::TransactionCount {:?}", rsp);
Some(rsp) Some(rsp)
@ -140,9 +139,8 @@ impl RequestProcessor {
pub fn process_request_packets( pub fn process_request_packets(
&self, &self,
event_processor: &EventProcessor,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
entry_sender: &Sender<Entry>, signal_sender: &Sender<Signal>,
blob_sender: &streamer::BlobSender, blob_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler, packet_recycler: &packet::PacketRecycler,
blob_recycler: &packet::BlobRecycler, blob_recycler: &packet::BlobRecycler,
@ -176,8 +174,9 @@ impl RequestProcessor {
debug!("events: {} reqs: {}", events.len(), reqs.len()); debug!("events: {} reqs: {}", events.len(), reqs.len());
debug!("process_events"); debug!("process_events");
let entry = event_processor.process_events(events)?; let results = self.bank.process_verified_events(events);
entry_sender.send(entry)?; let events = results.into_iter().filter_map(|x| x.ok()).collect();
signal_sender.send(Signal::Events(events))?;
debug!("done process_events"); debug!("done process_events");
debug!("process_requests"); debug!("process_requests");

View File

@ -1,9 +1,8 @@
//! The `request_stage` processes thin client Request messages. //! The `request_stage` processes thin client Request messages.
use entry::Entry;
use event_processor::EventProcessor;
use packet; use packet;
use packet::SharedPackets; use packet::SharedPackets;
use recorder::Signal;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -13,7 +12,7 @@ use streamer;
pub struct RequestStage { pub struct RequestStage {
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
pub entry_receiver: Receiver<Entry>, pub signal_receiver: Receiver<Signal>,
pub blob_receiver: streamer::BlobReceiver, pub blob_receiver: streamer::BlobReceiver,
pub request_processor: Arc<RequestProcessor>, pub request_processor: Arc<RequestProcessor>,
} }
@ -21,7 +20,6 @@ pub struct RequestStage {
impl RequestStage { impl RequestStage {
pub fn new( pub fn new(
request_processor: RequestProcessor, request_processor: RequestProcessor,
event_processor: Arc<EventProcessor>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
packet_recycler: packet::PacketRecycler, packet_recycler: packet::PacketRecycler,
@ -29,13 +27,12 @@ impl RequestStage {
) -> Self { ) -> Self {
let request_processor = Arc::new(request_processor); let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone(); let request_processor_ = request_processor.clone();
let (entry_sender, entry_receiver) = channel(); let (signal_sender, signal_receiver) = channel();
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop { let thread_hdl = spawn(move || loop {
let e = request_processor_.process_request_packets( let e = request_processor_.process_request_packets(
&event_processor,
&verified_receiver, &verified_receiver,
&entry_sender, &signal_sender,
&blob_sender, &blob_sender,
&packet_recycler, &packet_recycler,
&blob_recycler, &blob_recycler,
@ -48,9 +45,145 @@ impl RequestStage {
}); });
RequestStage { RequestStage {
thread_hdl, thread_hdl,
entry_receiver, signal_receiver,
blob_receiver, blob_receiver,
request_processor, request_processor,
} }
} }
} }
// TODO: When banking is pulled out of RequestStage, add this test back in.
//use bank::Bank;
//use entry::Entry;
//use event::Event;
//use hash::Hash;
//use record_stage::RecordStage;
//use recorder::Signal;
//use result::Result;
//use std::sync::mpsc::{channel, Sender};
//use std::sync::{Arc, Mutex};
//use std::time::Duration;
//
//#[cfg(test)]
//mod tests {
// use bank::Bank;
// use event::Event;
// use event_processor::EventProcessor;
// use mint::Mint;
// use signature::{KeyPair, KeyPairUtil};
// use transaction::Transaction;
//
// #[test]
// // TODO: Move this test banking_stage. Calling process_events() directly
// // defeats the purpose of this test.
// fn test_banking_sequential_consistency() {
// // In this attack we'll demonstrate that a verifier can interpret the ledger
// // differently if either the server doesn't signal the ledger to add an
// // Entry OR if the verifier tries to parallelize across multiple Entries.
// let mint = Mint::new(2);
// let bank = Bank::new(&mint);
// let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
//
// // Process a batch that includes a transaction that receives two tokens.
// let alice = KeyPair::new();
// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
// let events = vec![Event::Transaction(tr)];
// let entry0 = event_processor.process_events(events).unwrap();
//
// // Process a second batch that spends one of those tokens.
// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
// let events = vec![Event::Transaction(tr)];
// let entry1 = event_processor.process_events(events).unwrap();
//
// // Collect the ledger and feed it to a new bank.
// let entries = vec![entry0, entry1];
//
// // Assert the user holds one token, not two. If the server only output one
// // entry, then the second transaction will be rejected, because it drives
// // the account balance below zero before the credit is added.
// let bank = Bank::new(&mint);
// for entry in entries {
// assert!(
// bank
// .process_verified_events(entry.events)
// .into_iter()
// .all(|x| x.is_ok())
// );
// }
// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1));
// }
//}
//
//#[cfg(all(feature = "unstable", test))]
//mod bench {
// extern crate test;
// use self::test::Bencher;
// use bank::{Bank, MAX_ENTRY_IDS};
// use bincode::serialize;
// use event_processor::*;
// use hash::hash;
// use mint::Mint;
// use rayon::prelude::*;
// use signature::{KeyPair, KeyPairUtil};
// use std::collections::HashSet;
// use std::time::Instant;
// use transaction::Transaction;
//
// #[bench]
// fn process_events_bench(_bencher: &mut Bencher) {
// let mint = Mint::new(100_000_000);
// let bank = Bank::new(&mint);
// // Create transactions between unrelated parties.
// let txs = 100_000;
// let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
// let transactions: Vec<_> = (0..txs)
// .into_par_iter()
// .map(|i| {
// // Seed the 'to' account and a cell for its signature.
// let dummy_id = i % (MAX_ENTRY_IDS as i32);
// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
// {
// let mut last_ids = last_ids.lock().unwrap();
// if !last_ids.contains(&last_id) {
// last_ids.insert(last_id);
// bank.register_entry_id(&last_id);
// }
// }
//
// // Seed the 'from' account.
// let rando0 = KeyPair::new();
// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
// bank.process_verified_transaction(&tr).unwrap();
//
// let rando1 = KeyPair::new();
// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
// bank.process_verified_transaction(&tr).unwrap();
//
// // Finally, return a transaction that's unique
// Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
// })
// .collect();
//
// let events: Vec<_> = transactions
// .into_iter()
// .map(|tr| Event::Transaction(tr))
// .collect();
//
// let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
//
// let now = Instant::now();
// assert!(event_processor.process_events(events).is_ok());
// let duration = now.elapsed();
// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
// let tps = txs as f64 / sec;
//
// // Ensure that all transactions were successfully logged.
// drop(event_processor.historian_input);
// let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect();
// assert_eq!(entries.len(), 1);
// assert_eq!(entries[0].events.len(), txs as usize);
//
// println!("{} tps", tps);
// }
//}

View File

@ -1,6 +1,6 @@
//! The `result` module exposes a Result type that propagates one of many different Error types. //! The `result` module exposes a Result type that propagates one of many different Error types.
use accountant; use bank;
use bincode; use bincode;
use serde_json; use serde_json;
use std; use std;
@ -15,7 +15,7 @@ pub enum Error {
RecvError(std::sync::mpsc::RecvError), RecvError(std::sync::mpsc::RecvError),
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
Serialize(std::boxed::Box<bincode::ErrorKind>), Serialize(std::boxed::Box<bincode::ErrorKind>),
AccountingError(accountant::AccountingError), BankError(bank::BankError),
SendError, SendError,
Services, Services,
GeneralError, GeneralError,
@ -33,9 +33,9 @@ impl std::convert::From<std::sync::mpsc::RecvTimeoutError> for Error {
Error::RecvTimeoutError(e) Error::RecvTimeoutError(e)
} }
} }
impl std::convert::From<accountant::AccountingError> for Error { impl std::convert::From<bank::BankError> for Error {
fn from(e: accountant::AccountingError) -> Error { fn from(e: bank::BankError) -> Error {
Error::AccountingError(e) Error::BankError(e)
} }
} }
impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error { impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error {

View File

@ -1,12 +1,13 @@
//! The `rpu` module implements the Request Processing Unit, a //! The `rpu` module implements the Request Processing Unit, a
//! 5-stage transaction processing pipeline in software. //! 5-stage transaction processing pipeline in software.
use accountant::Accountant; use bank::Bank;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use entry::Entry; use entry::Entry;
use entry_writer::EntryWriter; use entry_writer::EntryWriter;
use event_processor::EventProcessor; use hash::Hash;
use packet; use packet;
use record_stage::RecordStage;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
use request_stage::RequestStage; use request_stage::RequestStage;
use result::Result; use result::Result;
@ -17,22 +18,27 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer; use streamer;
pub struct Rpu { pub struct Rpu {
event_processor: Arc<EventProcessor>, bank: Arc<Bank>,
start_hash: Hash,
tick_duration: Option<Duration>,
} }
impl Rpu { impl Rpu {
/// Create a new Rpu that wraps the given Accountant. /// Create a new Rpu that wraps the given Bank.
pub fn new(event_processor: EventProcessor) -> Self { pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option<Duration>) -> Self {
Rpu { Rpu {
event_processor: Arc::new(event_processor), bank: Arc::new(bank),
start_hash,
tick_duration,
} }
} }
fn write_service<W: Write + Send + 'static>( fn write_service<W: Write + Send + 'static>(
accountant: Arc<Accountant>, bank: Arc<Bank>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender, broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler, blob_recycler: packet::BlobRecycler,
@ -40,7 +46,7 @@ impl Rpu {
entry_receiver: Receiver<Entry>, entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
let entry_writer = EntryWriter::new(&accountant); let entry_writer = EntryWriter::new(&bank);
let _ = entry_writer.write_and_send_entries( let _ = entry_writer.write_and_send_entries(
&broadcast, &broadcast,
&blob_recycler, &blob_recycler,
@ -85,24 +91,29 @@ impl Rpu {
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = packet::BlobRecycler::default();
let request_processor = RequestProcessor::new(self.event_processor.accountant.clone()); let request_processor = RequestProcessor::new(self.bank.clone());
let request_stage = RequestStage::new( let request_stage = RequestStage::new(
request_processor, request_processor,
self.event_processor.clone(),
exit.clone(), exit.clone(),
sig_verify_stage.verified_receiver, sig_verify_stage.verified_receiver,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),
); );
let record_stage = RecordStage::new(
request_stage.signal_receiver,
&self.start_hash,
self.tick_duration,
);
let (broadcast_sender, broadcast_receiver) = channel(); let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service( let t_write = Self::write_service(
self.event_processor.accountant.clone(), self.bank.clone(),
exit.clone(), exit.clone(),
broadcast_sender, broadcast_sender,
blob_recycler.clone(), blob_recycler.clone(),
Mutex::new(writer), Mutex::new(writer),
request_stage.entry_receiver, record_stage.entry_receiver,
); );
let broadcast_socket = UdpSocket::bind(local)?; let broadcast_socket = UdpSocket::bind(local)?;

View File

@ -322,7 +322,7 @@ fn retransmit(
/// # Arguments /// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1. /// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit. /// * `exit` - Boolean to signal system exit.
/// * `crdt` - This structure needs to be updated and populated by the accountant and via gossip. /// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler. /// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter( pub fn retransmitter(

View File

@ -154,9 +154,8 @@ impl ThinClient {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use accountant::Accountant; use bank::Bank;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use event_processor::EventProcessor;
use futures::Future; use futures::Future;
use logger; use logger;
use mint::Mint; use mint::Mint;
@ -188,11 +187,10 @@ mod tests {
); );
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let accountant = Accountant::new(&alice); let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)));
let rpu = Rpu::new(event_processor);
let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
@ -227,11 +225,10 @@ mod tests {
fn test_bad_sig() { fn test_bad_sig() {
let (leader_data, leader_gossip, _, leader_serve, _leader_events) = tvu::test_node(); let (leader_data, leader_gossip, _, leader_serve, _leader_events) = tvu::test_node();
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let accountant = Accountant::new(&alice); let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)));
let rpu = Rpu::new(event_processor);
let serve_addr = leader_serve.local_addr().unwrap(); let serve_addr = leader_serve.local_addr().unwrap();
let threads = rpu.serve( let threads = rpu.serve(
leader_data, leader_data,
@ -298,23 +295,25 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let leader_acc = { let leader_bank = {
let accountant = Accountant::new(&alice); let bank = Bank::new(&alice);
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)))
Rpu::new(event_processor)
}; };
let replicant_acc = { let replicant_bank = {
let accountant = Accountant::new(&alice); let bank = Bank::new(&alice);
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); Arc::new(Tvu::new(
Arc::new(Tvu::new(event_processor)) bank,
alice.last_id(),
Some(Duration::from_millis(30)),
))
}; };
let leader_threads = leader_acc let leader_threads = leader_bank
.serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink()) .serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink())
.unwrap(); .unwrap();
let replicant_threads = Tvu::serve( let replicant_threads = Tvu::serve(
&replicant_acc, &replicant_bank,
replicant.0.clone(), replicant.0.clone(),
replicant.1, replicant.1,
replicant.2, replicant.2,

View File

@ -1,13 +1,14 @@
//! The `tvu` module implements the Transaction Validation Unit, a //! The `tvu` module implements the Transaction Validation Unit, a
//! 5-stage transaction validation pipeline in software. //! 5-stage transaction validation pipeline in software.
use accountant::Accountant; use bank::Bank;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use entry::Entry; use entry::Entry;
use entry_writer::EntryWriter; use entry_writer::EntryWriter;
use event_processor::EventProcessor; use hash::Hash;
use ledger; use ledger;
use packet; use packet;
use record_stage::RecordStage;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
use request_stage::RequestStage; use request_stage::RequestStage;
use result::Result; use result::Result;
@ -21,24 +22,28 @@ use std::time::Duration;
use streamer; use streamer;
pub struct Tvu { pub struct Tvu {
event_processor: Arc<EventProcessor>, bank: Arc<Bank>,
start_hash: Hash,
tick_duration: Option<Duration>,
} }
impl Tvu { impl Tvu {
/// Create a new Tvu that wraps the given Accountant. /// Create a new Tvu that wraps the given Bank.
pub fn new(event_processor: EventProcessor) -> Self { pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option<Duration>) -> Self {
Tvu { Tvu {
event_processor: Arc::new(event_processor), bank: Arc::new(bank),
start_hash,
tick_duration,
} }
} }
fn drain_service( fn drain_service(
accountant: Arc<Accountant>, bank: Arc<Bank>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
entry_receiver: Receiver<Entry>, entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || { spawn(move || {
let entry_writer = EntryWriter::new(&accountant); let entry_writer = EntryWriter::new(&bank);
loop { loop {
let _ = entry_writer.drain_entries(&entry_receiver); let _ = entry_writer.drain_entries(&entry_receiver);
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
@ -60,9 +65,7 @@ impl Tvu {
let blobs = verified_receiver.recv_timeout(timer)?; let blobs = verified_receiver.recv_timeout(timer)?;
trace!("replicating blobs {}", blobs.len()); trace!("replicating blobs {}", blobs.len());
let entries = ledger::reconstruct_entries_from_blobs(&blobs); let entries = ledger::reconstruct_entries_from_blobs(&blobs);
obj.event_processor obj.bank.process_verified_entries(entries)?;
.accountant
.process_verified_entries(entries)?;
for blob in blobs { for blob in blobs {
blob_recycler.recycle(blob); blob_recycler.recycle(blob);
} }
@ -70,9 +73,9 @@ impl Tvu {
} }
/// This service receives messages from a leader in the network and processes the transactions /// This service receives messages from a leader in the network and processes the transactions
/// on the accountant state. /// on the bank state.
/// # Arguments /// # Arguments
/// * `obj` - The accountant state. /// * `obj` - The bank state.
/// * `me` - my configuration /// * `me` - my configuration
/// * `leader` - leader configuration /// * `leader` - leader configuration
/// * `exit` - The exit signal. /// * `exit` - The exit signal.
@ -170,22 +173,24 @@ impl Tvu {
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
let request_processor = RequestProcessor::new(obj.event_processor.accountant.clone()); let request_processor = RequestProcessor::new(obj.bank.clone());
let request_stage = RequestStage::new( let request_stage = RequestStage::new(
request_processor, request_processor,
obj.event_processor.clone(),
exit.clone(), exit.clone(),
sig_verify_stage.verified_receiver, sig_verify_stage.verified_receiver,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),
); );
let t_write = Self::drain_service( let record_stage = RecordStage::new(
obj.event_processor.accountant.clone(), request_stage.signal_receiver,
exit.clone(), &obj.start_hash,
request_stage.entry_receiver, obj.tick_duration,
); );
let t_write =
Self::drain_service(obj.bank.clone(), exit.clone(), record_stage.entry_receiver);
let t_responder = streamer::responder( let t_responder = streamer::responder(
respond_socket, respond_socket,
exit.clone(), exit.clone(),
@ -232,13 +237,12 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use accountant::Accountant; use bank::Bank;
use bincode::serialize; use bincode::serialize;
use chrono::prelude::*; use chrono::prelude::*;
use crdt::Crdt; use crdt::Crdt;
use entry; use entry;
use event::Event; use event::Event;
use event_processor::EventProcessor;
use hash::{hash, Hash}; use hash::{hash, Hash};
use logger; use logger;
use mint::Mint; use mint::Mint;
@ -303,10 +307,13 @@ mod tests {
); );
let starting_balance = 10_000; let starting_balance = 10_000;
let alice = Mint::new(starting_balance); let mint = Mint::new(starting_balance);
let accountant = Accountant::new(&alice); let bank = Bank::new(&mint);
let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); let tvu = Arc::new(Tvu::new(
let tvu = Arc::new(Tvu::new(event_processor)); bank,
mint.last_id(),
Some(Duration::from_millis(30)),
));
let replicate_addr = target1_data.replicate_addr; let replicate_addr = target1_data.replicate_addr;
let threads = Tvu::serve( let threads = Tvu::serve(
&tvu, &tvu,
@ -331,24 +338,24 @@ mod tests {
w.set_index(i).unwrap(); w.set_index(i).unwrap();
w.set_id(leader_id).unwrap(); w.set_id(leader_id).unwrap();
let accountant = &tvu.event_processor.accountant; let bank = &tvu.bank;
let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]);
accountant.register_entry_id(&cur_hash); bank.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash); cur_hash = hash(&cur_hash);
let tr1 = Transaction::new( let tr1 = Transaction::new(
&alice.keypair(), &mint.keypair(),
bob_keypair.pubkey(), bob_keypair.pubkey(),
transfer_amount, transfer_amount,
cur_hash, cur_hash,
); );
accountant.register_entry_id(&cur_hash); bank.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash); cur_hash = hash(&cur_hash);
let entry1 = let entry1 =
entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]); entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]);
accountant.register_entry_id(&cur_hash); bank.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash); cur_hash = hash(&cur_hash);
alice_ref_balance -= transfer_amount; alice_ref_balance -= transfer_amount;
@ -373,11 +380,11 @@ mod tests {
msgs.push(msg); msgs.push(msg);
} }
let accountant = &tvu.event_processor.accountant; let bank = &tvu.bank;
let alice_balance = accountant.get_balance(&alice.keypair().pubkey()).unwrap(); let alice_balance = bank.get_balance(&mint.keypair().pubkey()).unwrap();
assert_eq!(alice_balance, alice_ref_balance); assert_eq!(alice_balance, alice_ref_balance);
let bob_balance = accountant.get_balance(&bob_keypair.pubkey()).unwrap(); let bob_balance = bank.get_balance(&bob_keypair.pubkey()).unwrap();
assert_eq!(bob_balance, starting_balance - alice_ref_balance); assert_eq!(bob_balance, starting_balance - alice_ref_balance);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);