From 9f5a3d6064e45fc10090ebbd0281dd0d6fe68a0f Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 25 May 2018 15:51:41 -0600 Subject: [PATCH] events -> transactions --- src/bank.rs | 8 ++--- src/banking_stage.rs | 60 ++++++++++++++++++------------------- src/bin/client-demo.rs | 26 ++++++++-------- src/bin/genesis-demo.rs | 4 +-- src/bin/multinode-demo.rs | 15 ++++++---- src/bin/testnode.rs | 12 ++++---- src/crdt.rs | 12 ++++---- src/entry.rs | 62 +++++++++++++++++++++------------------ src/ledger.rs | 52 +++++++++++++++++--------------- src/mint.rs | 12 ++++---- src/plan.rs | 2 +- src/record_stage.rs | 54 +++++++++++++++++----------------- src/recorder.rs | 4 +-- src/request_processor.rs | 16 +--------- src/server.rs | 4 +-- src/streamer.rs | 8 ++--- src/thin_client.rs | 41 +++++++++++++------------- src/tpu.rs | 4 +-- src/tvu.rs | 14 ++++----- 19 files changed, 203 insertions(+), 207 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index 39fad69210..d26a249c1e 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -261,7 +261,7 @@ impl Bank { pub fn process_verified_entries(&self, entries: Vec) -> Result<()> { for entry in entries { self.register_entry_id(&entry.id); - for result in self.process_verified_transactions(entry.events) { + for result in self.process_verified_transactions(entry.transactions) { result?; } } @@ -468,7 +468,7 @@ mod tests { bank.process_verified_timestamp(mint.pubkey(), dt).unwrap(); assert_eq!(bank.get_balance(&pubkey), Some(1)); - // tx count is still 1, because we chose not to count timestamp events + // tx count is still 1, because we chose not to count timestamp transactions // tx count. assert_eq!(bank.transaction_count(), 1); @@ -524,7 +524,7 @@ mod tests { } #[test] - fn test_duplicate_event_signature() { + fn test_duplicate_transaction_signature() { let mint = Mint::new(1); let bank = Bank::new(&mint); let sig = Signature::default(); @@ -581,7 +581,7 @@ mod bench { use signature::KeyPairUtil; #[bench] - fn process_verified_event_bench(bencher: &mut Bencher) { + fn process_verified_transaction_bench(bencher: &mut Bencher) { let mint = Mint::new(100_000_000); let bank = Bank::new(&mint); // Create transactions between unrelated parties. diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 426c6032c5..735e68a7a4 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -49,7 +49,7 @@ impl BankingStage { } } - fn deserialize_events(p: &packet::Packets) -> Vec> { + fn deserialize_transactions(p: &packet::Packets) -> Vec> { p.packets .par_iter() .map(|x| { @@ -79,33 +79,33 @@ impl BankingStage { ); let proc_start = Instant::now(); for (msgs, vers) in mms { - let events = Self::deserialize_events(&msgs.read().unwrap()); - reqs_len += events.len(); - let events = events + let transactions = Self::deserialize_transactions(&msgs.read().unwrap()); + reqs_len += transactions.len(); + let transactions = transactions .into_iter() .zip(vers) - .filter_map(|(event, ver)| match event { + .filter_map(|(tx, ver)| match tx { None => None, - Some((event, _addr)) => if event.verify_plan() && ver != 0 { - Some(event) + Some((tx, _addr)) => if tx.verify_plan() && ver != 0 { + Some(tx) } else { None }, }) .collect(); - debug!("process_events"); - let results = bank.process_verified_transactions(events); - let events = results.into_iter().filter_map(|x| x.ok()).collect(); - signal_sender.send(Signal::Events(events))?; - debug!("done process_events"); + debug!("process_transactions"); + let results = bank.process_verified_transactions(transactions); + let transactions = results.into_iter().filter_map(|x| x.ok()).collect(); + signal_sender.send(Signal::Events(transactions))?; + debug!("done process_transactions"); packet_recycler.recycle(msgs); } let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); info!( - "@{:?} done processing event batches: {} time: {:?}ms reqs: {} reqs/s: {}", + "@{:?} done processing transaction batches: {} time: {:?}ms reqs: {} reqs/s: {}", timing::timestamp(), mms_len, total_time_ms, @@ -132,13 +132,12 @@ impl BankingStage { //#[cfg(test)] //mod tests { // use bank::Bank; -// use event_processor::EventProcessor; // use mint::Mint; // use signature::{KeyPair, KeyPairUtil}; // use transaction::Transaction; // // #[test] -// // TODO: Move this test banking_stage. Calling process_events() directly +// // TODO: Move this test banking_stage. Calling process_transactions() directly // // defeats the purpose of this test. // fn test_banking_sequential_consistency() { // // In this attack we'll demonstrate that a verifier can interpret the ledger @@ -146,18 +145,18 @@ impl BankingStage { // // Entry OR if the verifier tries to parallelize across multiple Entries. // let mint = Mint::new(2); // let bank = Bank::new(&mint); -// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); +// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None); // // // Process a batch that includes a transaction that receives two tokens. // let alice = KeyPair::new(); // let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); -// let events = vec![tr]; -// let entry0 = event_processor.process_events(events).unwrap(); +// let transactions = vec![tr]; +// let entry0 = banking_stage.process_transactions(transactions).unwrap(); // // // Process a second batch that spends one of those tokens. // let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); -// let events = vec![tr]; -// let entry1 = event_processor.process_events(events).unwrap(); +// let transactions = vec![tr]; +// let entry1 = banking_stage.process_transactions(transactions).unwrap(); // // // Collect the ledger and feed it to a new bank. // let entries = vec![entry0, entry1]; @@ -169,7 +168,7 @@ impl BankingStage { // for entry in entries { // assert!( // bank -// .process_verified_transactions(entry.events) +// .process_verified_transactions(entry.transactions) // .into_iter() // .all(|x| x.is_ok()) // ); @@ -184,7 +183,6 @@ impl BankingStage { // use self::test::Bencher; // use bank::{Bank, MAX_ENTRY_IDS}; // use bincode::serialize; -// use event_processor::*; // use hash::hash; // use mint::Mint; // use rayon::prelude::*; @@ -194,7 +192,7 @@ impl BankingStage { // use transaction::Transaction; // // #[bench] -// fn process_events_bench(_bencher: &mut Bencher) { +// fn process_transactions_bench(_bencher: &mut Bencher) { // let mint = Mint::new(100_000_000); // let bank = Bank::new(&mint); // // Create transactions between unrelated parties. @@ -228,17 +226,17 @@ impl BankingStage { // }) // .collect(); // -// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); +// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None); // // let now = Instant::now(); -// assert!(event_processor.process_events(transactions).is_ok()); +// assert!(banking_stage.process_transactions(transactions).is_ok()); // let duration = now.elapsed(); // let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; // let tps = txs as f64 / sec; // // // Ensure that all transactions were successfully logged. -// drop(event_processor.historian_input); -// let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); +// drop(banking_stage.historian_input); +// let entries: Vec = banking_stage.output.lock().unwrap().iter().collect(); // assert_eq!(entries.len(), 1); // assert_eq!(entries[0].transactions.len(), txs as usize); // @@ -267,14 +265,14 @@ mod bench { let mint = Mint::new(1_000_000_000); let pubkey = KeyPair::new().pubkey(); - let events: Vec<_> = (0..tx) + let transactions: Vec<_> = (0..tx) .map(|i| Transaction::new(&mint.keypair(), pubkey, i as i64, mint.last_id())) .collect(); let (verified_sender, verified_receiver) = channel(); let (signal_sender, signal_receiver) = channel(); let packet_recycler = PacketRecycler::default(); - let verified: Vec<_> = to_packets(&packet_recycler, events) + let verified: Vec<_> = to_packets(&packet_recycler, transactions) .into_iter() .map(|x| { let len = (*x).read().unwrap().packets.len(); @@ -292,8 +290,8 @@ mod bench { &packet_recycler, ).unwrap(); let signal = signal_receiver.recv().unwrap(); - if let Signal::Events(ref events) = signal { - assert_eq!(events.len(), tx); + if let Signal::Events(ref transactions) = signal { + assert_eq!(transactions.len(), tx); } else { assert!(false); } diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 43bbc90d58..1c1dc7828b 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -64,9 +64,9 @@ fn main() { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } - let mut events_addr: SocketAddr = requests_addr.parse().unwrap(); - let requests_port = events_addr.port(); - events_addr.set_port(requests_port + 1); + let mut transactions_addr: SocketAddr = requests_addr.parse().unwrap(); + let requests_port = transactions_addr.port(); + transactions_addr.set_port(requests_port + 1); if stdin_isatty() { eprintln!("nothing found on stdin, expected a json file"); @@ -91,16 +91,16 @@ fn main() { requests_socket .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); - let events_socket = UdpSocket::bind(&events_addr).unwrap(); + let transactions_socket = UdpSocket::bind(&transactions_addr).unwrap(); let requests_addr: SocketAddr = server_addr.parse().unwrap(); let requests_port = requests_addr.port(); - let mut events_server_addr = requests_addr.clone(); - events_server_addr.set_port(requests_port + 3); + let mut transactions_addr = requests_addr.clone(); + transactions_addr.set_port(requests_port + 3); let mut client = ThinClient::new( requests_addr, requests_socket, - events_server_addr, - events_socket, + transactions_addr, + transactions_socket, ); println!("Get last ID..."); @@ -146,14 +146,14 @@ fn main() { requests_socket .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); - let mut events_addr: SocketAddr = requests_addr.clone(); - events_addr.set_port(0); - let events_socket = UdpSocket::bind(&events_addr).unwrap(); + let mut transactions_addr: SocketAddr = requests_addr.clone(); + transactions_addr.set_port(0); + let transactions_socket = UdpSocket::bind(&transactions_addr).unwrap(); let client = ThinClient::new( requests_addr, requests_socket, - events_server_addr, - events_socket, + transactions_addr, + transactions_socket, ); for tr in trs { client.transfer_signed(tr.clone()).unwrap(); diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index f904990d37..634bd86694 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -42,7 +42,7 @@ fn main() { let last_id = demo.mint.last_id(); eprintln!("Signing {} transactions...", num_accounts); - let events: Vec<_> = keypairs + let transactions: Vec<_> = keypairs .into_par_iter() .map(|rando| { let last_id = demo.mint.last_id(); @@ -55,7 +55,7 @@ fn main() { } eprintln!("Logging the creation of {} accounts...", num_accounts); - let entry = Entry::new(&last_id, 0, events); + let entry = Entry::new(&last_id, 0, transactions); println!("{}", serde_json::to_string(&entry).unwrap()); eprintln!("Creating {} empty entries...", MAX_ENTRY_IDS); diff --git a/src/bin/multinode-demo.rs b/src/bin/multinode-demo.rs index ffad279234..168646a392 100644 --- a/src/bin/multinode-demo.rs +++ b/src/bin/multinode-demo.rs @@ -164,14 +164,17 @@ fn main() { now = Instant::now(); let sample = tx_count - initial_tx_count; initial_tx_count = tx_count; - println!("{}: Transactions processed {}", val.events_addr, sample); + println!( + "{}: Transactions processed {}", + val.transactions_addr, sample + ); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let tps = (sample * 1_000_000_000) as f64 / ns as f64; - println!("{}: {} tps", val.events_addr, tps); + println!("{}: {} tps", val.transactions_addr, tps); let total = tx_count - first_count; println!( "{}: Total Transactions processed {}", - val.events_addr, total + val.transactions_addr, total ); if total == transactions.len() as u64 { break; @@ -191,15 +194,15 @@ fn main() { fn mk_client(locked_addr: &Arc>, r: &ReplicatedData) -> ThinClient { let mut addr = locked_addr.write().unwrap(); let port = addr.port(); - let events_socket = UdpSocket::bind(addr.clone()).unwrap(); + let transactions_socket = UdpSocket::bind(addr.clone()).unwrap(); addr.set_port(port + 1); let requests_socket = UdpSocket::bind(addr.clone()).unwrap(); addr.set_port(port + 2); ThinClient::new( r.requests_addr, requests_socket, - r.events_addr, - events_socket, + r.transactions_addr, + transactions_socket, ) } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 4fa66b9068..18e965f81d 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -96,7 +96,7 @@ fn main() { // fields are the same. That entry should be treated as a deposit, not a // transfer to oneself. let entry1: Entry = entries.next().unwrap(); - let tr = &entry1.events[0]; + let tr = &entry1.transactions[0]; let deposit = if let Instruction::NewContract(contract) = &tr.instruction { contract.plan.final_payment() } else { @@ -114,10 +114,10 @@ fn main() { let mut last_id = entry1.id; for entry in entries { last_id = entry.id; - let results = bank.process_verified_transactions(entry.events); + let results = bank.process_verified_transactions(entry.transactions); for result in results { if let Err(e) = result { - eprintln!("failed to process event {:?}", e); + eprintln!("failed to process transaction {:?}", e); exit(1); } } @@ -155,7 +155,7 @@ fn main() { Some(Duration::from_millis(1000)), repl_data.clone(), UdpSocket::bind(repl_data.requests_addr).unwrap(), - UdpSocket::bind(repl_data.events_addr).unwrap(), + UdpSocket::bind(repl_data.transactions_addr).unwrap(), UdpSocket::bind("0.0.0.0:0").unwrap(), UdpSocket::bind("0.0.0.0:0").unwrap(), UdpSocket::bind(repl_data.gossip_addr).unwrap(), @@ -183,7 +183,7 @@ fn next_port(server_addr: &SocketAddr, nxt: u16) -> SocketAddr { } fn make_repl_data(bind_addr: &SocketAddr) -> ReplicatedData { - let events_addr = bind_addr.clone(); + let transactions_addr = bind_addr.clone(); let gossip_addr = next_port(&bind_addr, 1); let replicate_addr = next_port(&bind_addr, 2); let requests_addr = next_port(&bind_addr, 3); @@ -193,7 +193,7 @@ fn make_repl_data(bind_addr: &SocketAddr) -> ReplicatedData { gossip_addr, replicate_addr, requests_addr, - events_addr, + transactions_addr, ) } diff --git a/src/crdt.rs b/src/crdt.rs index cd1668d95e..d7e6b4505a 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -43,8 +43,8 @@ pub struct ReplicatedData { pub replicate_addr: SocketAddr, /// address to connect to when this node is leader pub requests_addr: SocketAddr, - /// events address - pub events_addr: SocketAddr, + /// transactions address + pub transactions_addr: SocketAddr, /// current leader identity pub current_leader_id: PublicKey, /// last verified hash that was submitted to the leader @@ -59,7 +59,7 @@ impl ReplicatedData { gossip_addr: SocketAddr, replicate_addr: SocketAddr, requests_addr: SocketAddr, - events_addr: SocketAddr, + transactions_addr: SocketAddr, ) -> ReplicatedData { ReplicatedData { id, @@ -68,7 +68,7 @@ impl ReplicatedData { gossip_addr, replicate_addr, requests_addr, - events_addr, + transactions_addr, current_leader_id: PublicKey::default(), last_verified_hash: Hash::default(), last_verified_count: 0, @@ -531,14 +531,14 @@ mod tests { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - let events = UdpSocket::bind("0.0.0.0:0").unwrap(); + let transactions = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), serve.local_addr().unwrap(), - events.local_addr().unwrap(), + transactions.local_addr().unwrap(), ); let crdt = Crdt::new(d); trace!( diff --git a/src/entry.rs b/src/entry.rs index 997e4eb1cd..06f95b2c55 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -8,7 +8,7 @@ use transaction::Transaction; /// Each Entry contains three pieces of data. The `num_hashes` field is the number /// of hashes performed since the previous entry. The `id` field is the result -/// of hashing `id` from the previous entry `num_hashes` times. The `events` +/// of hashing `id` from the previous entry `num_hashes` times. The `transactions` /// field points to Events that took place shortly after `id` was generated. /// /// If you divide `num_hashes` by the amount of time it takes to generate a new hash, you @@ -21,65 +21,69 @@ use transaction::Transaction; pub struct Entry { pub num_hashes: u64, pub id: Hash, - pub events: Vec, + pub transactions: Vec, } impl Entry { /// Creates the next Entry `num_hashes` after `start_hash`. - pub fn new(start_hash: &Hash, cur_hashes: u64, events: Vec) -> Self { - let num_hashes = cur_hashes + if events.is_empty() { 0 } else { 1 }; - let id = next_hash(start_hash, 0, &events); + pub fn new(start_hash: &Hash, cur_hashes: u64, transactions: Vec) -> Self { + let num_hashes = cur_hashes + if transactions.is_empty() { 0 } else { 1 }; + let id = next_hash(start_hash, 0, &transactions); Entry { num_hashes, id, - events, + transactions, } } /// Creates the next Tick Entry `num_hashes` after `start_hash`. - pub fn new_mut(start_hash: &mut Hash, cur_hashes: &mut u64, events: Vec) -> Self { - let entry = Self::new(start_hash, *cur_hashes, events); + pub fn new_mut( + start_hash: &mut Hash, + cur_hashes: &mut u64, + transactions: Vec, + ) -> Self { + let entry = Self::new(start_hash, *cur_hashes, transactions); *start_hash = entry.id; *cur_hashes = 0; entry } - /// Creates a Entry from the number of hashes `num_hashes` since the previous event + /// Creates a Entry from the number of hashes `num_hashes` since the previous transaction /// and that resulting `id`. pub fn new_tick(num_hashes: u64, id: &Hash) -> Self { Entry { num_hashes, id: *id, - events: vec![], + transactions: vec![], } } /// Verifies self.id is the result of hashing a `start_hash` `self.num_hashes` times. - /// If the event is not a Tick, then hash that as well. + /// If the transaction is not a Tick, then hash that as well. pub fn verify(&self, start_hash: &Hash) -> bool { - self.events.par_iter().all(|event| event.verify_plan()) - && self.id == next_hash(start_hash, self.num_hashes, &self.events) + self.transactions.par_iter().all(|tx| tx.verify_plan()) + && self.id == next_hash(start_hash, self.num_hashes, &self.transactions) } } -fn add_event_data(hash_data: &mut Vec, tr: &Transaction) { +fn add_transaction_data(hash_data: &mut Vec, tr: &Transaction) { hash_data.push(0u8); hash_data.extend_from_slice(&tr.sig); } -/// Creates the hash `num_hashes` after `start_hash`. If the event contains +/// Creates the hash `num_hashes` after `start_hash`. If the transaction contains /// a signature, the final hash will be a hash of both the previous ID and /// the signature. -pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Transaction]) -> Hash { +pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -> Hash { let mut id = *start_hash; for _ in 1..num_hashes { id = hash(&id); } - // Hash all the event data + // Hash all the transaction data let mut hash_data = vec![]; - for event in events { - add_event_data(&mut hash_data, event); + for tx in transactions { + add_transaction_data(&mut hash_data, tx); } if !hash_data.is_empty() { @@ -92,11 +96,11 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Transaction]) -> } /// Creates the next Tick or Event Entry `num_hashes` after `start_hash`. -pub fn next_entry(start_hash: &Hash, num_hashes: u64, events: Vec) -> Entry { +pub fn next_entry(start_hash: &Hash, num_hashes: u64, transactions: Vec) -> Entry { Entry { num_hashes, - id: next_hash(start_hash, num_hashes, &events), - events: events, + id: next_hash(start_hash, num_hashes, &transactions), + transactions, } } @@ -120,7 +124,7 @@ mod tests { } #[test] - fn test_event_reorder_attack() { + fn test_transaction_reorder_attack() { let zero = Hash::default(); // First, verify entries @@ -130,9 +134,9 @@ mod tests { let mut e0 = Entry::new(&zero, 0, vec![tr0.clone(), tr1.clone()]); assert!(e0.verify(&zero)); - // Next, swap two events and ensure verification fails. - e0.events[0] = tr1; // <-- attack - e0.events[1] = tr0; + // Next, swap two transactions and ensure verification fails. + e0.transactions[0] = tr1; // <-- attack + e0.transactions[1] = tr0; assert!(!e0.verify(&zero)); } @@ -147,9 +151,9 @@ mod tests { let mut e0 = Entry::new(&zero, 0, vec![tr0.clone(), tr1.clone()]); assert!(e0.verify(&zero)); - // Next, swap two witness events and ensure verification fails. - e0.events[0] = tr1; // <-- attack - e0.events[1] = tr0; + // Next, swap two witness transactions and ensure verification fails. + e0.transactions[0] = tr1; // <-- attack + e0.transactions[1] = tr0; assert!(!e0.verify(&zero)); } diff --git a/src/ledger.rs b/src/ledger.rs index 715a63befa..406f265b8d 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -14,7 +14,7 @@ use std::mem::size_of; use transaction::Transaction; pub trait Block { - /// Verifies the hashes and counts of a slice of events are all consistent. + /// Verifies the hashes and counts of a slice of transactions are all consistent. fn verify(&self, start_hash: &Hash) -> bool; } @@ -26,17 +26,17 @@ impl Block for [Entry] { } } -/// Create a vector of Entries of length `event_set.len()` from `start_hash` hash, `num_hashes`, and `event_set`. +/// Create a vector of Entries of length `transaction_batches.len()` from `start_hash` hash, `num_hashes`, and `transaction_batches`. pub fn next_entries( start_hash: &Hash, num_hashes: u64, - event_set: Vec>, + transaction_batches: Vec>, ) -> Vec { let mut id = *start_hash; let mut entries = vec![]; - for event_list in &event_set { - let events = event_list.clone(); - let entry = next_entry(&id, num_hashes, events); + for transactions in &transaction_batches { + let transactions = transactions.clone(); + let entry = next_entry(&id, num_hashes, transactions); id = entry.id; entries.push(entry); } @@ -54,33 +54,37 @@ pub fn process_entry_list_into_blobs( let mut entries: Vec> = Vec::new(); let mut total = 0; for i in &list[start..] { - total += size_of::() * i.events.len(); + total += size_of::() * i.transactions.len(); total += size_of::(); if total >= BLOB_DATA_SIZE { break; } end += 1; } - // See if we need to split the events + // See if we need to split the transactions if end <= start { - let mut event_start = 0; - let num_events_per_blob = BLOB_DATA_SIZE / size_of::(); - let total_entry_chunks = - (list[end].events.len() + num_events_per_blob - 1) / num_events_per_blob; + let mut transaction_start = 0; + let num_transactions_per_blob = BLOB_DATA_SIZE / size_of::(); + let total_entry_chunks = (list[end].transactions.len() + num_transactions_per_blob - 1) + / num_transactions_per_blob; trace!( - "splitting events end: {} total_chunks: {}", + "splitting transactions end: {} total_chunks: {}", end, total_entry_chunks ); for _ in 0..total_entry_chunks { - let event_end = min(event_start + num_events_per_blob, list[end].events.len()); + let transaction_end = min( + transaction_start + num_transactions_per_blob, + list[end].transactions.len(), + ); let mut entry = Entry { num_hashes: list[end].num_hashes, id: list[end].id, - events: list[end].events[event_start..event_end].to_vec(), + transactions: list[end].transactions[transaction_start..transaction_end] + .to_vec(), }; entries.push(vec![entry]); - event_start = event_end; + transaction_start = transaction_end; } end += 1; } else { @@ -112,7 +116,7 @@ pub fn reconstruct_entries_from_blobs(blobs: &VecDeque) -> Vec Vec { + pub fn create_transactions(&self) -> Vec { let keypair = self.keypair(); let tr = Transaction::new(&keypair, self.pubkey(), self.tokens, self.seed()); vec![tr] @@ -54,7 +54,7 @@ impl Mint { pub fn create_entries(&self) -> Vec { let e0 = Entry::new(&self.seed(), 0, vec![]); - let e1 = Entry::new(&e0.id, 0, self.create_events()); + let e1 = Entry::new(&e0.id, 0, self.create_transactions()); vec![e0, e1] } } @@ -73,15 +73,15 @@ mod tests { use transaction::Instruction; #[test] - fn test_create_events() { - let mut events = Mint::new(100).create_events().into_iter(); - let tr = events.next().unwrap(); + fn test_create_transactions() { + let mut transactions = Mint::new(100).create_transactions().into_iter(); + let tr = transactions.next().unwrap(); if let Instruction::NewContract(contract) = tr.instruction { if let Plan::Pay(payment) = contract.plan { assert_eq!(tr.from, payment.to); } } - assert_eq!(events.next(), None); + assert_eq!(transactions.next(), None); } #[test] diff --git a/src/plan.rs b/src/plan.rs index 12a7023606..efa78844c0 100644 --- a/src/plan.rs +++ b/src/plan.rs @@ -1,5 +1,5 @@ //! The `plan` module provides a domain-specific language for payment plans. Users create Plan objects that -//! are given to an interpreter. The interpreter listens for `Witness` events, +//! are given to an interpreter. The interpreter listens for `Witness` transactions, //! which it uses to reduce the payment plan. When the plan is reduced to a //! `Payment`, the payment is executed. diff --git a/src/record_stage.rs b/src/record_stage.rs index d2f21a46cf..b4cd3cec91 100644 --- a/src/record_stage.rs +++ b/src/record_stage.rs @@ -2,8 +2,8 @@ //! It records Event items on behalf of its users. It continuously generates //! new hashes, only stopping to check if it has been sent an Event item. It //! tags each Event with an Entry, and sends it back. The Entry includes the -//! Event, the latest hash, and the number of hashes since the last event. -//! The resulting stream of entries represents ordered events in time. +//! Event, the latest hash, and the number of hashes since the last transaction. +//! The resulting stream of entries represents ordered transactions in time. use entry::Entry; use hash::Hash; @@ -28,7 +28,7 @@ impl RecordStage { /// A background thread that will continue tagging received Event messages and /// sending back Entry messages until either the receiver or sender channel is closed. pub fn new( - event_receiver: Receiver, + transaction_receiver: Receiver, start_hash: &Hash, tick_duration: Option, ) -> Self { @@ -39,10 +39,10 @@ impl RecordStage { let mut recorder = Recorder::new(start_hash); let duration_data = tick_duration.map(|dur| (Instant::now(), dur)); loop { - if let Err(_) = Self::process_events( + if let Err(_) = Self::process_transactions( &mut recorder, duration_data, - &event_receiver, + &transaction_receiver, &entry_sender, ) { return; @@ -59,7 +59,7 @@ impl RecordStage { } } - pub fn process_events( + pub fn process_transactions( recorder: &mut Recorder, duration_data: Option<(Instant, Duration)>, receiver: &Receiver, @@ -77,8 +77,8 @@ impl RecordStage { let entry = recorder.record(vec![]); sender.send(entry).or(Err(()))?; } - Signal::Events(events) => { - let entry = recorder.record(events); + Signal::Events(transactions) => { + let entry = recorder.record(transactions); sender.send(entry).or(Err(()))?; } }, @@ -99,15 +99,15 @@ mod tests { #[test] fn test_historian() { - let (input, event_receiver) = channel(); + let (tx_sender, tx_receiver) = channel(); let zero = Hash::default(); - let record_stage = RecordStage::new(event_receiver, &zero, None); + let record_stage = RecordStage::new(tx_receiver, &zero, None); - input.send(Signal::Tick).unwrap(); + tx_sender.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); - input.send(Signal::Tick).unwrap(); + tx_sender.send(Signal::Tick).unwrap(); sleep(Duration::new(0, 1_000_000)); - input.send(Signal::Tick).unwrap(); + tx_sender.send(Signal::Tick).unwrap(); let entry0 = record_stage.entry_receiver.recv().unwrap(); let entry1 = record_stage.entry_receiver.recv().unwrap(); @@ -117,7 +117,7 @@ mod tests { assert_eq!(entry1.num_hashes, 0); assert_eq!(entry2.num_hashes, 0); - drop(input); + drop(tx_sender); assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); assert!([entry0, entry1, entry2].verify(&zero)); @@ -125,25 +125,25 @@ mod tests { #[test] fn test_historian_closed_sender() { - let (input, event_receiver) = channel(); + let (tx_sender, tx_receiver) = channel(); let zero = Hash::default(); - let record_stage = RecordStage::new(event_receiver, &zero, None); + let record_stage = RecordStage::new(tx_receiver, &zero, None); drop(record_stage.entry_receiver); - input.send(Signal::Tick).unwrap(); + tx_sender.send(Signal::Tick).unwrap(); assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); } #[test] - fn test_events() { - let (input, signal_receiver) = channel(); + fn test_transactions() { + let (tx_sender, signal_receiver) = channel(); let zero = Hash::default(); let record_stage = RecordStage::new(signal_receiver, &zero, None); let alice_keypair = KeyPair::new(); let bob_pubkey = KeyPair::new().pubkey(); - let event0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); - let event1 = Transaction::new(&alice_keypair, bob_pubkey, 2, zero); - input.send(Signal::Events(vec![event0, event1])).unwrap(); - drop(input); + let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); + let tx1 = Transaction::new(&alice_keypair, bob_pubkey, 2, zero); + tx_sender.send(Signal::Events(vec![tx0, tx1])).unwrap(); + drop(tx_sender); let entries: Vec<_> = record_stage.entry_receiver.iter().collect(); assert_eq!(entries.len(), 1); } @@ -151,12 +151,12 @@ mod tests { #[test] #[ignore] fn test_ticking_historian() { - let (input, event_receiver) = channel(); + let (tx_sender, tx_receiver) = channel(); let zero = Hash::default(); - let record_stage = RecordStage::new(event_receiver, &zero, Some(Duration::from_millis(20))); + let record_stage = RecordStage::new(tx_receiver, &zero, Some(Duration::from_millis(20))); sleep(Duration::from_millis(900)); - input.send(Signal::Tick).unwrap(); - drop(input); + tx_sender.send(Signal::Tick).unwrap(); + drop(tx_sender); let entries: Vec = record_stage.entry_receiver.iter().collect(); assert!(entries.len() > 1); diff --git a/src/recorder.rs b/src/recorder.rs index 26a246e176..a3bf0a143f 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -26,8 +26,8 @@ impl Recorder { self.num_hashes += 1; } - pub fn record(&mut self, events: Vec) -> Entry { - Entry::new_mut(&mut self.last_hash, &mut self.num_hashes, events) + pub fn record(&mut self, transactions: Vec) -> Entry { + Entry::new_mut(&mut self.last_hash, &mut self.num_hashes, transactions) } pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option { diff --git a/src/request_processor.rs b/src/request_processor.rs index 4272576978..165b3eebce 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -14,7 +14,6 @@ use std::sync::mpsc::Receiver; use std::time::Instant; use streamer; use timing; -use transaction::Transaction; pub struct RequestProcessor { bank: Arc, @@ -63,20 +62,7 @@ impl RequestProcessor { .collect() } - fn deserialize_requests(p: &packet::Packets) -> Vec> { - p.packets - .par_iter() - .map(|x| { - deserialize(&x.data[0..x.meta.size]) - .map(|req| (req, x.meta.addr())) - .ok() - }) - .collect() - } - - // Copy-paste of deserialize_requests() because I can't figure out how to - // route the lifetimes in a generic version. - pub fn deserialize_events(p: &packet::Packets) -> Vec> { + pub fn deserialize_requests(p: &packet::Packets) -> Vec> { p.packets .par_iter() .map(|x| { diff --git a/src/server.rs b/src/server.rs index 12828128eb..ab4f5be8bb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -24,7 +24,7 @@ impl Server { tick_duration: Option, me: ReplicatedData, requests_socket: UdpSocket, - events_socket: UdpSocket, + transactions_socket: UdpSocket, broadcast_socket: UdpSocket, respond_socket: UdpSocket, gossip_socket: UdpSocket, @@ -40,7 +40,7 @@ impl Server { start_hash, tick_duration, me, - events_socket, + transactions_socket, broadcast_socket, gossip_socket, exit.clone(), diff --git a/src/streamer.rs b/src/streamer.rs index 1e88f1dc23..9f03163e6b 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -672,14 +672,14 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let serve = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let event = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let transaction = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let rep_data = ReplicatedData::new( pubkey_me, read.local_addr().unwrap(), send.local_addr().unwrap(), serve.local_addr().unwrap(), - event.local_addr().unwrap(), + transaction.local_addr().unwrap(), ); let mut crdt_me = Crdt::new(rep_data); let me_id = crdt_me.my_data().id; @@ -736,14 +736,14 @@ mod test { let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let serve = UdpSocket::bind("127.0.0.1:0").unwrap(); - let event = UdpSocket::bind("127.0.0.1:0").unwrap(); + let transaction = UdpSocket::bind("127.0.0.1:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), serve.local_addr().unwrap(), - event.local_addr().unwrap(), + transaction.local_addr().unwrap(), ); trace!("data: {:?}", d); let crdt = Crdt::new(d); diff --git a/src/thin_client.rs b/src/thin_client.rs index c1318b6e90..d05d71a736 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -16,8 +16,8 @@ use transaction::Transaction; pub struct ThinClient { requests_addr: SocketAddr, requests_socket: UdpSocket, - events_addr: SocketAddr, - events_socket: UdpSocket, + transactions_addr: SocketAddr, + transactions_socket: UdpSocket, last_id: Option, transaction_count: u64, balances: HashMap>, @@ -25,19 +25,19 @@ pub struct ThinClient { impl ThinClient { /// Create a new ThinClient that will interface with Rpu - /// over `requests_socket` and `events_socket`. To receive responses, the caller must bind `socket` + /// over `requests_socket` and `transactions_socket`. To receive responses, the caller must bind `socket` /// to a public address before invoking ThinClient methods. pub fn new( requests_addr: SocketAddr, requests_socket: UdpSocket, - events_addr: SocketAddr, - events_socket: UdpSocket, + transactions_addr: SocketAddr, + transactions_socket: UdpSocket, ) -> Self { let client = ThinClient { requests_addr, requests_socket, - events_addr, - events_socket, + transactions_addr, + transactions_socket, last_id: None, transaction_count: 0, balances: HashMap::new(), @@ -75,7 +75,8 @@ impl ThinClient { /// does not wait for a response. pub fn transfer_signed(&self, tr: Transaction) -> io::Result { let data = serialize(&tr).expect("serialize Transaction in pub fn transfer_signed"); - self.events_socket.send_to(&data, &self.events_addr) + self.transactions_socket + .send_to(&data, &self.transactions_addr) } /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. @@ -209,7 +210,7 @@ mod tests { Some(Duration::from_millis(30)), leader.data.clone(), leader.sockets.requests, - leader.sockets.event, + leader.sockets.transaction, leader.sockets.broadcast, leader.sockets.respond, leader.sockets.gossip, @@ -219,13 +220,13 @@ mod tests { sleep(Duration::from_millis(900)); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( leader.data.requests_addr, requests_socket, - leader.data.events_addr, - events_socket, + leader.data.transactions_addr, + transactions_socket, ); let last_id = client.get_last_id().wait().unwrap(); let _sig = client @@ -254,7 +255,7 @@ mod tests { Some(Duration::from_millis(30)), leader.data.clone(), leader.sockets.requests, - leader.sockets.event, + leader.sockets.transaction, leader.sockets.broadcast, leader.sockets.respond, leader.sockets.gossip, @@ -267,12 +268,12 @@ mod tests { requests_socket .set_read_timeout(Some(Duration::new(5, 0))) .unwrap(); - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( leader.data.requests_addr, requests_socket, - leader.data.events_addr, - events_socket, + leader.data.transactions_addr, + transactions_socket, ); let last_id = client.get_last_id().wait().unwrap(); @@ -383,7 +384,7 @@ mod tests { None, leader.data.clone(), leader.sockets.requests, - leader.sockets.event, + leader.sockets.transaction, leader.sockets.broadcast, leader.sockets.respond, leader.sockets.gossip, @@ -424,13 +425,13 @@ mod tests { requests_socket .set_read_timeout(Some(Duration::new(1, 0))) .unwrap(); - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); ThinClient::new( leader.requests_addr, requests_socket, - leader.events_addr, - events_socket, + leader.transactions_addr, + transactions_socket, ) } diff --git a/src/tpu.rs b/src/tpu.rs index 784fa7d5c2..a0c473fc27 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -28,7 +28,7 @@ impl Tpu { start_hash: Hash, tick_duration: Option, me: ReplicatedData, - events_socket: UdpSocket, + transactions_socket: UdpSocket, broadcast_socket: UdpSocket, gossip: UdpSocket, exit: Arc, @@ -37,7 +37,7 @@ impl Tpu { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( - events_socket, + transactions_socket, exit.clone(), packet_recycler.clone(), packet_sender, diff --git a/src/tvu.rs b/src/tvu.rs index 5b2be0f672..11211ef0c1 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -132,7 +132,7 @@ use std::time::Duration; pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { use signature::{KeyPair, KeyPairUtil}; - let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let transactions_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -145,9 +145,9 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), requests_socket.local_addr().unwrap(), - events_socket.local_addr().unwrap(), + transactions_socket.local_addr().unwrap(), ); - (d, gossip, replicate, requests_socket, events_socket) + (d, gossip, replicate, requests_socket, transactions_socket) } #[cfg(test)] @@ -307,7 +307,7 @@ pub mod tests { pub gossip: UdpSocket, pub requests: UdpSocket, pub replicate: UdpSocket, - pub event: UdpSocket, + pub transaction: UdpSocket, pub respond: UdpSocket, pub broadcast: UdpSocket, } @@ -319,7 +319,7 @@ pub mod tests { pub fn new() -> TestNode { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); - let event = UdpSocket::bind("0.0.0.0:0").unwrap(); + let transaction = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -329,7 +329,7 @@ pub mod tests { gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), requests.local_addr().unwrap(), - event.local_addr().unwrap(), + transaction.local_addr().unwrap(), ); TestNode { data: data, @@ -337,7 +337,7 @@ pub mod tests { gossip, requests, replicate, - event, + transaction, respond, broadcast, },