diff --git a/src/event_processor.rs b/src/event_processor.rs deleted file mode 100644 index 3ce63d27b2..0000000000 --- a/src/event_processor.rs +++ /dev/null @@ -1,172 +0,0 @@ -//! The `event_processor` module implements the accounting stage of the TPU. - -use accountant::Accountant; -use entry::Entry; -use event::Event; -use hash::Hash; -use record_stage::RecordStage; -use recorder::Signal; -use result::Result; -use std::sync::mpsc::{channel, Sender}; -use std::sync::{Arc, Mutex}; -use std::time::Duration; - -pub struct EventProcessor { - pub accountant: Arc, - historian_input: Mutex>, - record_stage: Mutex, - pub start_hash: Hash, - pub tick_duration: Option, -} - -impl EventProcessor { - /// Create a new stage of the TPU for event and transaction processing - pub fn new(accountant: Accountant, start_hash: &Hash, tick_duration: Option) -> Self { - let (historian_input, event_receiver) = channel(); - let record_stage = RecordStage::new(event_receiver, start_hash, tick_duration); - EventProcessor { - accountant: Arc::new(accountant), - historian_input: Mutex::new(historian_input), - record_stage: Mutex::new(record_stage), - start_hash: *start_hash, - tick_duration, - } - } - - /// Process the transactions in parallel and then log the successful ones. - pub fn process_events(&self, events: Vec) -> Result { - let record_stage = self.record_stage.lock().unwrap(); - let results = self.accountant.process_verified_events(events); - let events = results.into_iter().filter_map(|x| x.ok()).collect(); - let sender = self.historian_input.lock().unwrap(); - sender.send(Signal::Events(events))?; - - // Wait for the historian to tag our Events with an ID and then register it. - let entry = record_stage.entry_receiver.recv()?; - self.accountant.register_entry_id(&entry.id); - Ok(entry) - } -} - -#[cfg(test)] -mod tests { - use accountant::Accountant; - use event::Event; - use event_processor::EventProcessor; - use mint::Mint; - use signature::{KeyPair, KeyPairUtil}; - use transaction::Transaction; - - #[test] - // TODO: Move this test accounting_stage. Calling process_events() directly - // defeats the purpose of this test. - fn test_accounting_sequential_consistency() { - // In this attack we'll demonstrate that a verifier can interpret the ledger - // differently if either the server doesn't signal the ledger to add an - // Entry OR if the verifier tries to parallelize across multiple Entries. - let mint = Mint::new(2); - let accountant = Accountant::new(&mint); - let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); - - // Process a batch that includes a transaction that receives two tokens. - let alice = KeyPair::new(); - let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); - let events = vec![Event::Transaction(tr)]; - let entry0 = event_processor.process_events(events).unwrap(); - - // Process a second batch that spends one of those tokens. - let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); - let events = vec![Event::Transaction(tr)]; - let entry1 = event_processor.process_events(events).unwrap(); - - // Collect the ledger and feed it to a new accountant. - let entries = vec![entry0, entry1]; - - // Assert the user holds one token, not two. If the server only output one - // entry, then the second transaction will be rejected, because it drives - // the account balance below zero before the credit is added. - let accountant = Accountant::new(&mint); - for entry in entries { - assert!( - accountant - .process_verified_events(entry.events) - .into_iter() - .all(|x| x.is_ok()) - ); - } - assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1)); - } -} - -#[cfg(all(feature = "unstable", test))] -mod bench { - extern crate test; - use self::test::Bencher; - use accountant::{Accountant, MAX_ENTRY_IDS}; - use bincode::serialize; - use event_processor::*; - use hash::hash; - use mint::Mint; - use rayon::prelude::*; - use signature::{KeyPair, KeyPairUtil}; - use std::collections::HashSet; - use std::time::Instant; - use transaction::Transaction; - - #[bench] - fn process_events_bench(_bencher: &mut Bencher) { - let mint = Mint::new(100_000_000); - let accountant = Accountant::new(&mint); - // Create transactions between unrelated parties. - let txs = 100_000; - let last_ids: Mutex> = Mutex::new(HashSet::new()); - let transactions: Vec<_> = (0..txs) - .into_par_iter() - .map(|i| { - // Seed the 'to' account and a cell for its signature. - let dummy_id = i % (MAX_ENTRY_IDS as i32); - let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash - { - let mut last_ids = last_ids.lock().unwrap(); - if !last_ids.contains(&last_id) { - last_ids.insert(last_id); - accountant.register_entry_id(&last_id); - } - } - - // Seed the 'from' account. - let rando0 = KeyPair::new(); - let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); - accountant.process_verified_transaction(&tr).unwrap(); - - let rando1 = KeyPair::new(); - let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); - accountant.process_verified_transaction(&tr).unwrap(); - - // Finally, return a transaction that's unique - Transaction::new(&rando0, rando1.pubkey(), 1, last_id) - }) - .collect(); - - let events: Vec<_> = transactions - .into_iter() - .map(|tr| Event::Transaction(tr)) - .collect(); - - let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); - - let now = Instant::now(); - assert!(event_processor.process_events(events).is_ok()); - let duration = now.elapsed(); - let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; - let tps = txs as f64 / sec; - - // Ensure that all transactions were successfully logged. - drop(event_processor.historian_input); - let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); - assert_eq!(entries.len(), 1); - assert_eq!(entries[0].events.len(), txs as usize); - - println!("{} tps", tps); - } -} diff --git a/src/lib.rs b/src/lib.rs index 7099ef43d8..d31c950b70 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,7 +7,6 @@ pub mod entry_writer; #[cfg(feature = "erasure")] pub mod erasure; pub mod event; -pub mod event_processor; pub mod hash; pub mod ledger; pub mod logger; diff --git a/src/request_stage.rs b/src/request_stage.rs index 2f9935a2f6..e5ef5ef20d 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -51,3 +51,139 @@ impl RequestStage { } } } + +// TODO: When accounting is pulled out of RequestStage, add this test back in. + +//use accountant::Accountant; +//use entry::Entry; +//use event::Event; +//use hash::Hash; +//use record_stage::RecordStage; +//use recorder::Signal; +//use result::Result; +//use std::sync::mpsc::{channel, Sender}; +//use std::sync::{Arc, Mutex}; +//use std::time::Duration; +// +//#[cfg(test)] +//mod tests { +// use accountant::Accountant; +// use event::Event; +// use event_processor::EventProcessor; +// use mint::Mint; +// use signature::{KeyPair, KeyPairUtil}; +// use transaction::Transaction; +// +// #[test] +// // TODO: Move this test accounting_stage. Calling process_events() directly +// // defeats the purpose of this test. +// fn test_accounting_sequential_consistency() { +// // In this attack we'll demonstrate that a verifier can interpret the ledger +// // differently if either the server doesn't signal the ledger to add an +// // Entry OR if the verifier tries to parallelize across multiple Entries. +// let mint = Mint::new(2); +// let accountant = Accountant::new(&mint); +// let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); +// +// // Process a batch that includes a transaction that receives two tokens. +// let alice = KeyPair::new(); +// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); +// let events = vec![Event::Transaction(tr)]; +// let entry0 = event_processor.process_events(events).unwrap(); +// +// // Process a second batch that spends one of those tokens. +// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); +// let events = vec![Event::Transaction(tr)]; +// let entry1 = event_processor.process_events(events).unwrap(); +// +// // Collect the ledger and feed it to a new accountant. +// let entries = vec![entry0, entry1]; +// +// // Assert the user holds one token, not two. If the server only output one +// // entry, then the second transaction will be rejected, because it drives +// // the account balance below zero before the credit is added. +// let accountant = Accountant::new(&mint); +// for entry in entries { +// assert!( +// accountant +// .process_verified_events(entry.events) +// .into_iter() +// .all(|x| x.is_ok()) +// ); +// } +// assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1)); +// } +//} +// +//#[cfg(all(feature = "unstable", test))] +//mod bench { +// extern crate test; +// use self::test::Bencher; +// use accountant::{Accountant, MAX_ENTRY_IDS}; +// use bincode::serialize; +// use event_processor::*; +// use hash::hash; +// use mint::Mint; +// use rayon::prelude::*; +// use signature::{KeyPair, KeyPairUtil}; +// use std::collections::HashSet; +// use std::time::Instant; +// use transaction::Transaction; +// +// #[bench] +// fn process_events_bench(_bencher: &mut Bencher) { +// let mint = Mint::new(100_000_000); +// let accountant = Accountant::new(&mint); +// // Create transactions between unrelated parties. +// let txs = 100_000; +// let last_ids: Mutex> = Mutex::new(HashSet::new()); +// let transactions: Vec<_> = (0..txs) +// .into_par_iter() +// .map(|i| { +// // Seed the 'to' account and a cell for its signature. +// let dummy_id = i % (MAX_ENTRY_IDS as i32); +// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash +// { +// let mut last_ids = last_ids.lock().unwrap(); +// if !last_ids.contains(&last_id) { +// last_ids.insert(last_id); +// accountant.register_entry_id(&last_id); +// } +// } +// +// // Seed the 'from' account. +// let rando0 = KeyPair::new(); +// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); +// accountant.process_verified_transaction(&tr).unwrap(); +// +// let rando1 = KeyPair::new(); +// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); +// accountant.process_verified_transaction(&tr).unwrap(); +// +// // Finally, return a transaction that's unique +// Transaction::new(&rando0, rando1.pubkey(), 1, last_id) +// }) +// .collect(); +// +// let events: Vec<_> = transactions +// .into_iter() +// .map(|tr| Event::Transaction(tr)) +// .collect(); +// +// let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); +// +// let now = Instant::now(); +// assert!(event_processor.process_events(events).is_ok()); +// let duration = now.elapsed(); +// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; +// let tps = txs as f64 / sec; +// +// // Ensure that all transactions were successfully logged. +// drop(event_processor.historian_input); +// let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); +// assert_eq!(entries.len(), 1); +// assert_eq!(entries[0].events.len(), txs as usize); +// +// println!("{} tps", tps); +// } +//} diff --git a/src/thin_client.rs b/src/thin_client.rs index a682dfb22e..4f45018f11 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -156,7 +156,6 @@ mod tests { use super::*; use accountant::Accountant; use crdt::{Crdt, ReplicatedData}; - use event_processor::EventProcessor; use futures::Future; use logger; use mint::Mint; @@ -303,12 +302,11 @@ mod tests { let replicant_acc = { let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new( + Arc::new(Tvu::new( accountant, - &alice.last_id(), + alice.last_id(), Some(Duration::from_millis(30)), - ); - Arc::new(Tvu::new(event_processor)) + )) }; let leader_threads = leader_acc diff --git a/src/tvu.rs b/src/tvu.rs index 4e33bb77fa..df7a8989a1 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -5,7 +5,7 @@ use accountant::Accountant; use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; -use event_processor::EventProcessor; +use hash::Hash; use ledger; use packet; use record_stage::RecordStage; @@ -22,14 +22,18 @@ use std::time::Duration; use streamer; pub struct Tvu { - event_processor: Arc, + accountant: Arc, + start_hash: Hash, + tick_duration: Option, } impl Tvu { /// Create a new Tvu that wraps the given Accountant. - pub fn new(event_processor: EventProcessor) -> Self { + pub fn new(accountant: Accountant, start_hash: Hash, tick_duration: Option) -> Self { Tvu { - event_processor: Arc::new(event_processor), + accountant: Arc::new(accountant), + start_hash, + tick_duration, } } @@ -61,9 +65,7 @@ impl Tvu { let blobs = verified_receiver.recv_timeout(timer)?; trace!("replicating blobs {}", blobs.len()); let entries = ledger::reconstruct_entries_from_blobs(&blobs); - obj.event_processor - .accountant - .process_verified_entries(entries)?; + obj.accountant.process_verified_entries(entries)?; for blob in blobs { blob_recycler.recycle(blob); } @@ -171,7 +173,7 @@ impl Tvu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - let request_processor = RequestProcessor::new(obj.event_processor.accountant.clone()); + let request_processor = RequestProcessor::new(obj.accountant.clone()); let request_stage = RequestStage::new( request_processor, exit.clone(), @@ -182,12 +184,12 @@ impl Tvu { let record_stage = RecordStage::new( request_stage.signal_receiver, - &obj.event_processor.start_hash, - obj.event_processor.tick_duration, + &obj.start_hash, + obj.tick_duration, ); let t_write = Self::drain_service( - obj.event_processor.accountant.clone(), + obj.accountant.clone(), exit.clone(), record_stage.entry_receiver, ); @@ -244,7 +246,6 @@ mod tests { use crdt::Crdt; use entry; use event::Event; - use event_processor::EventProcessor; use hash::{hash, Hash}; use logger; use mint::Mint; @@ -311,12 +312,11 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); let accountant = Accountant::new(&alice); - let event_processor = EventProcessor::new( + let tvu = Arc::new(Tvu::new( accountant, - &alice.last_id(), + alice.last_id(), Some(Duration::from_millis(30)), - ); - let tvu = Arc::new(Tvu::new(event_processor)); + )); let replicate_addr = target1_data.replicate_addr; let threads = Tvu::serve( &tvu, @@ -341,7 +341,7 @@ mod tests { w.set_index(i).unwrap(); w.set_id(leader_id).unwrap(); - let accountant = &tvu.event_processor.accountant; + let accountant = &tvu.accountant; let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); @@ -383,7 +383,7 @@ mod tests { msgs.push(msg); } - let accountant = &tvu.event_processor.accountant; + let accountant = &tvu.accountant; let alice_balance = accountant.get_balance(&alice.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance);