Merge pull request #192 from garious/add-accounting-stage

Add accounting stage
This commit is contained in:
Greg Fitzgerald
2018-05-09 16:47:50 -06:00
committed by GitHub
14 changed files with 574 additions and 559 deletions

View File

@ -12,10 +12,6 @@ authors = [
] ]
license = "Apache-2.0" license = "Apache-2.0"
[[bin]]
name = "solana-historian-demo"
path = "src/bin/historian-demo.rs"
[[bin]] [[bin]]
name = "solana-client-demo" name = "solana-client-demo"
path = "src/bin/client-demo.rs" path = "src/bin/client-demo.rs"

View File

@ -69,9 +69,9 @@ impl Accountant {
to: mint.pubkey(), to: mint.pubkey(),
tokens: mint.tokens, tokens: mint.tokens,
}; };
let acc = Self::new_from_deposit(&deposit); let accountant = Self::new_from_deposit(&deposit);
acc.register_entry_id(&mint.last_id()); accountant.register_entry_id(&mint.last_id());
acc accountant
} }
/// Return the last entry ID registered /// Return the last entry ID registered
@ -339,24 +339,26 @@ mod tests {
fn test_accountant() { fn test_accountant() {
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
assert_eq!(acc.last_id(), alice.last_id()); assert_eq!(accountant.last_id(), alice.last_id());
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) accountant
.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
.unwrap(); .unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_000);
acc.transfer(500, &alice.keypair(), bob_pubkey, alice.last_id()) accountant
.transfer(500, &alice.keypair(), bob_pubkey, alice.last_id())
.unwrap(); .unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_500); assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_500);
} }
#[test] #[test]
fn test_account_not_found() { fn test_account_not_found() {
let mint = Mint::new(1); let mint = Mint::new(1);
let acc = Accountant::new(&mint); let accountant = Accountant::new(&mint);
assert_eq!( assert_eq!(
acc.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()), accountant.transfer(1, &KeyPair::new(), mint.pubkey(), mint.last_id()),
Err(AccountingError::AccountNotFound) Err(AccountingError::AccountNotFound)
); );
} }
@ -364,141 +366,156 @@ mod tests {
#[test] #[test]
fn test_invalid_transfer() { fn test_invalid_transfer() {
let alice = Mint::new(11_000); let alice = Mint::new(11_000);
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) accountant
.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id())
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
acc.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()), accountant.transfer(10_001, &alice.keypair(), bob_pubkey, alice.last_id()),
Err(AccountingError::InsufficientFunds) Err(AccountingError::InsufficientFunds)
); );
let alice_pubkey = alice.keypair().pubkey(); let alice_pubkey = alice.keypair().pubkey();
assert_eq!(acc.get_balance(&alice_pubkey).unwrap(), 10_000); assert_eq!(accountant.get_balance(&alice_pubkey).unwrap(), 10_000);
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 1_000);
} }
#[test] #[test]
fn test_transfer_to_newb() { fn test_transfer_to_newb() {
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
acc.transfer(500, &alice_keypair, bob_pubkey, alice.last_id()) accountant
.transfer(500, &alice_keypair, bob_pubkey, alice.last_id())
.unwrap(); .unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500); assert_eq!(accountant.get_balance(&bob_pubkey).unwrap(), 500);
} }
#[test] #[test]
fn test_transfer_on_date() { fn test_transfer_on_date() {
let alice = Mint::new(1); let alice = Mint::new(1);
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) accountant
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
.unwrap(); .unwrap();
// Alice's balance will be zero because all funds are locked up. // Alice's balance will be zero because all funds are locked up.
assert_eq!(acc.get_balance(&alice.pubkey()), Some(0)); assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0));
// Bob's balance will be None because the funds have not been // Bob's balance will be None because the funds have not been
// sent. // sent.
assert_eq!(acc.get_balance(&bob_pubkey), None); assert_eq!(accountant.get_balance(&bob_pubkey), None);
// Now, acknowledge the time in the condition occurred and // Now, acknowledge the time in the condition occurred and
// that bob's funds are now available. // that bob's funds are now available.
acc.process_verified_timestamp(alice.pubkey(), dt).unwrap(); accountant
assert_eq!(acc.get_balance(&bob_pubkey), Some(1)); .process_verified_timestamp(alice.pubkey(), dt)
.unwrap();
assert_eq!(accountant.get_balance(&bob_pubkey), Some(1));
acc.process_verified_timestamp(alice.pubkey(), dt).unwrap(); // <-- Attack! Attempt to process completed transaction. accountant
assert_ne!(acc.get_balance(&bob_pubkey), Some(2)); .process_verified_timestamp(alice.pubkey(), dt)
.unwrap(); // <-- Attack! Attempt to process completed transaction.
assert_ne!(accountant.get_balance(&bob_pubkey), Some(2));
} }
#[test] #[test]
fn test_transfer_after_date() { fn test_transfer_after_date() {
let alice = Mint::new(1); let alice = Mint::new(1);
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
acc.process_verified_timestamp(alice.pubkey(), dt).unwrap(); accountant
.process_verified_timestamp(alice.pubkey(), dt)
// It's now past now, so this transfer should be processed immediately.
acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
.unwrap(); .unwrap();
assert_eq!(acc.get_balance(&alice.pubkey()), Some(0)); // It's now past now, so this transfer should be processed immediately.
assert_eq!(acc.get_balance(&bob_pubkey), Some(1)); accountant
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
.unwrap();
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0));
assert_eq!(accountant.get_balance(&bob_pubkey), Some(1));
} }
#[test] #[test]
fn test_cancel_transfer() { fn test_cancel_transfer() {
let alice = Mint::new(1); let alice = Mint::new(1);
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let alice_keypair = alice.keypair(); let alice_keypair = alice.keypair();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let dt = Utc::now(); let dt = Utc::now();
let sig = acc.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id()) let sig = accountant
.transfer_on_date(1, &alice_keypair, bob_pubkey, dt, alice.last_id())
.unwrap(); .unwrap();
// Alice's balance will be zero because all funds are locked up. // Alice's balance will be zero because all funds are locked up.
assert_eq!(acc.get_balance(&alice.pubkey()), Some(0)); assert_eq!(accountant.get_balance(&alice.pubkey()), Some(0));
// Bob's balance will be None because the funds have not been // Bob's balance will be None because the funds have not been
// sent. // sent.
assert_eq!(acc.get_balance(&bob_pubkey), None); assert_eq!(accountant.get_balance(&bob_pubkey), None);
// Now, cancel the trancaction. Alice gets her funds back, Bob never sees them. // Now, cancel the trancaction. Alice gets her funds back, Bob never sees them.
acc.process_verified_sig(alice.pubkey(), sig).unwrap(); accountant
assert_eq!(acc.get_balance(&alice.pubkey()), Some(1)); .process_verified_sig(alice.pubkey(), sig)
assert_eq!(acc.get_balance(&bob_pubkey), None); .unwrap();
assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1));
assert_eq!(accountant.get_balance(&bob_pubkey), None);
acc.process_verified_sig(alice.pubkey(), sig).unwrap(); // <-- Attack! Attempt to cancel completed transaction. accountant
assert_ne!(acc.get_balance(&alice.pubkey()), Some(2)); .process_verified_sig(alice.pubkey(), sig)
.unwrap(); // <-- Attack! Attempt to cancel completed transaction.
assert_ne!(accountant.get_balance(&alice.pubkey()), Some(2));
} }
#[test] #[test]
fn test_duplicate_event_signature() { fn test_duplicate_event_signature() {
let alice = Mint::new(1); let alice = Mint::new(1);
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let sig = Signature::default(); let sig = Signature::default();
assert!(acc.reserve_signature_with_last_id(&sig, &alice.last_id())); assert!(accountant.reserve_signature_with_last_id(&sig, &alice.last_id()));
assert!(!acc.reserve_signature_with_last_id(&sig, &alice.last_id())); assert!(!accountant.reserve_signature_with_last_id(&sig, &alice.last_id()));
} }
#[test] #[test]
fn test_forget_signature() { fn test_forget_signature() {
let alice = Mint::new(1); let alice = Mint::new(1);
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let sig = Signature::default(); let sig = Signature::default();
acc.reserve_signature_with_last_id(&sig, &alice.last_id()); accountant.reserve_signature_with_last_id(&sig, &alice.last_id());
assert!(acc.forget_signature_with_last_id(&sig, &alice.last_id())); assert!(accountant.forget_signature_with_last_id(&sig, &alice.last_id()));
assert!(!acc.forget_signature_with_last_id(&sig, &alice.last_id())); assert!(!accountant.forget_signature_with_last_id(&sig, &alice.last_id()));
} }
#[test] #[test]
fn test_max_entry_ids() { fn test_max_entry_ids() {
let alice = Mint::new(1); let alice = Mint::new(1);
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let sig = Signature::default(); let sig = Signature::default();
for i in 0..MAX_ENTRY_IDS { for i in 0..MAX_ENTRY_IDS {
let last_id = hash(&serialize(&i).unwrap()); // Unique hash let last_id = hash(&serialize(&i).unwrap()); // Unique hash
acc.register_entry_id(&last_id); accountant.register_entry_id(&last_id);
} }
// Assert we're no longer able to use the oldest entry ID. // Assert we're no longer able to use the oldest entry ID.
assert!(!acc.reserve_signature_with_last_id(&sig, &alice.last_id())); assert!(!accountant.reserve_signature_with_last_id(&sig, &alice.last_id()));
} }
#[test] #[test]
fn test_debits_before_credits() { fn test_debits_before_credits() {
let mint = Mint::new(2); let mint = Mint::new(2);
let acc = Accountant::new(&mint); let accountant = Accountant::new(&mint);
let alice = KeyPair::new(); let alice = KeyPair::new();
let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let trs = vec![tr0, tr1]; let trs = vec![tr0, tr1];
assert!(acc.process_verified_transactions(trs)[1].is_err()); assert!(accountant.process_verified_transactions(trs)[1].is_err());
} }
} }
@ -514,7 +531,7 @@ mod bench {
#[bench] #[bench]
fn process_verified_event_bench(bencher: &mut Bencher) { fn process_verified_event_bench(bencher: &mut Bencher) {
let mint = Mint::new(100_000_000); let mint = Mint::new(100_000_000);
let acc = Accountant::new(&mint); let accountant = Accountant::new(&mint);
// Create transactions between unrelated parties. // Create transactions between unrelated parties.
let transactions: Vec<_> = (0..4096) let transactions: Vec<_> = (0..4096)
.into_par_iter() .into_par_iter()
@ -522,15 +539,15 @@ mod bench {
// Seed the 'from' account. // Seed the 'from' account.
let rando0 = KeyPair::new(); let rando0 = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id()); let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, mint.last_id());
acc.process_verified_transaction(&tr).unwrap(); accountant.process_verified_transaction(&tr).unwrap();
// Seed the 'to' account and a cell for its signature. // Seed the 'to' account and a cell for its signature.
let last_id = hash(&serialize(&i).unwrap()); // Unique hash let last_id = hash(&serialize(&i).unwrap()); // Unique hash
acc.register_entry_id(&last_id); accountant.register_entry_id(&last_id);
let rando1 = KeyPair::new(); let rando1 = KeyPair::new();
let tr = Transaction::new(&rando0, rando1.pubkey(), 1, last_id); let tr = Transaction::new(&rando0, rando1.pubkey(), 1, last_id);
acc.process_verified_transaction(&tr).unwrap(); accountant.process_verified_transaction(&tr).unwrap();
// Finally, return a transaction that's unique // Finally, return a transaction that's unique
Transaction::new(&rando0, rando1.pubkey(), 1, last_id) Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
@ -538,12 +555,13 @@ mod bench {
.collect(); .collect();
bencher.iter(|| { bencher.iter(|| {
// Since benchmarker runs this multiple times, we need to clear the signatures. // Since benchmarker runs this multiple times, we need to clear the signatures.
for sigs in acc.last_ids.read().unwrap().iter() { for sigs in accountant.last_ids.read().unwrap().iter() {
sigs.1.write().unwrap().clear(); sigs.1.write().unwrap().clear();
} }
assert!( assert!(
acc.process_verified_transactions(transactions.clone()) accountant
.process_verified_transactions(transactions.clone())
.iter() .iter()
.all(|x| x.is_ok()) .all(|x| x.is_ok())
); );

176
src/accounting_stage.rs Normal file
View File

@ -0,0 +1,176 @@
//! The `accounting_stage` module implements the accounting stage of the TPU.
use accountant::Accountant;
use entry::Entry;
use event::Event;
use hash::Hash;
use historian::Historian;
use recorder::Signal;
use result::Result;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
/// The accounting stage of the TPU: applies batches of verified events to the
/// accountant and forwards the resulting ledger entries downstream.
pub struct AccountingStage {
    /// Receiving side of the entry channel; consumers pull finalized `Entry`s here.
    pub output: Mutex<Receiver<Entry>>,
    /// Sending side paired with `output`; kept private so only `process_events`
    /// can publish entries.
    entry_sender: Mutex<Sender<Entry>>,
    /// Shared ledger state that events are applied against.
    pub accountant: Arc<Accountant>,
    /// Channel into the historian's recorder thread (carries `Signal`s).
    historian_input: Mutex<Sender<Signal>>,
    /// The historian; locked in `process_events` to serialize entry creation.
    historian: Mutex<Historian>,
}
impl AccountingStage {
    /// Create a new AccountingStage that wraps the given Accountant.
    ///
    /// Spawns a `Historian` seeded with `start_hash`. If `ms_per_tick` is
    /// `Some`, the historian's recorder emits tick entries at that interval
    /// (see `Historian::new`); `None` disables automatic ticks.
    pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
        let (historian_input, event_receiver) = channel();
        let historian = Historian::new(event_receiver, start_hash, ms_per_tick);
        let (entry_sender, output) = channel();
        AccountingStage {
            output: Mutex::new(output),
            entry_sender: Mutex::new(entry_sender),
            accountant: Arc::new(accountant),
            historian_input: Mutex::new(historian_input),
            historian: Mutex::new(historian),
        }
    }

    /// Process the transactions in parallel and then log the successful ones.
    ///
    /// Returns `Err` if any channel send/recv fails (e.g. the historian has
    /// exited or the output receiver was dropped).
    pub fn process_events(&self, events: Vec<Event>) -> Result<()> {
        // Hold the historian lock for the entire call so concurrent callers
        // cannot interleave their send/recv pairs and mis-order entries.
        let historian = self.historian.lock().unwrap();
        let results = self.accountant.process_verified_events(events);
        // Keep only the events that applied cleanly; failed events are dropped
        // here and never make it into the ledger.
        let events = results.into_iter().filter_map(|x| x.ok()).collect();
        let sender = self.historian_input.lock().unwrap();
        sender.send(Signal::Events(events))?;

        // Wait for the historian to tag our Events with an ID and then register it.
        let entry = historian.output.lock().unwrap().recv()?;
        self.accountant.register_entry_id(&entry.id);
        self.entry_sender.lock().unwrap().send(entry)?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use accountant::Accountant;
    use accounting_stage::AccountingStage;
    use entry::Entry;
    use event::Event;
    use mint::Mint;
    use signature::{KeyPair, KeyPairUtil};
    use transaction::Transaction;

    #[test]
    fn test_accounting_sequential_consistency() {
        // In this attack we'll demonstrate that a verifier can interpret the ledger
        // differently if either the server doesn't signal the ledger to add an
        // Entry OR if the verifier tries to parallelize across multiple Entries.
        let mint = Mint::new(2);
        let stage = AccountingStage::new(Accountant::new(&mint), &mint.last_id(), None);

        // Batch 1: credit two tokens to a brand-new party.
        let alice = KeyPair::new();
        let tx_credit = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
        assert!(
            stage
                .process_events(vec![Event::Transaction(tx_credit)])
                .is_ok()
        );

        // Batch 2: spend one of those tokens back to the mint.
        let tx_spend = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
        assert!(
            stage
                .process_events(vec![Event::Transaction(tx_spend)])
                .is_ok()
        );

        // Close the entry channel, then drain the full ledger.
        drop(stage.entry_sender);
        let entries: Vec<Entry> = stage.output.lock().unwrap().iter().collect();

        // Replay the ledger into a fresh accountant and assert the user holds
        // one token, not two. Had the server emitted only a single entry, the
        // spend would be rejected: it drives the balance below zero before the
        // credit is added.
        let replay = Accountant::new(&mint);
        for entry in entries {
            let results = replay.process_verified_events(entry.events);
            assert!(results.into_iter().all(|x| x.is_ok()));
        }
        assert_eq!(replay.get_balance(&alice.pubkey()), Some(1));
    }
}
#[cfg(all(feature = "unstable", test))]
mod bench {
    extern crate test;
    use self::test::Bencher;
    use accountant::{Accountant, MAX_ENTRY_IDS};
    use accounting_stage::*;
    use bincode::serialize;
    use entry::Entry;
    use event::Event;
    // `Hash` was previously unimported (only `hash::hash` was), which broke the
    // feature-gated build; same for `Entry`, `Event`, and `Mutex` below.
    use hash::{hash, Hash};
    use mint::Mint;
    use rayon::prelude::*;
    use signature::{KeyPair, KeyPairUtil};
    use std::collections::HashSet;
    use std::sync::Mutex;
    use std::time::Instant;
    use transaction::Transaction;

    /// Measure how many transactions per second `process_events` can apply and
    /// log in a single batch. (Not driven by `bencher` because the setup cost
    /// dominates; timing is done manually around one call.)
    #[bench]
    fn process_events_bench(_bencher: &mut Bencher) {
        let mint = Mint::new(100_000_000);
        let accountant = Accountant::new(&mint);
        // Create transactions between unrelated parties.
        let txs = 100_000;
        // Entry IDs registered so far, shared across rayon workers so each
        // semi-unique hash is registered exactly once.
        let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
        let transactions: Vec<_> = (0..txs)
            .into_par_iter()
            .map(|i| {
                // Reuse IDs modulo MAX_ENTRY_IDS so none fall out of the
                // accountant's entry-ID window.
                let dummy_id = i % (MAX_ENTRY_IDS as i32);
                let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
                {
                    let mut last_ids = last_ids.lock().unwrap();
                    if !last_ids.contains(&last_id) {
                        last_ids.insert(last_id);
                        accountant.register_entry_id(&last_id);
                    }
                }

                // Seed the 'from' account.
                let rando0 = KeyPair::new();
                let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
                accountant.process_verified_transaction(&tr).unwrap();

                // Seed the 'to' account.
                let rando1 = KeyPair::new();
                let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
                accountant.process_verified_transaction(&tr).unwrap();

                // Finally, return a transaction that's unique
                Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
            })
            .collect();

        let events: Vec<_> = transactions
            .into_iter()
            .map(|tr| Event::Transaction(tr))
            .collect();

        let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None);

        let now = Instant::now();
        assert!(accounting_stage.process_events(events).is_ok());
        let duration = now.elapsed();
        let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
        let tps = txs as f64 / sec;

        // Ensure that all transactions were successfully logged: dropping the
        // historian input closes the pipeline so the output iterator ends.
        drop(accounting_stage.historian_input);
        let entries: Vec<Entry> = accounting_stage.output.lock().unwrap().iter().collect();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].events.len(), txs as usize);

        println!("{} tps", tps);
    }
}

View File

@ -87,10 +87,10 @@ fn main() {
println!("Binding to {}", client_addr); println!("Binding to {}", client_addr);
let socket = UdpSocket::bind(&client_addr).unwrap(); let socket = UdpSocket::bind(&client_addr).unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let mut acc = ThinClient::new(addr.parse().unwrap(), socket); let mut accountant = ThinClient::new(addr.parse().unwrap(), socket);
println!("Get last ID..."); println!("Get last ID...");
let last_id = acc.get_last_id().wait().unwrap(); let last_id = accountant.get_last_id().wait().unwrap();
println!("Got last ID {:?}", last_id); println!("Got last ID {:?}", last_id);
println!("Creating keypairs..."); println!("Creating keypairs...");
@ -117,7 +117,7 @@ fn main() {
nsps / 1_000_f64 nsps / 1_000_f64
); );
let initial_tx_count = acc.transaction_count(); let initial_tx_count = accountant.transaction_count();
println!("initial count {}", initial_tx_count); println!("initial count {}", initial_tx_count);
println!("Transfering {} transactions in {} batches", txs, threads); println!("Transfering {} transactions in {} batches", txs, threads);
@ -129,16 +129,16 @@ fn main() {
let mut client_addr: SocketAddr = client_addr.parse().unwrap(); let mut client_addr: SocketAddr = client_addr.parse().unwrap();
client_addr.set_port(0); client_addr.set_port(0);
let socket = UdpSocket::bind(client_addr).unwrap(); let socket = UdpSocket::bind(client_addr).unwrap();
let acc = ThinClient::new(addr.parse().unwrap(), socket); let accountant = ThinClient::new(addr.parse().unwrap(), socket);
for tr in trs { for tr in trs {
acc.transfer_signed(tr.clone()).unwrap(); accountant.transfer_signed(tr.clone()).unwrap();
} }
}); });
println!("Waiting for transactions to complete...",); println!("Waiting for transactions to complete...",);
let mut tx_count; let mut tx_count;
for _ in 0..10 { for _ in 0..10 {
tx_count = acc.transaction_count(); tx_count = accountant.transaction_count();
duration = now.elapsed(); duration = now.elapsed();
let txs = tx_count - initial_tx_count; let txs = tx_count - initial_tx_count;
println!("Transactions processed {}", txs); println!("Transactions processed {}", txs);

View File

@ -1,38 +0,0 @@
extern crate solana;
use solana::entry::Entry;
use solana::event::Event;
use solana::hash::Hash;
use solana::historian::Historian;
use solana::ledger::Block;
use solana::recorder::Signal;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::transaction::Transaction;
use std::sync::mpsc::{sync_channel, SendError, SyncSender};
use std::thread::sleep;
use std::time::Duration;
fn create_ledger(input: &SyncSender<Signal>, seed: &Hash) -> Result<(), SendError<Signal>> {
sleep(Duration::from_millis(15));
let keypair = KeyPair::new();
let tr = Transaction::new(&keypair, keypair.pubkey(), 42, *seed);
let signal0 = Signal::Event(Event::Transaction(tr));
input.send(signal0)?;
sleep(Duration::from_millis(10));
Ok(())
}
fn main() {
let (input, event_receiver) = sync_channel(10);
let seed = Hash::default();
let hist = Historian::new(event_receiver, &seed, Some(10));
create_ledger(&input, &seed).expect("send error");
drop(input);
let entries: Vec<Entry> = hist.output.lock().unwrap().iter().collect();
for entry in &entries {
println!("{:?}", entry);
}
// Proof-of-History: Verify the historian learned about the events
// in the same order they appear in the vector.
assert!(entries[..].verify(&seed));
}

View File

@ -7,10 +7,10 @@ extern crate solana;
use getopts::Options; use getopts::Options;
use isatty::stdin_isatty; use isatty::stdin_isatty;
use solana::accountant::Accountant; use solana::accountant::Accountant;
use solana::accounting_stage::AccountingStage;
use solana::crdt::ReplicatedData; use solana::crdt::ReplicatedData;
use solana::entry::Entry; use solana::entry::Entry;
use solana::event::Event; use solana::event::Event;
use solana::historian::Historian;
use solana::signature::{KeyPair, KeyPairUtil}; use solana::signature::{KeyPair, KeyPairUtil};
use solana::tpu::Tpu; use solana::tpu::Tpu;
use std::env; use std::env;
@ -18,7 +18,6 @@ use std::io::{stdin, stdout, Read};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::process::exit; use std::process::exit;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::sync_channel;
use std::sync::Arc; use std::sync::Arc;
fn print_usage(program: &str, opts: Options) { fn print_usage(program: &str, opts: Options) {
@ -95,31 +94,30 @@ fn main() {
eprintln!("creating accountant..."); eprintln!("creating accountant...");
let acc = Accountant::new_from_deposit(&deposit.unwrap()); let accountant = Accountant::new_from_deposit(&deposit.unwrap());
acc.register_entry_id(&entry0.id); accountant.register_entry_id(&entry0.id);
acc.register_entry_id(&entry1.id); accountant.register_entry_id(&entry1.id);
eprintln!("processing entries..."); eprintln!("processing entries...");
let mut last_id = entry1.id; let mut last_id = entry1.id;
for entry in entries { for entry in entries {
last_id = entry.id; last_id = entry.id;
let results = acc.process_verified_events(entry.events); let results = accountant.process_verified_events(entry.events);
for result in results { for result in results {
if let Err(e) = result { if let Err(e) = result {
eprintln!("failed to process event {:?}", e); eprintln!("failed to process event {:?}", e);
exit(1); exit(1);
} }
} }
acc.register_entry_id(&last_id); accountant.register_entry_id(&last_id);
} }
eprintln!("creating networking stack..."); eprintln!("creating networking stack...");
let (input, event_receiver) = sync_channel(10_000); let accounting_stage = AccountingStage::new(accountant, &last_id, Some(1000));
let historian = Historian::new(event_receiver, &last_id, Some(1000));
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let tpu = Arc::new(Tpu::new(acc, input, historian)); let tpu = Arc::new(Tpu::new(accounting_stage));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();

View File

@ -134,7 +134,7 @@ mod tests {
use ecdsa; use ecdsa;
use packet::{Packet, Packets, SharedPackets}; use packet::{Packet, Packets, SharedPackets};
use std::sync::RwLock; use std::sync::RwLock;
use tpu::Request; use thin_client_service::Request;
use transaction::test_tx; use transaction::test_tx;
use transaction::Transaction; use transaction::Transaction;

View File

@ -4,13 +4,13 @@
use entry::Entry; use entry::Entry;
use hash::Hash; use hash::Hash;
use recorder::{ExitReason, Recorder, Signal}; use recorder::{ExitReason, Recorder, Signal};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::{Arc, Mutex}; use std::sync::Mutex;
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Instant; use std::time::Instant;
pub struct Historian { pub struct Historian {
pub output: Arc<Mutex<Receiver<Entry>>>, pub output: Mutex<Receiver<Entry>>,
pub thread_hdl: JoinHandle<ExitReason>, pub thread_hdl: JoinHandle<ExitReason>,
} }
@ -20,12 +20,11 @@ impl Historian {
start_hash: &Hash, start_hash: &Hash,
ms_per_tick: Option<u64>, ms_per_tick: Option<u64>,
) -> Self { ) -> Self {
let (entry_sender, output) = sync_channel(10_000); let (entry_sender, output) = channel();
let thread_hdl = let thread_hdl =
Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender);
let loutput = Arc::new(Mutex::new(output));
Historian { Historian {
output: loutput, output: Mutex::new(output),
thread_hdl, thread_hdl,
} }
} }
@ -36,7 +35,7 @@ impl Historian {
start_hash: Hash, start_hash: Hash,
ms_per_tick: Option<u64>, ms_per_tick: Option<u64>,
receiver: Receiver<Signal>, receiver: Receiver<Signal>,
sender: SyncSender<Entry>, sender: Sender<Entry>,
) -> JoinHandle<ExitReason> { ) -> JoinHandle<ExitReason> {
spawn(move || { spawn(move || {
let mut recorder = Recorder::new(receiver, sender, start_hash); let mut recorder = Recorder::new(receiver, sender, start_hash);
@ -66,7 +65,7 @@ mod tests {
#[test] #[test]
fn test_historian() { fn test_historian() {
let (input, event_receiver) = sync_channel(10); let (input, event_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(event_receiver, &zero, None); let hist = Historian::new(event_receiver, &zero, None);
@ -95,7 +94,7 @@ mod tests {
#[test] #[test]
fn test_historian_closed_sender() { fn test_historian_closed_sender() {
let (input, event_receiver) = sync_channel(10); let (input, event_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(event_receiver, &zero, None); let hist = Historian::new(event_receiver, &zero, None);
drop(hist.output); drop(hist.output);
@ -108,7 +107,7 @@ mod tests {
#[test] #[test]
fn test_ticking_historian() { fn test_ticking_historian() {
let (input, event_receiver) = sync_channel(10); let (input, event_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let hist = Historian::new(event_receiver, &zero, Some(20)); let hist = Historian::new(event_receiver, &zero, Some(20));
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));

View File

@ -1,5 +1,6 @@
#![cfg_attr(feature = "unstable", feature(test))] #![cfg_attr(feature = "unstable", feature(test))]
pub mod accountant; pub mod accountant;
pub mod accounting_stage;
pub mod crdt; pub mod crdt;
pub mod ecdsa; pub mod ecdsa;
pub mod entry; pub mod entry;
@ -18,6 +19,7 @@ pub mod result;
pub mod signature; pub mod signature;
pub mod streamer; pub mod streamer;
pub mod thin_client; pub mod thin_client;
pub mod thin_client_service;
pub mod timing; pub mod timing;
pub mod tpu; pub mod tpu;
pub mod transaction; pub mod transaction;

View File

@ -8,15 +8,13 @@
use entry::{create_entry_mut, Entry}; use entry::{create_entry_mut, Entry};
use event::Event; use event::Event;
use hash::{hash, Hash}; use hash::{hash, Hash};
use packet::BLOB_DATA_SIZE; use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::mem;
use std::sync::mpsc::{Receiver, SyncSender, TryRecvError};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum Signal { pub enum Signal {
Tick, Tick,
Event(Event), Events(Vec<Event>),
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@ -26,21 +24,19 @@ pub enum ExitReason {
} }
pub struct Recorder { pub struct Recorder {
sender: SyncSender<Entry>, sender: Sender<Entry>,
receiver: Receiver<Signal>, receiver: Receiver<Signal>,
last_hash: Hash, last_hash: Hash,
events: Vec<Event>,
num_hashes: u64, num_hashes: u64,
num_ticks: u64, num_ticks: u64,
} }
impl Recorder { impl Recorder {
pub fn new(receiver: Receiver<Signal>, sender: SyncSender<Entry>, last_hash: Hash) -> Self { pub fn new(receiver: Receiver<Signal>, sender: Sender<Entry>, last_hash: Hash) -> Self {
Recorder { Recorder {
receiver, receiver,
sender, sender,
last_hash, last_hash,
events: vec![],
num_hashes: 0, num_hashes: 0,
num_ticks: 0, num_ticks: 0,
} }
@ -51,8 +47,7 @@ impl Recorder {
self.num_hashes += 1; self.num_hashes += 1;
} }
pub fn record_entry(&mut self) -> Result<(), ExitReason> { pub fn record_entry(&mut self, events: Vec<Event>) -> Result<(), ExitReason> {
let events = mem::replace(&mut self.events, vec![]);
let entry = create_entry_mut(&mut self.last_hash, &mut self.num_hashes, events); let entry = create_entry_mut(&mut self.last_hash, &mut self.num_hashes, events);
self.sender self.sender
.send(entry) .send(entry)
@ -68,7 +63,7 @@ impl Recorder {
loop { loop {
if let Some(ms) = ms_per_tick { if let Some(ms) = ms_per_tick {
if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) { if epoch.elapsed() > Duration::from_millis((self.num_ticks + 1) * ms) {
self.record_entry()?; self.record_entry(vec![])?;
self.num_ticks += 1; self.num_ticks += 1;
} }
} }
@ -76,17 +71,10 @@ impl Recorder {
match self.receiver.try_recv() { match self.receiver.try_recv() {
Ok(signal) => match signal { Ok(signal) => match signal {
Signal::Tick => { Signal::Tick => {
self.record_entry()?; self.record_entry(vec![])?;
} }
Signal::Event(event) => { Signal::Events(events) => {
self.events.push(event); self.record_entry(events)?;
// Record an entry early if we anticipate its serialized size will
// be larger than 64kb. At the time of this writing, we assume each
// event will be well under 256 bytes.
if self.events.len() >= BLOB_DATA_SIZE / 256 {
self.record_entry()?;
}
} }
}, },
Err(TryRecvError::Empty) => return Ok(()), Err(TryRecvError::Empty) => return Ok(()),
@ -99,30 +87,27 @@ impl Recorder {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use bincode::serialize;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::sync::mpsc::sync_channel; use std::sync::mpsc::channel;
use transaction::Transaction; use transaction::Transaction;
#[test] #[test]
fn test_sub64k_entry_size() { fn test_events() {
let (signal_sender, signal_receiver) = sync_channel(500); let (signal_sender, signal_receiver) = channel();
let (entry_sender, entry_receiver) = sync_channel(10); let (entry_sender, entry_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let mut recorder = Recorder::new(signal_receiver, entry_sender, zero); let mut recorder = Recorder::new(signal_receiver, entry_sender, zero);
let alice_keypair = KeyPair::new(); let alice_keypair = KeyPair::new();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
for _ in 0..256 { let event0 = Event::Transaction(Transaction::new(&alice_keypair, bob_pubkey, 1, zero));
let tx = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); let event1 = Event::Transaction(Transaction::new(&alice_keypair, bob_pubkey, 2, zero));
let event = Event::Transaction(tx); signal_sender
signal_sender.send(Signal::Event(event)).unwrap(); .send(Signal::Events(vec![event0, event1]))
} .unwrap();
recorder.process_events(Instant::now(), None).unwrap(); recorder.process_events(Instant::now(), None).unwrap();
drop(recorder.sender); drop(recorder.sender);
let entries: Vec<_> = entry_receiver.iter().collect(); let entries: Vec<_> = entry_receiver.iter().collect();
assert_eq!(entries.len(), 1); assert_eq!(entries.len(), 1);
assert!(serialize(&entries[0]).unwrap().len() <= 65_536);
} }
} }

View File

@ -594,6 +594,7 @@ mod test {
} }
#[test] #[test]
#[ignore]
//retransmit from leader to replicate target //retransmit from leader to replicate target
pub fn retransmit() { pub fn retransmit() {
logger::setup(); logger::setup();

View File

@ -10,7 +10,7 @@ use signature::{KeyPair, PublicKey, Signature};
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use tpu::{Request, Response, Subscription}; use thin_client_service::{Request, Response, Subscription};
use transaction::Transaction; use transaction::Transaction;
pub struct ThinClient { pub struct ThinClient {
@ -148,22 +148,21 @@ impl ThinClient {
mod tests { mod tests {
use super::*; use super::*;
use accountant::Accountant; use accountant::Accountant;
use accounting_stage::AccountingStage;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use futures::Future; use futures::Future;
use historian::Historian;
use logger; use logger;
use mint::Mint; use mint::Mint;
use plan::Plan;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::io::sink; use std::io::sink;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use tpu::Tpu; use tpu::{self, Tpu};
// TODO: Figure out why this test sometimes hangs on TravisCI.
#[test] #[test]
fn test_thin_client() { fn test_thin_client() {
logger::setup(); logger::setup();
@ -180,25 +179,26 @@ mod tests {
); );
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (input, event_receiver) = sync_channel(10); let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let accountant = Arc::new(Tpu::new(accounting_stage));
let acc = Arc::new(Tpu::new(acc, input, historian)); let threads =
let threads = Tpu::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); Tpu::serve(&accountant, d, serve, skinny, gossip, exit.clone(), sink()).unwrap();
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));
let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut acc = ThinClient::new(addr, socket); let mut accountant = ThinClient::new(addr, socket);
let last_id = acc.get_last_id().wait().unwrap(); let last_id = accountant.get_last_id().wait().unwrap();
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) let _sig = accountant
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap(); .unwrap();
let mut balance; let mut balance;
let now = Instant::now(); let now = Instant::now();
loop { loop {
balance = acc.get_balance(&bob_pubkey); balance = accountant.get_balance(&bob_pubkey);
if balance.is_ok() { if balance.is_ok() {
break; break;
} }
@ -213,6 +213,54 @@ mod tests {
} }
} }
#[test]
fn test_bad_sig() {
let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = tpu::test_node();
let alice = Mint::new(10_000);
let accountant = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
let tpu = Arc::new(Tpu::new(accounting_stage));
let serve_addr = leader_serve.local_addr().unwrap();
let threads = Tpu::serve(
&tpu,
leader_data,
leader_serve,
leader_skinny,
leader_gossip,
exit.clone(),
sink(),
).unwrap();
sleep(Duration::from_millis(300));
let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let mut client = ThinClient::new(serve_addr, socket);
let last_id = client.get_last_id().wait().unwrap();
trace!("doing stuff");
let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
let _sig = client.transfer_signed(tr).unwrap();
let last_id = client.get_last_id().wait().unwrap();
let mut tr2 = Transaction::new(&alice.keypair(), bob_pubkey, 501, last_id);
tr2.data.tokens = 502;
tr2.data.plan = Plan::new_payment(502, bob_pubkey);
let _sig = client.transfer_signed(tr2).unwrap();
assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 500);
trace!("exiting");
exit.store(true, Ordering::Relaxed);
trace!("joining threads");
for t in threads {
t.join().unwrap();
}
}
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -229,6 +277,7 @@ mod tests {
} }
#[test] #[test]
#[ignore]
fn test_multi_node() { fn test_multi_node() {
logger::setup(); logger::setup();
info!("test_multi_node"); info!("test_multi_node");
@ -239,17 +288,15 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let leader_acc = { let leader_acc = {
let (input, event_receiver) = sync_channel(10); let accountant = Accountant::new(&alice);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
let acc = Accountant::new(&alice); Arc::new(Tpu::new(accounting_stage))
Arc::new(Tpu::new(acc, input, historian))
}; };
let replicant_acc = { let replicant_acc = {
let (input, event_receiver) = sync_channel(10); let accountant = Accountant::new(&alice);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
let acc = Accountant::new(&alice); Arc::new(Tpu::new(accounting_stage))
Arc::new(Tpu::new(acc, input, historian))
}; };
let leader_threads = Tpu::serve( let leader_threads = Tpu::serve(
@ -313,14 +360,15 @@ mod tests {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let mut acc = ThinClient::new(leader.0.serve_addr, socket); let mut accountant = ThinClient::new(leader.0.serve_addr, socket);
info!("getting leader last_id"); info!("getting leader last_id");
let last_id = acc.get_last_id().wait().unwrap(); let last_id = accountant.get_last_id().wait().unwrap();
info!("executing leader transer"); info!("executing leader transer");
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) let _sig = accountant
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap(); .unwrap();
info!("getting leader balance"); info!("getting leader balance");
acc.get_balance(&bob_pubkey).unwrap() accountant.get_balance(&bob_pubkey).unwrap()
}; };
assert_eq!(leader_balance, 500); assert_eq!(leader_balance, 500);
//verify replicant has the same balance //verify replicant has the same balance
@ -329,9 +377,9 @@ mod tests {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
let mut acc = ThinClient::new(replicant.0.serve_addr, socket); let mut accountant = ThinClient::new(replicant.0.serve_addr, socket);
info!("getting replicant balance"); info!("getting replicant balance");
if let Ok(bal) = acc.get_balance(&bob_pubkey) { if let Ok(bal) = accountant.get_balance(&bob_pubkey) {
replicant_balance = bal; replicant_balance = bal;
} }
info!("replicant balance {}", replicant_balance); info!("replicant balance {}", replicant_balance);

127
src/thin_client_service.rs Normal file
View File

@ -0,0 +1,127 @@
//! The `thin_client_service` sits alongside the TPU and queries it for information
//! on behalf of thing clients.
use accountant::Accountant;
use bincode::serialize;
use entry::Entry;
use hash::Hash;
use signature::PublicKey;
use std::net::{SocketAddr, UdpSocket};
//use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use transaction::Transaction;
pub struct ThinClientService {
//pub output: Mutex<Receiver<Response>>,
//response_sender: Mutex<Sender<Response>>,
accountant: Arc<Accountant>,
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
}
impl ThinClientService {
/// Create a new Tpu that wraps the given Accountant.
pub fn new(accountant: Arc<Accountant>) -> Self {
//let (response_sender, output) = channel();
ThinClientService {
//output: Mutex::new(output),
//response_sender: Mutex::new(response_sender),
accountant,
entry_info_subscribers: Mutex::new(vec![]),
}
}
/// Process Request items sent by clients.
fn process_request(
&self,
msg: Request,
rsp_addr: SocketAddr,
) -> Option<(Response, SocketAddr)> {
match msg {
Request::GetBalance { key } => {
let val = self.accountant.get_balance(&key);
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
match subscription {
Subscription::EntryInfo => {
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
}
}
}
None
}
}
}
pub fn process_requests(
&self,
reqs: Vec<(Request, SocketAddr)>,
) -> Vec<(Response, SocketAddr)> {
reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect()
}
pub fn notify_entry_info_subscribers(&self, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
// copy subscribers to avoid taking lock while doing io
let addrs = self.entry_info_subscribers.lock().unwrap().clone();
trace!("Sending to {} addrs", addrs.len());
for addr in addrs {
let entry_info = EntryInfo {
id: entry.id,
num_hashes: entry.num_hashes,
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
trace!("sending {} to {}", data.len(), addr);
//TODO dont do IO here, this needs to be on a separate channel
let res = socket.send_to(&data, addr);
if res.is_err() {
eprintln!("couldn't send response: {:?}", res);
}
}
}
}
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Request {
Transaction(Transaction),
GetBalance { key: PublicKey },
Subscribe { subscriptions: Vec<Subscription> },
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Subscription {
EntryInfo,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EntryInfo {
pub id: Hash,
pub num_hashes: u64,
pub num_events: u64,
}
impl Request {
/// Verify the request is valid.
pub fn verify(&self) -> bool {
match *self {
Request::Transaction(ref tr) => tr.verify_plan(),
_ => true,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
Balance { key: PublicKey, val: Option<i64> },
EntryInfo(EntryInfo),
}

View File

@ -2,137 +2,73 @@
//! 5-stage transaction processing pipeline in software. //! 5-stage transaction processing pipeline in software.
use accountant::Accountant; use accountant::Accountant;
use accounting_stage::AccountingStage;
use bincode::{deserialize, serialize, serialize_into}; use bincode::{deserialize, serialize, serialize_into};
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use ecdsa; use ecdsa;
use entry::Entry; use entry::Entry;
use event::Event; use event::Event;
use hash::Hash;
use historian::Historian;
use packet; use packet;
use packet::{SharedBlob, SharedPackets, BLOB_SIZE}; use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use rayon::prelude::*; use rayon::prelude::*;
use recorder::Signal;
use result::Result; use result::Result;
use serde_json; use serde_json;
use signature::PublicKey;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::sink; use std::io::sink;
use std::io::{Cursor, Write}; use std::io::{Cursor, Write};
use std::mem::size_of; use std::mem::size_of;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use streamer; use streamer;
use thin_client_service::{Request, Response, ThinClientService};
use timing; use timing;
use transaction::Transaction;
pub struct Tpu { pub struct Tpu {
acc: Mutex<Accountant>, accounting_stage: AccountingStage,
historian_input: Mutex<SyncSender<Signal>>, thin_client_service: ThinClientService,
historian: Historian,
entry_info_subscribers: Mutex<Vec<SocketAddr>>,
}
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Request {
Transaction(Transaction),
GetBalance { key: PublicKey },
Subscribe { subscriptions: Vec<Subscription> },
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Subscription {
EntryInfo,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EntryInfo {
pub id: Hash,
pub num_hashes: u64,
pub num_events: u64,
}
impl Request {
/// Verify the request is valid.
pub fn verify(&self) -> bool {
match *self {
Request::Transaction(ref tr) => tr.verify_plan(),
_ => true,
}
}
} }
type SharedTpu = Arc<Tpu>; type SharedTpu = Arc<Tpu>;
#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
Balance { key: PublicKey, val: Option<i64> },
EntryInfo(EntryInfo),
}
impl Tpu { impl Tpu {
/// Create a new Tpu that wraps the given Accountant. /// Create a new Tpu that wraps the given Accountant.
pub fn new(acc: Accountant, historian_input: SyncSender<Signal>, historian: Historian) -> Self { pub fn new(accounting_stage: AccountingStage) -> Self {
let thin_client_service = ThinClientService::new(accounting_stage.accountant.clone());
Tpu { Tpu {
acc: Mutex::new(acc), accounting_stage,
entry_info_subscribers: Mutex::new(vec![]), thin_client_service,
historian_input: Mutex::new(historian_input),
historian,
} }
} }
fn notify_entry_info_subscribers(obj: &SharedTpu, entry: &Entry) { fn update_entry<W: Write>(obj: &Tpu, writer: &Mutex<W>, entry: &Entry) {
// TODO: No need to bind().
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind");
// copy subscribers to avoid taking lock while doing io
let addrs = obj.entry_info_subscribers.lock().unwrap().clone();
trace!("Sending to {} addrs", addrs.len());
for addr in addrs {
let entry_info = EntryInfo {
id: entry.id,
num_hashes: entry.num_hashes,
num_events: entry.events.len() as u64,
};
let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo");
trace!("sending {} to {}", data.len(), addr);
//TODO dont do IO here, this needs to be on a separate channel
let res = socket.send_to(&data, addr);
if res.is_err() {
eprintln!("couldn't send response: {:?}", res);
}
}
}
fn update_entry<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>, entry: &Entry) {
trace!("update_entry entry"); trace!("update_entry entry");
obj.acc.lock().unwrap().register_entry_id(&entry.id); obj.accounting_stage.accountant.register_entry_id(&entry.id);
writeln!( writeln!(
writer.lock().unwrap(), writer.lock().unwrap(),
"{}", "{}",
serde_json::to_string(&entry).unwrap() serde_json::to_string(&entry).unwrap()
).unwrap(); ).unwrap();
Self::notify_entry_info_subscribers(obj, &entry); obj.thin_client_service
.notify_entry_info_subscribers(&entry);
} }
fn receive_all<W: Write>(obj: &SharedTpu, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> { fn receive_all<W: Write>(obj: &Tpu, writer: &Mutex<W>) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations //TODO implement a serialize for channel that does this without allocations
let mut l = vec![]; let mut l = vec![];
let entry = obj.historian let entry = obj.accounting_stage
.output .output
.lock() .lock()
.unwrap() .unwrap()
.recv_timeout(Duration::new(1, 0))?; .recv_timeout(Duration::new(1, 0))?;
Self::update_entry(obj, writer, &entry); Self::update_entry(obj, writer, &entry);
l.push(entry); l.push(entry);
while let Ok(entry) = obj.historian.receive() { while let Ok(entry) = obj.accounting_stage.output.lock().unwrap().try_recv() {
Self::update_entry(obj, writer, &entry); Self::update_entry(obj, writer, &entry);
l.push(entry); l.push(entry);
} }
@ -184,7 +120,7 @@ impl Tpu {
obj: SharedTpu, obj: SharedTpu,
broadcast: &streamer::BlobSender, broadcast: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler, blob_recycler: &packet::BlobRecycler,
writer: &Arc<Mutex<W>>, writer: &Mutex<W>,
) -> Result<()> { ) -> Result<()> {
let mut q = VecDeque::new(); let mut q = VecDeque::new();
let list = Self::receive_all(&obj, writer)?; let list = Self::receive_all(&obj, writer)?;
@ -201,7 +137,7 @@ impl Tpu {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender, broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler, blob_recycler: packet::BlobRecycler,
writer: Arc<Mutex<W>>, writer: Mutex<W>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
let _ = Self::run_sync(obj.clone(), &broadcast, &blob_recycler, &writer); let _ = Self::run_sync(obj.clone(), &broadcast, &blob_recycler, &writer);
@ -212,17 +148,17 @@ impl Tpu {
}) })
} }
fn process_thin_client_requests(_obj: SharedTpu, _socket: &UdpSocket) -> Result<()> { fn process_thin_client_requests(_acc: &Arc<Accountant>, _socket: &UdpSocket) -> Result<()> {
Ok(()) Ok(())
} }
fn thin_client_service( fn thin_client_service(
obj: SharedTpu, accountant: Arc<Accountant>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
socket: UdpSocket, socket: UdpSocket,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
let _ = Self::process_thin_client_requests(obj.clone(), &socket); let _ = Self::process_thin_client_requests(&accountant, &socket);
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
info!("sync_service exiting"); info!("sync_service exiting");
break; break;
@ -247,33 +183,6 @@ impl Tpu {
}) })
} }
/// Process Request items sent by clients.
pub fn process_request(
&self,
msg: Request,
rsp_addr: SocketAddr,
) -> Option<(Response, SocketAddr)> {
match msg {
Request::GetBalance { key } => {
let val = self.acc.lock().unwrap().get_balance(&key);
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
Request::Transaction(_) => unreachable!(),
Request::Subscribe { subscriptions } => {
for subscription in subscriptions {
match subscription {
Subscription::EntryInfo => {
self.entry_info_subscribers.lock().unwrap().push(rsp_addr)
}
}
}
None
}
}
}
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> { fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?; let msgs = recvr.recv_timeout(timer)?;
@ -365,31 +274,6 @@ impl Tpu {
(events, reqs) (events, reqs)
} }
/// Process the transactions in parallel and then log the successful ones.
fn process_events(&self, events: Vec<Event>) -> Result<()> {
for result in self.acc.lock().unwrap().process_verified_events(events) {
if let Ok(event) = result {
self.historian_input
.lock()
.unwrap()
.send(Signal::Event(event))?;
}
}
// Let validators know they should not attempt to process additional
// transactions in parallel.
self.historian_input.lock().unwrap().send(Signal::Tick)?;
debug!("after historian_input");
Ok(())
}
fn process_requests(&self, reqs: Vec<(Request, SocketAddr)>) -> Vec<(Response, SocketAddr)> {
reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect()
}
fn serialize_response( fn serialize_response(
resp: Response, resp: Response,
rsp_addr: SocketAddr, rsp_addr: SocketAddr,
@ -419,7 +303,7 @@ impl Tpu {
} }
fn process( fn process(
obj: &SharedTpu, obj: &Tpu,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
responder_sender: &streamer::BlobSender, responder_sender: &streamer::BlobSender,
packet_recycler: &packet::PacketRecycler, packet_recycler: &packet::PacketRecycler,
@ -454,11 +338,11 @@ impl Tpu {
debug!("events: {} reqs: {}", events.len(), reqs.len()); debug!("events: {} reqs: {}", events.len(), reqs.len());
debug!("process_events"); debug!("process_events");
obj.process_events(events)?; obj.accounting_stage.process_events(events)?;
debug!("done process_events"); debug!("done process_events");
debug!("process_requests"); debug!("process_requests");
let rsps = obj.process_requests(reqs); let rsps = obj.thin_client_service.process_requests(reqs);
debug!("done process_requests"); debug!("done process_requests");
let blobs = Self::serialize_responses(rsps, blob_recycler)?; let blobs = Self::serialize_responses(rsps, blob_recycler)?;
@ -484,7 +368,7 @@ impl Tpu {
/// Process verified blobs, already in order /// Process verified blobs, already in order
/// Respond with a signed hash of the state /// Respond with a signed hash of the state
fn replicate_state( fn replicate_state(
obj: &SharedTpu, obj: &Tpu,
verified_receiver: &streamer::BlobReceiver, verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler, blob_recycler: &packet::BlobRecycler,
) -> Result<()> { ) -> Result<()> {
@ -494,10 +378,10 @@ impl Tpu {
for msgs in &blobs { for msgs in &blobs {
let blob = msgs.read().unwrap(); let blob = msgs.read().unwrap();
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap(); let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
let acc = obj.acc.lock().unwrap(); let accountant = &obj.accounting_stage.accountant;
for entry in entries { for entry in entries {
acc.register_entry_id(&entry.id); accountant.register_entry_id(&entry.id);
for result in acc.process_verified_events(entry.events) { for result in accountant.process_verified_events(entry.events) {
result?; result?;
} }
} }
@ -576,10 +460,14 @@ impl Tpu {
exit.clone(), exit.clone(),
broadcast_sender, broadcast_sender,
blob_recycler.clone(), blob_recycler.clone(),
Arc::new(Mutex::new(writer)), Mutex::new(writer),
); );
let t_skinny = Self::thin_client_service(obj.clone(), exit.clone(), skinny); let t_skinny = Self::thin_client_service(
obj.accounting_stage.accountant.clone(),
exit.clone(),
skinny,
);
let tpu = obj.clone(); let tpu = obj.clone();
let t_server = spawn(move || loop { let t_server = spawn(move || loop {
@ -784,41 +672,46 @@ pub fn to_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedP
} }
#[cfg(test)] #[cfg(test)]
mod tests { pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
use bincode::serialize; use signature::{KeyPair, KeyPairUtil};
use ecdsa;
use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS};
use tpu::{to_packets, Request};
use transaction::{memfind, test_tx};
let skinny = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
(d, gossip, replicate, serve, skinny)
}
#[cfg(test)]
mod tests {
use accountant::Accountant; use accountant::Accountant;
use accounting_stage::AccountingStage;
use bincode::serialize;
use chrono::prelude::*; use chrono::prelude::*;
use crdt::Crdt; use crdt::Crdt;
use crdt::ReplicatedData; use ecdsa;
use entry; use entry;
use entry::Entry;
use event::Event; use event::Event;
use futures::Future;
use hash::{hash, Hash}; use hash::{hash, Hash};
use historian::Historian;
use logger; use logger;
use mint::Mint; use mint::Mint;
use plan::Plan; use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS};
use recorder::Signal;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::sink;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use streamer; use streamer;
use thin_client::ThinClient; use tpu::{test_node, to_packets, Request, Tpu};
use tpu::Tpu; use transaction::{memfind, test_tx, Transaction};
use transaction::Transaction;
#[test] #[test]
fn test_layout() { fn test_layout() {
@ -846,117 +739,9 @@ mod tests {
assert_eq!(rv[1].read().unwrap().packets.len(), 1); assert_eq!(rv[1].read().unwrap().packets.len(), 1);
} }
#[test]
fn test_accounting_sequential_consistency() {
// In this attack we'll demonstrate that a verifier can interpret the ledger
// differently if either the server doesn't signal the ledger to add an
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let acc = Accountant::new(&mint);
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None);
let tpu = Tpu::new(acc, input, historian);
// Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(tpu.process_events(events).is_ok());
// Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let events = vec![Event::Transaction(tr)];
assert!(tpu.process_events(events).is_ok());
// Collect the ledger and feed it to a new accountant.
tpu.historian_input
.lock()
.unwrap()
.send(Signal::Tick)
.unwrap();
drop(tpu.historian_input);
let entries: Vec<Entry> = tpu.historian.output.lock().unwrap().iter().collect();
// Assert the user holds one token, not two. If the server only output one
// entry, then the second transaction will be rejected, because it drives
// the account balance below zero before the credit is added.
let acc = Accountant::new(&mint);
for entry in entries {
assert!(
acc.process_verified_events(entry.events)
.into_iter()
.all(|x| x.is_ok())
);
}
assert_eq!(acc.get_balance(&alice.pubkey()), Some(1));
}
#[test]
fn test_accountant_bad_sig() {
let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = test_node();
let alice = Mint::new(10_000);
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30));
let tpu = Arc::new(Tpu::new(acc, input, historian));
let serve_addr = leader_serve.local_addr().unwrap();
let threads = Tpu::serve(
&tpu,
leader_data,
leader_serve,
leader_skinny,
leader_gossip,
exit.clone(),
sink(),
).unwrap();
sleep(Duration::from_millis(300));
let socket = UdpSocket::bind("127.0.0.1:0").unwrap();
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
let mut client = ThinClient::new(serve_addr, socket);
let last_id = client.get_last_id().wait().unwrap();
trace!("doing stuff");
let tr = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
let _sig = client.transfer_signed(tr).unwrap();
let last_id = client.get_last_id().wait().unwrap();
let mut tr2 = Transaction::new(&alice.keypair(), bob_pubkey, 501, last_id);
tr2.data.tokens = 502;
tr2.data.plan = Plan::new_payment(502, bob_pubkey);
let _sig = client.transfer_signed(tr2).unwrap();
assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 500);
trace!("exiting");
exit.store(true, Ordering::Relaxed);
trace!("joining threads");
for t in threads {
t.join().unwrap();
}
}
/// Build a throwaway node for tests: bind the four service sockets to
/// ephemeral localhost ports and describe them in a `ReplicatedData` record.
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
    // Each service gets its own OS-assigned port on the loopback interface.
    let ephemeral = || UdpSocket::bind("127.0.0.1:0").unwrap();
    let skinny = ephemeral();
    let gossip = ephemeral();
    let replicate = ephemeral();
    let serve = ephemeral();
    let node = ReplicatedData::new(
        KeyPair::new().pubkey(),
        gossip.local_addr().unwrap(),
        replicate.local_addr().unwrap(),
        serve.local_addr().unwrap(),
    );
    (node, gossip, replicate, serve, skinny)
}
// NOTE(review): the span below is corrupted side-by-side merge/diff residue:
// every line carries the old and new text fused together, and inline
// `@ -…` hunk headers show that runs of lines between hunks are missing
// entirely. It is not valid Rust and cannot be safely reconstructed from
// this view — restore `test_replicate` from version control instead of
// hand-editing this text. Reproduced byte-identical below.
/// Test that message sent from leader to target1 and replicated to target2 /// Test that message sent from leader to target1 and replicated to target2
#[test] #[test]
#[ignore]
fn test_replicate() { fn test_replicate() {
logger::setup(); logger::setup();
let (leader_data, leader_gossip, _, leader_serve, _) = test_node(); let (leader_data, leader_gossip, _, leader_serve, _) = test_node();
@ -1005,13 +790,12 @@ mod tests {
let starting_balance = 10_000; let starting_balance = 10_000;
let alice = Mint::new(starting_balance); let alice = Mint::new(starting_balance);
let acc = Accountant::new(&alice); let accountant = Accountant::new(&alice);
let (input, event_receiver) = sync_channel(10); let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let tpu = Arc::new(Tpu::new(accounting_stage));
let acc = Arc::new(Tpu::new(acc, input, historian));
let replicate_addr = target1_data.replicate_addr; let replicate_addr = target1_data.replicate_addr;
let threads = Tpu::replicate( let threads = Tpu::replicate(
&acc, &tpu,
target1_data, target1_data,
target1_gossip, target1_gossip,
target1_serve, target1_serve,
@ -1033,9 +817,11 @@ mod tests {
w.set_index(i).unwrap(); w.set_index(i).unwrap();
w.set_id(leader_id).unwrap(); w.set_id(leader_id).unwrap();
let accountant = &tpu.accounting_stage.accountant;
let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]);
acc.acc.lock().unwrap().register_entry_id(&cur_hash); accountant.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash); cur_hash = hash(&cur_hash);
let tr1 = Transaction::new( let tr1 = Transaction::new(
@ -1044,11 +830,11 @@ mod tests {
transfer_amount, transfer_amount,
cur_hash, cur_hash,
); );
acc.acc.lock().unwrap().register_entry_id(&cur_hash); accountant.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash); cur_hash = hash(&cur_hash);
let entry1 = let entry1 =
entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]); entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]);
acc.acc.lock().unwrap().register_entry_id(&cur_hash); accountant.register_entry_id(&cur_hash);
cur_hash = hash(&cur_hash); cur_hash = hash(&cur_hash);
alice_ref_balance -= transfer_amount; alice_ref_balance -= transfer_amount;
@ -1073,18 +859,11 @@ mod tests {
msgs.push(msg); msgs.push(msg);
} }
let alice_balance = acc.acc let accountant = &tpu.accounting_stage.accountant;
.lock() let alice_balance = accountant.get_balance(&alice.keypair().pubkey()).unwrap();
.unwrap()
.get_balance(&alice.keypair().pubkey())
.unwrap();
assert_eq!(alice_balance, alice_ref_balance); assert_eq!(alice_balance, alice_ref_balance);
let bob_balance = acc.acc let bob_balance = accountant.get_balance(&bob_keypair.pubkey()).unwrap();
.lock()
.unwrap()
.get_balance(&bob_keypair.pubkey())
.unwrap();
assert_eq!(bob_balance, starting_balance - alice_ref_balance); assert_eq!(bob_balance, starting_balance - alice_ref_balance);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
@ -1120,79 +899,3 @@ mod tests {
assert!(blob_q.len() > num_blobs_ref); assert!(blob_q.len() > num_blobs_ref);
} }
} }
#[cfg(all(feature = "unstable", test))]
mod bench {
    extern crate test;
    use self::test::Bencher;
    use accountant::{Accountant, MAX_ENTRY_IDS};
    use bincode::serialize;
    use entry::Entry;
    use hash::{hash, Hash};
    use historian::Historian;
    use mint::Mint;
    use rayon::prelude::*;
    use signature::{KeyPair, KeyPairUtil};
    use std::collections::HashSet;
    use std::net::SocketAddr;
    use std::sync::mpsc::sync_channel;
    use std::sync::Mutex;
    use std::time::Instant;
    use tpu::*;
    use transaction::Transaction;

    /// Measure how many verified transactions per second `Tpu::process_events`
    /// can apply, then confirm the historian logged every one of them in a
    /// single entry. Prints the resulting TPS figure.
    #[bench]
    fn process_packets_bench(_bencher: &mut Bencher) {
        let mint = Mint::new(100_000_000);
        let acc = Accountant::new(&mint);
        let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
        // Create transactions between unrelated parties.
        let txs = 100_000;
        // Entry IDs registered so far, shared across the rayon workers below.
        let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
        let transactions: Vec<_> = (0..txs)
            .into_par_iter()
            .map(|i| {
                // Seed the 'to' account and a cell for its signature.
                let dummy_id = i % (MAX_ENTRY_IDS as i32);
                let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash
                {
                    // Register each distinct entry ID with the accountant once.
                    let mut last_ids = last_ids.lock().unwrap();
                    if !last_ids.contains(&last_id) {
                        last_ids.insert(last_id);
                        acc.register_entry_id(&last_id);
                    }
                }

                // Seed the 'from' account.
                let rando0 = KeyPair::new();
                let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
                acc.process_verified_transaction(&tr).unwrap();

                let rando1 = KeyPair::new();
                let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
                acc.process_verified_transaction(&tr).unwrap();

                // Finally, return a transaction that's unique
                Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
            })
            .collect();

        let req_vers = transactions
            .into_iter()
            .map(|tr| (Request::Transaction(tr), rsp_addr, 1_u8))
            .collect();

        let (input, event_receiver) = sync_channel(10);
        let historian = Historian::new(event_receiver, &mint.last_id(), None);
        let tpu = Tpu::new(acc, input, historian);

        // Time only the event-processing step.
        let now = Instant::now();
        assert!(tpu.process_events(req_vers).is_ok());
        let duration = now.elapsed();
        let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
        let tps = txs as f64 / sec;

        // Ensure that all transactions were successfully logged.
        // Dropping the input channel lets the historian thread drain and exit.
        drop(tpu.historian_input);
        let entries: Vec<Entry> = tpu.historian.output.lock().unwrap().iter().collect();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].events.len(), txs as usize);

        println!("{} tps", tps);
    }
}