events -> transactions

This commit is contained in:
Greg Fitzgerald
2018-05-25 15:51:41 -06:00
parent 4cdf873f98
commit 9f5a3d6064
19 changed files with 203 additions and 207 deletions

View File

@ -261,7 +261,7 @@ impl Bank {
pub fn process_verified_entries(&self, entries: Vec<Entry>) -> Result<()> { pub fn process_verified_entries(&self, entries: Vec<Entry>) -> Result<()> {
for entry in entries { for entry in entries {
self.register_entry_id(&entry.id); self.register_entry_id(&entry.id);
for result in self.process_verified_transactions(entry.events) { for result in self.process_verified_transactions(entry.transactions) {
result?; result?;
} }
} }
@ -468,7 +468,7 @@ mod tests {
bank.process_verified_timestamp(mint.pubkey(), dt).unwrap(); bank.process_verified_timestamp(mint.pubkey(), dt).unwrap();
assert_eq!(bank.get_balance(&pubkey), Some(1)); assert_eq!(bank.get_balance(&pubkey), Some(1));
// tx count is still 1, because we chose not to count timestamp events // tx count is still 1, because we chose not to count timestamp transactions
// tx count. // tx count.
assert_eq!(bank.transaction_count(), 1); assert_eq!(bank.transaction_count(), 1);
@ -524,7 +524,7 @@ mod tests {
} }
#[test] #[test]
fn test_duplicate_event_signature() { fn test_duplicate_transaction_signature() {
let mint = Mint::new(1); let mint = Mint::new(1);
let bank = Bank::new(&mint); let bank = Bank::new(&mint);
let sig = Signature::default(); let sig = Signature::default();
@ -581,7 +581,7 @@ mod bench {
use signature::KeyPairUtil; use signature::KeyPairUtil;
#[bench] #[bench]
fn process_verified_event_bench(bencher: &mut Bencher) { fn process_verified_transaction_bench(bencher: &mut Bencher) {
let mint = Mint::new(100_000_000); let mint = Mint::new(100_000_000);
let bank = Bank::new(&mint); let bank = Bank::new(&mint);
// Create transactions between unrelated parties. // Create transactions between unrelated parties.

View File

@ -49,7 +49,7 @@ impl BankingStage {
} }
} }
fn deserialize_events(p: &packet::Packets) -> Vec<Option<(Transaction, SocketAddr)>> { fn deserialize_transactions(p: &packet::Packets) -> Vec<Option<(Transaction, SocketAddr)>> {
p.packets p.packets
.par_iter() .par_iter()
.map(|x| { .map(|x| {
@ -79,33 +79,33 @@ impl BankingStage {
); );
let proc_start = Instant::now(); let proc_start = Instant::now();
for (msgs, vers) in mms { for (msgs, vers) in mms {
let events = Self::deserialize_events(&msgs.read().unwrap()); let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
reqs_len += events.len(); reqs_len += transactions.len();
let events = events let transactions = transactions
.into_iter() .into_iter()
.zip(vers) .zip(vers)
.filter_map(|(event, ver)| match event { .filter_map(|(tx, ver)| match tx {
None => None, None => None,
Some((event, _addr)) => if event.verify_plan() && ver != 0 { Some((tx, _addr)) => if tx.verify_plan() && ver != 0 {
Some(event) Some(tx)
} else { } else {
None None
}, },
}) })
.collect(); .collect();
debug!("process_events"); debug!("process_transactions");
let results = bank.process_verified_transactions(events); let results = bank.process_verified_transactions(transactions);
let events = results.into_iter().filter_map(|x| x.ok()).collect(); let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
signal_sender.send(Signal::Events(events))?; signal_sender.send(Signal::Events(transactions))?;
debug!("done process_events"); debug!("done process_transactions");
packet_recycler.recycle(msgs); packet_recycler.recycle(msgs);
} }
let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!( info!(
"@{:?} done processing event batches: {} time: {:?}ms reqs: {} reqs/s: {}", "@{:?} done processing transaction batches: {} time: {:?}ms reqs: {} reqs/s: {}",
timing::timestamp(), timing::timestamp(),
mms_len, mms_len,
total_time_ms, total_time_ms,
@ -132,13 +132,12 @@ impl BankingStage {
//#[cfg(test)] //#[cfg(test)]
//mod tests { //mod tests {
// use bank::Bank; // use bank::Bank;
// use event_processor::EventProcessor;
// use mint::Mint; // use mint::Mint;
// use signature::{KeyPair, KeyPairUtil}; // use signature::{KeyPair, KeyPairUtil};
// use transaction::Transaction; // use transaction::Transaction;
// //
// #[test] // #[test]
// // TODO: Move this test banking_stage. Calling process_events() directly // // TODO: Move this test banking_stage. Calling process_transactions() directly
// // defeats the purpose of this test. // // defeats the purpose of this test.
// fn test_banking_sequential_consistency() { // fn test_banking_sequential_consistency() {
// // In this attack we'll demonstrate that a verifier can interpret the ledger // // In this attack we'll demonstrate that a verifier can interpret the ledger
@ -146,18 +145,18 @@ impl BankingStage {
// // Entry OR if the verifier tries to parallelize across multiple Entries. // // Entry OR if the verifier tries to parallelize across multiple Entries.
// let mint = Mint::new(2); // let mint = Mint::new(2);
// let bank = Bank::new(&mint); // let bank = Bank::new(&mint);
// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); // let banking_stage = EventProcessor::new(bank, &mint.last_id(), None);
// //
// // Process a batch that includes a transaction that receives two tokens. // // Process a batch that includes a transaction that receives two tokens.
// let alice = KeyPair::new(); // let alice = KeyPair::new();
// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); // let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
// let events = vec![tr]; // let transactions = vec![tr];
// let entry0 = event_processor.process_events(events).unwrap(); // let entry0 = banking_stage.process_transactions(transactions).unwrap();
// //
// // Process a second batch that spends one of those tokens. // // Process a second batch that spends one of those tokens.
// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); // let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
// let events = vec![tr]; // let transactions = vec![tr];
// let entry1 = event_processor.process_events(events).unwrap(); // let entry1 = banking_stage.process_transactions(transactions).unwrap();
// //
// // Collect the ledger and feed it to a new bank. // // Collect the ledger and feed it to a new bank.
// let entries = vec![entry0, entry1]; // let entries = vec![entry0, entry1];
@ -169,7 +168,7 @@ impl BankingStage {
// for entry in entries { // for entry in entries {
// assert!( // assert!(
// bank // bank
// .process_verified_transactions(entry.events) // .process_verified_transactions(entry.transactions)
// .into_iter() // .into_iter()
// .all(|x| x.is_ok()) // .all(|x| x.is_ok())
// ); // );
@ -184,7 +183,6 @@ impl BankingStage {
// use self::test::Bencher; // use self::test::Bencher;
// use bank::{Bank, MAX_ENTRY_IDS}; // use bank::{Bank, MAX_ENTRY_IDS};
// use bincode::serialize; // use bincode::serialize;
// use event_processor::*;
// use hash::hash; // use hash::hash;
// use mint::Mint; // use mint::Mint;
// use rayon::prelude::*; // use rayon::prelude::*;
@ -194,7 +192,7 @@ impl BankingStage {
// use transaction::Transaction; // use transaction::Transaction;
// //
// #[bench] // #[bench]
// fn process_events_bench(_bencher: &mut Bencher) { // fn process_transactions_bench(_bencher: &mut Bencher) {
// let mint = Mint::new(100_000_000); // let mint = Mint::new(100_000_000);
// let bank = Bank::new(&mint); // let bank = Bank::new(&mint);
// // Create transactions between unrelated parties. // // Create transactions between unrelated parties.
@ -228,17 +226,17 @@ impl BankingStage {
// }) // })
// .collect(); // .collect();
// //
// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); // let banking_stage = EventProcessor::new(bank, &mint.last_id(), None);
// //
// let now = Instant::now(); // let now = Instant::now();
// assert!(event_processor.process_events(transactions).is_ok()); // assert!(banking_stage.process_transactions(transactions).is_ok());
// let duration = now.elapsed(); // let duration = now.elapsed();
// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; // let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
// let tps = txs as f64 / sec; // let tps = txs as f64 / sec;
// //
// // Ensure that all transactions were successfully logged. // // Ensure that all transactions were successfully logged.
// drop(event_processor.historian_input); // drop(banking_stage.historian_input);
// let entries: Vec<Entry> = event_processor.output.lock().unwrap().iter().collect(); // let entries: Vec<Entry> = banking_stage.output.lock().unwrap().iter().collect();
// assert_eq!(entries.len(), 1); // assert_eq!(entries.len(), 1);
// assert_eq!(entries[0].transactions.len(), txs as usize); // assert_eq!(entries[0].transactions.len(), txs as usize);
// //
@ -267,14 +265,14 @@ mod bench {
let mint = Mint::new(1_000_000_000); let mint = Mint::new(1_000_000_000);
let pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
let events: Vec<_> = (0..tx) let transactions: Vec<_> = (0..tx)
.map(|i| Transaction::new(&mint.keypair(), pubkey, i as i64, mint.last_id())) .map(|i| Transaction::new(&mint.keypair(), pubkey, i as i64, mint.last_id()))
.collect(); .collect();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (signal_sender, signal_receiver) = channel(); let (signal_sender, signal_receiver) = channel();
let packet_recycler = PacketRecycler::default(); let packet_recycler = PacketRecycler::default();
let verified: Vec<_> = to_packets(&packet_recycler, events) let verified: Vec<_> = to_packets(&packet_recycler, transactions)
.into_iter() .into_iter()
.map(|x| { .map(|x| {
let len = (*x).read().unwrap().packets.len(); let len = (*x).read().unwrap().packets.len();
@ -292,8 +290,8 @@ mod bench {
&packet_recycler, &packet_recycler,
).unwrap(); ).unwrap();
let signal = signal_receiver.recv().unwrap(); let signal = signal_receiver.recv().unwrap();
if let Signal::Events(ref events) = signal { if let Signal::Events(ref transactions) = signal {
assert_eq!(events.len(), tx); assert_eq!(transactions.len(), tx);
} else { } else {
assert!(false); assert!(false);
} }

View File

@ -64,9 +64,9 @@ fn main() {
threads = matches.opt_str("t").unwrap().parse().expect("integer"); threads = matches.opt_str("t").unwrap().parse().expect("integer");
} }
let mut events_addr: SocketAddr = requests_addr.parse().unwrap(); let mut transactions_addr: SocketAddr = requests_addr.parse().unwrap();
let requests_port = events_addr.port(); let requests_port = transactions_addr.port();
events_addr.set_port(requests_port + 1); transactions_addr.set_port(requests_port + 1);
if stdin_isatty() { if stdin_isatty() {
eprintln!("nothing found on stdin, expected a json file"); eprintln!("nothing found on stdin, expected a json file");
@ -91,16 +91,16 @@ fn main() {
requests_socket requests_socket
.set_read_timeout(Some(Duration::new(5, 0))) .set_read_timeout(Some(Duration::new(5, 0)))
.unwrap(); .unwrap();
let events_socket = UdpSocket::bind(&events_addr).unwrap(); let transactions_socket = UdpSocket::bind(&transactions_addr).unwrap();
let requests_addr: SocketAddr = server_addr.parse().unwrap(); let requests_addr: SocketAddr = server_addr.parse().unwrap();
let requests_port = requests_addr.port(); let requests_port = requests_addr.port();
let mut events_server_addr = requests_addr.clone(); let mut transactions_addr = requests_addr.clone();
events_server_addr.set_port(requests_port + 3); transactions_addr.set_port(requests_port + 3);
let mut client = ThinClient::new( let mut client = ThinClient::new(
requests_addr, requests_addr,
requests_socket, requests_socket,
events_server_addr, transactions_addr,
events_socket, transactions_socket,
); );
println!("Get last ID..."); println!("Get last ID...");
@ -146,14 +146,14 @@ fn main() {
requests_socket requests_socket
.set_read_timeout(Some(Duration::new(5, 0))) .set_read_timeout(Some(Duration::new(5, 0)))
.unwrap(); .unwrap();
let mut events_addr: SocketAddr = requests_addr.clone(); let mut transactions_addr: SocketAddr = requests_addr.clone();
events_addr.set_port(0); transactions_addr.set_port(0);
let events_socket = UdpSocket::bind(&events_addr).unwrap(); let transactions_socket = UdpSocket::bind(&transactions_addr).unwrap();
let client = ThinClient::new( let client = ThinClient::new(
requests_addr, requests_addr,
requests_socket, requests_socket,
events_server_addr, transactions_addr,
events_socket, transactions_socket,
); );
for tr in trs { for tr in trs {
client.transfer_signed(tr.clone()).unwrap(); client.transfer_signed(tr.clone()).unwrap();

View File

@ -42,7 +42,7 @@ fn main() {
let last_id = demo.mint.last_id(); let last_id = demo.mint.last_id();
eprintln!("Signing {} transactions...", num_accounts); eprintln!("Signing {} transactions...", num_accounts);
let events: Vec<_> = keypairs let transactions: Vec<_> = keypairs
.into_par_iter() .into_par_iter()
.map(|rando| { .map(|rando| {
let last_id = demo.mint.last_id(); let last_id = demo.mint.last_id();
@ -55,7 +55,7 @@ fn main() {
} }
eprintln!("Logging the creation of {} accounts...", num_accounts); eprintln!("Logging the creation of {} accounts...", num_accounts);
let entry = Entry::new(&last_id, 0, events); let entry = Entry::new(&last_id, 0, transactions);
println!("{}", serde_json::to_string(&entry).unwrap()); println!("{}", serde_json::to_string(&entry).unwrap());
eprintln!("Creating {} empty entries...", MAX_ENTRY_IDS); eprintln!("Creating {} empty entries...", MAX_ENTRY_IDS);

View File

@ -164,14 +164,17 @@ fn main() {
now = Instant::now(); now = Instant::now();
let sample = tx_count - initial_tx_count; let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count; initial_tx_count = tx_count;
println!("{}: Transactions processed {}", val.events_addr, sample); println!(
"{}: Transactions processed {}",
val.transactions_addr, sample
);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (sample * 1_000_000_000) as f64 / ns as f64; let tps = (sample * 1_000_000_000) as f64 / ns as f64;
println!("{}: {} tps", val.events_addr, tps); println!("{}: {} tps", val.transactions_addr, tps);
let total = tx_count - first_count; let total = tx_count - first_count;
println!( println!(
"{}: Total Transactions processed {}", "{}: Total Transactions processed {}",
val.events_addr, total val.transactions_addr, total
); );
if total == transactions.len() as u64 { if total == transactions.len() as u64 {
break; break;
@ -191,15 +194,15 @@ fn main() {
fn mk_client(locked_addr: &Arc<RwLock<SocketAddr>>, r: &ReplicatedData) -> ThinClient { fn mk_client(locked_addr: &Arc<RwLock<SocketAddr>>, r: &ReplicatedData) -> ThinClient {
let mut addr = locked_addr.write().unwrap(); let mut addr = locked_addr.write().unwrap();
let port = addr.port(); let port = addr.port();
let events_socket = UdpSocket::bind(addr.clone()).unwrap(); let transactions_socket = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 1); addr.set_port(port + 1);
let requests_socket = UdpSocket::bind(addr.clone()).unwrap(); let requests_socket = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 2); addr.set_port(port + 2);
ThinClient::new( ThinClient::new(
r.requests_addr, r.requests_addr,
requests_socket, requests_socket,
r.events_addr, r.transactions_addr,
events_socket, transactions_socket,
) )
} }

View File

@ -96,7 +96,7 @@ fn main() {
// fields are the same. That entry should be treated as a deposit, not a // fields are the same. That entry should be treated as a deposit, not a
// transfer to oneself. // transfer to oneself.
let entry1: Entry = entries.next().unwrap(); let entry1: Entry = entries.next().unwrap();
let tr = &entry1.events[0]; let tr = &entry1.transactions[0];
let deposit = if let Instruction::NewContract(contract) = &tr.instruction { let deposit = if let Instruction::NewContract(contract) = &tr.instruction {
contract.plan.final_payment() contract.plan.final_payment()
} else { } else {
@ -114,10 +114,10 @@ fn main() {
let mut last_id = entry1.id; let mut last_id = entry1.id;
for entry in entries { for entry in entries {
last_id = entry.id; last_id = entry.id;
let results = bank.process_verified_transactions(entry.events); let results = bank.process_verified_transactions(entry.transactions);
for result in results { for result in results {
if let Err(e) = result { if let Err(e) = result {
eprintln!("failed to process event {:?}", e); eprintln!("failed to process transaction {:?}", e);
exit(1); exit(1);
} }
} }
@ -155,7 +155,7 @@ fn main() {
Some(Duration::from_millis(1000)), Some(Duration::from_millis(1000)),
repl_data.clone(), repl_data.clone(),
UdpSocket::bind(repl_data.requests_addr).unwrap(), UdpSocket::bind(repl_data.requests_addr).unwrap(),
UdpSocket::bind(repl_data.events_addr).unwrap(), UdpSocket::bind(repl_data.transactions_addr).unwrap(),
UdpSocket::bind("0.0.0.0:0").unwrap(), UdpSocket::bind("0.0.0.0:0").unwrap(),
UdpSocket::bind("0.0.0.0:0").unwrap(), UdpSocket::bind("0.0.0.0:0").unwrap(),
UdpSocket::bind(repl_data.gossip_addr).unwrap(), UdpSocket::bind(repl_data.gossip_addr).unwrap(),
@ -183,7 +183,7 @@ fn next_port(server_addr: &SocketAddr, nxt: u16) -> SocketAddr {
} }
fn make_repl_data(bind_addr: &SocketAddr) -> ReplicatedData { fn make_repl_data(bind_addr: &SocketAddr) -> ReplicatedData {
let events_addr = bind_addr.clone(); let transactions_addr = bind_addr.clone();
let gossip_addr = next_port(&bind_addr, 1); let gossip_addr = next_port(&bind_addr, 1);
let replicate_addr = next_port(&bind_addr, 2); let replicate_addr = next_port(&bind_addr, 2);
let requests_addr = next_port(&bind_addr, 3); let requests_addr = next_port(&bind_addr, 3);
@ -193,7 +193,7 @@ fn make_repl_data(bind_addr: &SocketAddr) -> ReplicatedData {
gossip_addr, gossip_addr,
replicate_addr, replicate_addr,
requests_addr, requests_addr,
events_addr, transactions_addr,
) )
} }

View File

@ -43,8 +43,8 @@ pub struct ReplicatedData {
pub replicate_addr: SocketAddr, pub replicate_addr: SocketAddr,
/// address to connect to when this node is leader /// address to connect to when this node is leader
pub requests_addr: SocketAddr, pub requests_addr: SocketAddr,
/// events address /// transactions address
pub events_addr: SocketAddr, pub transactions_addr: SocketAddr,
/// current leader identity /// current leader identity
pub current_leader_id: PublicKey, pub current_leader_id: PublicKey,
/// last verified hash that was submitted to the leader /// last verified hash that was submitted to the leader
@ -59,7 +59,7 @@ impl ReplicatedData {
gossip_addr: SocketAddr, gossip_addr: SocketAddr,
replicate_addr: SocketAddr, replicate_addr: SocketAddr,
requests_addr: SocketAddr, requests_addr: SocketAddr,
events_addr: SocketAddr, transactions_addr: SocketAddr,
) -> ReplicatedData { ) -> ReplicatedData {
ReplicatedData { ReplicatedData {
id, id,
@ -68,7 +68,7 @@ impl ReplicatedData {
gossip_addr, gossip_addr,
replicate_addr, replicate_addr,
requests_addr, requests_addr,
events_addr, transactions_addr,
current_leader_id: PublicKey::default(), current_leader_id: PublicKey::default(),
last_verified_hash: Hash::default(), last_verified_hash: Hash::default(),
last_verified_count: 0, last_verified_count: 0,
@ -531,14 +531,14 @@ mod tests {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
let events = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new( let d = ReplicatedData::new(
pubkey, pubkey,
gossip.local_addr().unwrap(), gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(), replicate.local_addr().unwrap(),
serve.local_addr().unwrap(), serve.local_addr().unwrap(),
events.local_addr().unwrap(), transactions.local_addr().unwrap(),
); );
let crdt = Crdt::new(d); let crdt = Crdt::new(d);
trace!( trace!(

View File

@ -8,7 +8,7 @@ use transaction::Transaction;
/// Each Entry contains three pieces of data. The `num_hashes` field is the number /// Each Entry contains three pieces of data. The `num_hashes` field is the number
/// of hashes performed since the previous entry. The `id` field is the result /// of hashes performed since the previous entry. The `id` field is the result
/// of hashing `id` from the previous entry `num_hashes` times. The `events` /// of hashing `id` from the previous entry `num_hashes` times. The `transactions`
/// field points to Events that took place shortly after `id` was generated. /// field points to Events that took place shortly after `id` was generated.
/// ///
/// If you divide `num_hashes` by the amount of time it takes to generate a new hash, you /// If you divide `num_hashes` by the amount of time it takes to generate a new hash, you
@ -21,65 +21,69 @@ use transaction::Transaction;
pub struct Entry { pub struct Entry {
pub num_hashes: u64, pub num_hashes: u64,
pub id: Hash, pub id: Hash,
pub events: Vec<Transaction>, pub transactions: Vec<Transaction>,
} }
impl Entry { impl Entry {
/// Creates the next Entry `num_hashes` after `start_hash`. /// Creates the next Entry `num_hashes` after `start_hash`.
pub fn new(start_hash: &Hash, cur_hashes: u64, events: Vec<Transaction>) -> Self { pub fn new(start_hash: &Hash, cur_hashes: u64, transactions: Vec<Transaction>) -> Self {
let num_hashes = cur_hashes + if events.is_empty() { 0 } else { 1 }; let num_hashes = cur_hashes + if transactions.is_empty() { 0 } else { 1 };
let id = next_hash(start_hash, 0, &events); let id = next_hash(start_hash, 0, &transactions);
Entry { Entry {
num_hashes, num_hashes,
id, id,
events, transactions,
} }
} }
/// Creates the next Tick Entry `num_hashes` after `start_hash`. /// Creates the next Tick Entry `num_hashes` after `start_hash`.
pub fn new_mut(start_hash: &mut Hash, cur_hashes: &mut u64, events: Vec<Transaction>) -> Self { pub fn new_mut(
let entry = Self::new(start_hash, *cur_hashes, events); start_hash: &mut Hash,
cur_hashes: &mut u64,
transactions: Vec<Transaction>,
) -> Self {
let entry = Self::new(start_hash, *cur_hashes, transactions);
*start_hash = entry.id; *start_hash = entry.id;
*cur_hashes = 0; *cur_hashes = 0;
entry entry
} }
/// Creates a Entry from the number of hashes `num_hashes` since the previous event /// Creates a Entry from the number of hashes `num_hashes` since the previous transaction
/// and that resulting `id`. /// and that resulting `id`.
pub fn new_tick(num_hashes: u64, id: &Hash) -> Self { pub fn new_tick(num_hashes: u64, id: &Hash) -> Self {
Entry { Entry {
num_hashes, num_hashes,
id: *id, id: *id,
events: vec![], transactions: vec![],
} }
} }
/// Verifies self.id is the result of hashing a `start_hash` `self.num_hashes` times. /// Verifies self.id is the result of hashing a `start_hash` `self.num_hashes` times.
/// If the event is not a Tick, then hash that as well. /// If the transaction is not a Tick, then hash that as well.
pub fn verify(&self, start_hash: &Hash) -> bool { pub fn verify(&self, start_hash: &Hash) -> bool {
self.events.par_iter().all(|event| event.verify_plan()) self.transactions.par_iter().all(|tx| tx.verify_plan())
&& self.id == next_hash(start_hash, self.num_hashes, &self.events) && self.id == next_hash(start_hash, self.num_hashes, &self.transactions)
} }
} }
fn add_event_data(hash_data: &mut Vec<u8>, tr: &Transaction) { fn add_transaction_data(hash_data: &mut Vec<u8>, tr: &Transaction) {
hash_data.push(0u8); hash_data.push(0u8);
hash_data.extend_from_slice(&tr.sig); hash_data.extend_from_slice(&tr.sig);
} }
/// Creates the hash `num_hashes` after `start_hash`. If the event contains /// Creates the hash `num_hashes` after `start_hash`. If the transaction contains
/// a signature, the final hash will be a hash of both the previous ID and /// a signature, the final hash will be a hash of both the previous ID and
/// the signature. /// the signature.
pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Transaction]) -> Hash { pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -> Hash {
let mut id = *start_hash; let mut id = *start_hash;
for _ in 1..num_hashes { for _ in 1..num_hashes {
id = hash(&id); id = hash(&id);
} }
// Hash all the event data // Hash all the transaction data
let mut hash_data = vec![]; let mut hash_data = vec![];
for event in events { for tx in transactions {
add_event_data(&mut hash_data, event); add_transaction_data(&mut hash_data, tx);
} }
if !hash_data.is_empty() { if !hash_data.is_empty() {
@ -92,11 +96,11 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Transaction]) ->
} }
/// Creates the next Tick or Event Entry `num_hashes` after `start_hash`. /// Creates the next Tick or Event Entry `num_hashes` after `start_hash`.
pub fn next_entry(start_hash: &Hash, num_hashes: u64, events: Vec<Transaction>) -> Entry { pub fn next_entry(start_hash: &Hash, num_hashes: u64, transactions: Vec<Transaction>) -> Entry {
Entry { Entry {
num_hashes, num_hashes,
id: next_hash(start_hash, num_hashes, &events), id: next_hash(start_hash, num_hashes, &transactions),
events: events, transactions,
} }
} }
@ -120,7 +124,7 @@ mod tests {
} }
#[test] #[test]
fn test_event_reorder_attack() { fn test_transaction_reorder_attack() {
let zero = Hash::default(); let zero = Hash::default();
// First, verify entries // First, verify entries
@ -130,9 +134,9 @@ mod tests {
let mut e0 = Entry::new(&zero, 0, vec![tr0.clone(), tr1.clone()]); let mut e0 = Entry::new(&zero, 0, vec![tr0.clone(), tr1.clone()]);
assert!(e0.verify(&zero)); assert!(e0.verify(&zero));
// Next, swap two events and ensure verification fails. // Next, swap two transactions and ensure verification fails.
e0.events[0] = tr1; // <-- attack e0.transactions[0] = tr1; // <-- attack
e0.events[1] = tr0; e0.transactions[1] = tr0;
assert!(!e0.verify(&zero)); assert!(!e0.verify(&zero));
} }
@ -147,9 +151,9 @@ mod tests {
let mut e0 = Entry::new(&zero, 0, vec![tr0.clone(), tr1.clone()]); let mut e0 = Entry::new(&zero, 0, vec![tr0.clone(), tr1.clone()]);
assert!(e0.verify(&zero)); assert!(e0.verify(&zero));
// Next, swap two witness events and ensure verification fails. // Next, swap two witness transactions and ensure verification fails.
e0.events[0] = tr1; // <-- attack e0.transactions[0] = tr1; // <-- attack
e0.events[1] = tr0; e0.transactions[1] = tr0;
assert!(!e0.verify(&zero)); assert!(!e0.verify(&zero));
} }

View File

@ -14,7 +14,7 @@ use std::mem::size_of;
use transaction::Transaction; use transaction::Transaction;
pub trait Block { pub trait Block {
/// Verifies the hashes and counts of a slice of events are all consistent. /// Verifies the hashes and counts of a slice of transactions are all consistent.
fn verify(&self, start_hash: &Hash) -> bool; fn verify(&self, start_hash: &Hash) -> bool;
} }
@ -26,17 +26,17 @@ impl Block for [Entry] {
} }
} }
/// Create a vector of Entries of length `event_set.len()` from `start_hash` hash, `num_hashes`, and `event_set`. /// Create a vector of Entries of length `transaction_batches.len()` from `start_hash` hash, `num_hashes`, and `transaction_batches`.
pub fn next_entries( pub fn next_entries(
start_hash: &Hash, start_hash: &Hash,
num_hashes: u64, num_hashes: u64,
event_set: Vec<Vec<Transaction>>, transaction_batches: Vec<Vec<Transaction>>,
) -> Vec<Entry> { ) -> Vec<Entry> {
let mut id = *start_hash; let mut id = *start_hash;
let mut entries = vec![]; let mut entries = vec![];
for event_list in &event_set { for transactions in &transaction_batches {
let events = event_list.clone(); let transactions = transactions.clone();
let entry = next_entry(&id, num_hashes, events); let entry = next_entry(&id, num_hashes, transactions);
id = entry.id; id = entry.id;
entries.push(entry); entries.push(entry);
} }
@ -54,33 +54,37 @@ pub fn process_entry_list_into_blobs(
let mut entries: Vec<Vec<Entry>> = Vec::new(); let mut entries: Vec<Vec<Entry>> = Vec::new();
let mut total = 0; let mut total = 0;
for i in &list[start..] { for i in &list[start..] {
total += size_of::<Transaction>() * i.events.len(); total += size_of::<Transaction>() * i.transactions.len();
total += size_of::<Entry>(); total += size_of::<Entry>();
if total >= BLOB_DATA_SIZE { if total >= BLOB_DATA_SIZE {
break; break;
} }
end += 1; end += 1;
} }
// See if we need to split the events // See if we need to split the transactions
if end <= start { if end <= start {
let mut event_start = 0; let mut transaction_start = 0;
let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Transaction>(); let num_transactions_per_blob = BLOB_DATA_SIZE / size_of::<Transaction>();
let total_entry_chunks = let total_entry_chunks = (list[end].transactions.len() + num_transactions_per_blob - 1)
(list[end].events.len() + num_events_per_blob - 1) / num_events_per_blob; / num_transactions_per_blob;
trace!( trace!(
"splitting events end: {} total_chunks: {}", "splitting transactions end: {} total_chunks: {}",
end, end,
total_entry_chunks total_entry_chunks
); );
for _ in 0..total_entry_chunks { for _ in 0..total_entry_chunks {
let event_end = min(event_start + num_events_per_blob, list[end].events.len()); let transaction_end = min(
transaction_start + num_transactions_per_blob,
list[end].transactions.len(),
);
let mut entry = Entry { let mut entry = Entry {
num_hashes: list[end].num_hashes, num_hashes: list[end].num_hashes,
id: list[end].id, id: list[end].id,
events: list[end].events[event_start..event_end].to_vec(), transactions: list[end].transactions[transaction_start..transaction_end]
.to_vec(),
}; };
entries.push(vec![entry]); entries.push(vec![entry]);
event_start = event_end; transaction_start = transaction_end;
} }
end += 1; end += 1;
} else { } else {
@ -112,7 +116,7 @@ pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> Vec<Entry
for entry in entries { for entry in entries {
if entry.id == last_id { if entry.id == last_id {
if let Some(last_entry) = entries_to_apply.last_mut() { if let Some(last_entry) = entries_to_apply.last_mut() {
last_entry.events.extend(entry.events); last_entry.transactions.extend(entry.transactions);
} }
} else { } else {
last_id = entry.id; last_id = entry.id;
@ -152,8 +156,8 @@ mod tests {
let one = hash(&zero); let one = hash(&zero);
let keypair = KeyPair::new(); let keypair = KeyPair::new();
let tr0 = Transaction::new(&keypair, keypair.pubkey(), 1, one); let tr0 = Transaction::new(&keypair, keypair.pubkey(), 1, one);
let events = vec![tr0.clone(); 10000]; let transactions = vec![tr0.clone(); 10000];
let e0 = Entry::new(&zero, 0, events); let e0 = Entry::new(&zero, 0, transactions);
let entry_list = vec![e0.clone(); 1]; let entry_list = vec![e0.clone(); 1];
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
@ -170,15 +174,15 @@ mod tests {
let next_id = hash(&id); let next_id = hash(&id);
let keypair = KeyPair::new(); let keypair = KeyPair::new();
let tr0 = Transaction::new(&keypair, keypair.pubkey(), 1, next_id); let tr0 = Transaction::new(&keypair, keypair.pubkey(), 1, next_id);
let events = vec![tr0.clone(); 5]; let transactions = vec![tr0.clone(); 5];
let event_set = vec![events.clone(); 5]; let transaction_batches = vec![transactions.clone(); 5];
let entries0 = next_entries(&id, 0, event_set); let entries0 = next_entries(&id, 0, transaction_batches);
assert_eq!(entries0.len(), 5); assert_eq!(entries0.len(), 5);
let mut entries1 = vec![]; let mut entries1 = vec![];
for _ in 0..5 { for _ in 0..5 {
let entry = next_entry(&id, 0, events.clone()); let entry = next_entry(&id, 0, transactions.clone());
id = entry.id; id = entry.id;
entries1.push(entry); entries1.push(entry);
} }
@ -193,7 +197,7 @@ mod bench {
use ledger::*; use ledger::*;
#[bench] #[bench]
fn event_bench(bencher: &mut Bencher) { fn next_entries_bench(bencher: &mut Bencher) {
let start_hash = Hash::default(); let start_hash = Hash::default();
let entries = next_entries(&start_hash, 10_000, vec![vec![]; 8]); let entries = next_entries(&start_hash, 10_000, vec![vec![]; 8]);
bencher.iter(|| { bencher.iter(|| {

View File

@ -46,7 +46,7 @@ impl Mint {
self.pubkey self.pubkey
} }
pub fn create_events(&self) -> Vec<Transaction> { pub fn create_transactions(&self) -> Vec<Transaction> {
let keypair = self.keypair(); let keypair = self.keypair();
let tr = Transaction::new(&keypair, self.pubkey(), self.tokens, self.seed()); let tr = Transaction::new(&keypair, self.pubkey(), self.tokens, self.seed());
vec![tr] vec![tr]
@ -54,7 +54,7 @@ impl Mint {
pub fn create_entries(&self) -> Vec<Entry> { pub fn create_entries(&self) -> Vec<Entry> {
let e0 = Entry::new(&self.seed(), 0, vec![]); let e0 = Entry::new(&self.seed(), 0, vec![]);
let e1 = Entry::new(&e0.id, 0, self.create_events()); let e1 = Entry::new(&e0.id, 0, self.create_transactions());
vec![e0, e1] vec![e0, e1]
} }
} }
@ -73,15 +73,15 @@ mod tests {
use transaction::Instruction; use transaction::Instruction;
#[test] #[test]
fn test_create_events() { fn test_create_transactions() {
let mut events = Mint::new(100).create_events().into_iter(); let mut transactions = Mint::new(100).create_transactions().into_iter();
let tr = events.next().unwrap(); let tr = transactions.next().unwrap();
if let Instruction::NewContract(contract) = tr.instruction { if let Instruction::NewContract(contract) = tr.instruction {
if let Plan::Pay(payment) = contract.plan { if let Plan::Pay(payment) = contract.plan {
assert_eq!(tr.from, payment.to); assert_eq!(tr.from, payment.to);
} }
} }
assert_eq!(events.next(), None); assert_eq!(transactions.next(), None);
} }
#[test] #[test]

View File

@ -1,5 +1,5 @@
//! The `plan` module provides a domain-specific language for payment plans. Users create Plan objects that //! The `plan` module provides a domain-specific language for payment plans. Users create Plan objects that
//! are given to an interpreter. The interpreter listens for `Witness` events, //! are given to an interpreter. The interpreter listens for `Witness` transactions,
//! which it uses to reduce the payment plan. When the plan is reduced to a //! which it uses to reduce the payment plan. When the plan is reduced to a
//! `Payment`, the payment is executed. //! `Payment`, the payment is executed.

View File

@ -2,8 +2,8 @@
//! It records Event items on behalf of its users. It continuously generates //! It records Event items on behalf of its users. It continuously generates
//! new hashes, only stopping to check if it has been sent an Event item. It //! new hashes, only stopping to check if it has been sent an Event item. It
//! tags each Event with an Entry, and sends it back. The Entry includes the //! tags each Event with an Entry, and sends it back. The Entry includes the
//! Event, the latest hash, and the number of hashes since the last event. //! Event, the latest hash, and the number of hashes since the last transaction.
//! The resulting stream of entries represents ordered events in time. //! The resulting stream of entries represents ordered transactions in time.
use entry::Entry; use entry::Entry;
use hash::Hash; use hash::Hash;
@ -28,7 +28,7 @@ impl RecordStage {
/// A background thread that will continue tagging received Event messages and /// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed. /// sending back Entry messages until either the receiver or sender channel is closed.
pub fn new( pub fn new(
event_receiver: Receiver<Signal>, transaction_receiver: Receiver<Signal>,
start_hash: &Hash, start_hash: &Hash,
tick_duration: Option<Duration>, tick_duration: Option<Duration>,
) -> Self { ) -> Self {
@ -39,10 +39,10 @@ impl RecordStage {
let mut recorder = Recorder::new(start_hash); let mut recorder = Recorder::new(start_hash);
let duration_data = tick_duration.map(|dur| (Instant::now(), dur)); let duration_data = tick_duration.map(|dur| (Instant::now(), dur));
loop { loop {
if let Err(_) = Self::process_events( if let Err(_) = Self::process_transactions(
&mut recorder, &mut recorder,
duration_data, duration_data,
&event_receiver, &transaction_receiver,
&entry_sender, &entry_sender,
) { ) {
return; return;
@ -59,7 +59,7 @@ impl RecordStage {
} }
} }
pub fn process_events( pub fn process_transactions(
recorder: &mut Recorder, recorder: &mut Recorder,
duration_data: Option<(Instant, Duration)>, duration_data: Option<(Instant, Duration)>,
receiver: &Receiver<Signal>, receiver: &Receiver<Signal>,
@ -77,8 +77,8 @@ impl RecordStage {
let entry = recorder.record(vec![]); let entry = recorder.record(vec![]);
sender.send(entry).or(Err(()))?; sender.send(entry).or(Err(()))?;
} }
Signal::Events(events) => { Signal::Events(transactions) => {
let entry = recorder.record(events); let entry = recorder.record(transactions);
sender.send(entry).or(Err(()))?; sender.send(entry).or(Err(()))?;
} }
}, },
@ -99,15 +99,15 @@ mod tests {
#[test] #[test]
fn test_historian() { fn test_historian() {
let (input, event_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(event_receiver, &zero, None); let record_stage = RecordStage::new(tx_receiver, &zero, None);
input.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
input.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
sleep(Duration::new(0, 1_000_000)); sleep(Duration::new(0, 1_000_000));
input.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
let entry0 = record_stage.entry_receiver.recv().unwrap(); let entry0 = record_stage.entry_receiver.recv().unwrap();
let entry1 = record_stage.entry_receiver.recv().unwrap(); let entry1 = record_stage.entry_receiver.recv().unwrap();
@ -117,7 +117,7 @@ mod tests {
assert_eq!(entry1.num_hashes, 0); assert_eq!(entry1.num_hashes, 0);
assert_eq!(entry2.num_hashes, 0); assert_eq!(entry2.num_hashes, 0);
drop(input); drop(tx_sender);
assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
assert!([entry0, entry1, entry2].verify(&zero)); assert!([entry0, entry1, entry2].verify(&zero));
@ -125,25 +125,25 @@ mod tests {
#[test] #[test]
fn test_historian_closed_sender() { fn test_historian_closed_sender() {
let (input, event_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(event_receiver, &zero, None); let record_stage = RecordStage::new(tx_receiver, &zero, None);
drop(record_stage.entry_receiver); drop(record_stage.entry_receiver);
input.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
assert_eq!(record_stage.thread_hdl.join().unwrap(), ()); assert_eq!(record_stage.thread_hdl.join().unwrap(), ());
} }
#[test] #[test]
fn test_events() { fn test_transactions() {
let (input, signal_receiver) = channel(); let (tx_sender, signal_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(signal_receiver, &zero, None); let record_stage = RecordStage::new(signal_receiver, &zero, None);
let alice_keypair = KeyPair::new(); let alice_keypair = KeyPair::new();
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let event0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero); let tx0 = Transaction::new(&alice_keypair, bob_pubkey, 1, zero);
let event1 = Transaction::new(&alice_keypair, bob_pubkey, 2, zero); let tx1 = Transaction::new(&alice_keypair, bob_pubkey, 2, zero);
input.send(Signal::Events(vec![event0, event1])).unwrap(); tx_sender.send(Signal::Events(vec![tx0, tx1])).unwrap();
drop(input); drop(tx_sender);
let entries: Vec<_> = record_stage.entry_receiver.iter().collect(); let entries: Vec<_> = record_stage.entry_receiver.iter().collect();
assert_eq!(entries.len(), 1); assert_eq!(entries.len(), 1);
} }
@ -151,12 +151,12 @@ mod tests {
#[test] #[test]
#[ignore] #[ignore]
fn test_ticking_historian() { fn test_ticking_historian() {
let (input, event_receiver) = channel(); let (tx_sender, tx_receiver) = channel();
let zero = Hash::default(); let zero = Hash::default();
let record_stage = RecordStage::new(event_receiver, &zero, Some(Duration::from_millis(20))); let record_stage = RecordStage::new(tx_receiver, &zero, Some(Duration::from_millis(20)));
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
input.send(Signal::Tick).unwrap(); tx_sender.send(Signal::Tick).unwrap();
drop(input); drop(tx_sender);
let entries: Vec<Entry> = record_stage.entry_receiver.iter().collect(); let entries: Vec<Entry> = record_stage.entry_receiver.iter().collect();
assert!(entries.len() > 1); assert!(entries.len() > 1);

View File

@ -26,8 +26,8 @@ impl Recorder {
self.num_hashes += 1; self.num_hashes += 1;
} }
pub fn record(&mut self, events: Vec<Transaction>) -> Entry { pub fn record(&mut self, transactions: Vec<Transaction>) -> Entry {
Entry::new_mut(&mut self.last_hash, &mut self.num_hashes, events) Entry::new_mut(&mut self.last_hash, &mut self.num_hashes, transactions)
} }
pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option<Entry> { pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option<Entry> {

View File

@ -14,7 +14,6 @@ use std::sync::mpsc::Receiver;
use std::time::Instant; use std::time::Instant;
use streamer; use streamer;
use timing; use timing;
use transaction::Transaction;
pub struct RequestProcessor { pub struct RequestProcessor {
bank: Arc<Bank>, bank: Arc<Bank>,
@ -63,20 +62,7 @@ impl RequestProcessor {
.collect() .collect()
} }
fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> { pub fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
p.packets
.par_iter()
.map(|x| {
deserialize(&x.data[0..x.meta.size])
.map(|req| (req, x.meta.addr()))
.ok()
})
.collect()
}
// Copy-paste of deserialize_requests() because I can't figure out how to
// route the lifetimes in a generic version.
pub fn deserialize_events(p: &packet::Packets) -> Vec<Option<(Transaction, SocketAddr)>> {
p.packets p.packets
.par_iter() .par_iter()
.map(|x| { .map(|x| {

View File

@ -24,7 +24,7 @@ impl Server {
tick_duration: Option<Duration>, tick_duration: Option<Duration>,
me: ReplicatedData, me: ReplicatedData,
requests_socket: UdpSocket, requests_socket: UdpSocket,
events_socket: UdpSocket, transactions_socket: UdpSocket,
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
respond_socket: UdpSocket, respond_socket: UdpSocket,
gossip_socket: UdpSocket, gossip_socket: UdpSocket,
@ -40,7 +40,7 @@ impl Server {
start_hash, start_hash,
tick_duration, tick_duration,
me, me,
events_socket, transactions_socket,
broadcast_socket, broadcast_socket,
gossip_socket, gossip_socket,
exit.clone(), exit.clone(),

View File

@ -672,14 +672,14 @@ mod test {
let addr = read.local_addr().unwrap(); let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let serve = UdpSocket::bind("127.0.0.1:0").expect("bind"); let serve = UdpSocket::bind("127.0.0.1:0").expect("bind");
let event = UdpSocket::bind("127.0.0.1:0").expect("bind"); let transaction = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let rep_data = ReplicatedData::new( let rep_data = ReplicatedData::new(
pubkey_me, pubkey_me,
read.local_addr().unwrap(), read.local_addr().unwrap(),
send.local_addr().unwrap(), send.local_addr().unwrap(),
serve.local_addr().unwrap(), serve.local_addr().unwrap(),
event.local_addr().unwrap(), transaction.local_addr().unwrap(),
); );
let mut crdt_me = Crdt::new(rep_data); let mut crdt_me = Crdt::new(rep_data);
let me_id = crdt_me.my_data().id; let me_id = crdt_me.my_data().id;
@ -736,14 +736,14 @@ mod test {
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let serve = UdpSocket::bind("127.0.0.1:0").unwrap(); let serve = UdpSocket::bind("127.0.0.1:0").unwrap();
let event = UdpSocket::bind("127.0.0.1:0").unwrap(); let transaction = UdpSocket::bind("127.0.0.1:0").unwrap();
let pubkey = KeyPair::new().pubkey(); let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new( let d = ReplicatedData::new(
pubkey, pubkey,
gossip.local_addr().unwrap(), gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(), replicate.local_addr().unwrap(),
serve.local_addr().unwrap(), serve.local_addr().unwrap(),
event.local_addr().unwrap(), transaction.local_addr().unwrap(),
); );
trace!("data: {:?}", d); trace!("data: {:?}", d);
let crdt = Crdt::new(d); let crdt = Crdt::new(d);

View File

@ -16,8 +16,8 @@ use transaction::Transaction;
pub struct ThinClient { pub struct ThinClient {
requests_addr: SocketAddr, requests_addr: SocketAddr,
requests_socket: UdpSocket, requests_socket: UdpSocket,
events_addr: SocketAddr, transactions_addr: SocketAddr,
events_socket: UdpSocket, transactions_socket: UdpSocket,
last_id: Option<Hash>, last_id: Option<Hash>,
transaction_count: u64, transaction_count: u64,
balances: HashMap<PublicKey, Option<i64>>, balances: HashMap<PublicKey, Option<i64>>,
@ -25,19 +25,19 @@ pub struct ThinClient {
impl ThinClient { impl ThinClient {
/// Create a new ThinClient that will interface with Rpu /// Create a new ThinClient that will interface with Rpu
/// over `requests_socket` and `events_socket`. To receive responses, the caller must bind `socket` /// over `requests_socket` and `transactions_socket`. To receive responses, the caller must bind `socket`
/// to a public address before invoking ThinClient methods. /// to a public address before invoking ThinClient methods.
pub fn new( pub fn new(
requests_addr: SocketAddr, requests_addr: SocketAddr,
requests_socket: UdpSocket, requests_socket: UdpSocket,
events_addr: SocketAddr, transactions_addr: SocketAddr,
events_socket: UdpSocket, transactions_socket: UdpSocket,
) -> Self { ) -> Self {
let client = ThinClient { let client = ThinClient {
requests_addr, requests_addr,
requests_socket, requests_socket,
events_addr, transactions_addr,
events_socket, transactions_socket,
last_id: None, last_id: None,
transaction_count: 0, transaction_count: 0,
balances: HashMap::new(), balances: HashMap::new(),
@ -75,7 +75,8 @@ impl ThinClient {
/// does not wait for a response. /// does not wait for a response.
pub fn transfer_signed(&self, tr: Transaction) -> io::Result<usize> { pub fn transfer_signed(&self, tr: Transaction) -> io::Result<usize> {
let data = serialize(&tr).expect("serialize Transaction in pub fn transfer_signed"); let data = serialize(&tr).expect("serialize Transaction in pub fn transfer_signed");
self.events_socket.send_to(&data, &self.events_addr) self.transactions_socket
.send_to(&data, &self.transactions_addr)
} }
/// Creates, signs, and processes a Transaction. Useful for writing unit-tests. /// Creates, signs, and processes a Transaction. Useful for writing unit-tests.
@ -209,7 +210,7 @@ mod tests {
Some(Duration::from_millis(30)), Some(Duration::from_millis(30)),
leader.data.clone(), leader.data.clone(),
leader.sockets.requests, leader.sockets.requests,
leader.sockets.event, leader.sockets.transaction,
leader.sockets.broadcast, leader.sockets.broadcast,
leader.sockets.respond, leader.sockets.respond,
leader.sockets.gossip, leader.sockets.gossip,
@ -219,13 +220,13 @@ mod tests {
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader.data.requests_addr, leader.data.requests_addr,
requests_socket, requests_socket,
leader.data.events_addr, leader.data.transactions_addr,
events_socket, transactions_socket,
); );
let last_id = client.get_last_id().wait().unwrap(); let last_id = client.get_last_id().wait().unwrap();
let _sig = client let _sig = client
@ -254,7 +255,7 @@ mod tests {
Some(Duration::from_millis(30)), Some(Duration::from_millis(30)),
leader.data.clone(), leader.data.clone(),
leader.sockets.requests, leader.sockets.requests,
leader.sockets.event, leader.sockets.transaction,
leader.sockets.broadcast, leader.sockets.broadcast,
leader.sockets.respond, leader.sockets.respond,
leader.sockets.gossip, leader.sockets.gossip,
@ -267,12 +268,12 @@ mod tests {
requests_socket requests_socket
.set_read_timeout(Some(Duration::new(5, 0))) .set_read_timeout(Some(Duration::new(5, 0)))
.unwrap(); .unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader.data.requests_addr, leader.data.requests_addr,
requests_socket, requests_socket,
leader.data.events_addr, leader.data.transactions_addr,
events_socket, transactions_socket,
); );
let last_id = client.get_last_id().wait().unwrap(); let last_id = client.get_last_id().wait().unwrap();
@ -383,7 +384,7 @@ mod tests {
None, None,
leader.data.clone(), leader.data.clone(),
leader.sockets.requests, leader.sockets.requests,
leader.sockets.event, leader.sockets.transaction,
leader.sockets.broadcast, leader.sockets.broadcast,
leader.sockets.respond, leader.sockets.respond,
leader.sockets.gossip, leader.sockets.gossip,
@ -424,13 +425,13 @@ mod tests {
requests_socket requests_socket
.set_read_timeout(Some(Duration::new(1, 0))) .set_read_timeout(Some(Duration::new(1, 0)))
.unwrap(); .unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
ThinClient::new( ThinClient::new(
leader.requests_addr, leader.requests_addr,
requests_socket, requests_socket,
leader.events_addr, leader.transactions_addr,
events_socket, transactions_socket,
) )
} }

View File

@ -28,7 +28,7 @@ impl Tpu {
start_hash: Hash, start_hash: Hash,
tick_duration: Option<Duration>, tick_duration: Option<Duration>,
me: ReplicatedData, me: ReplicatedData,
events_socket: UdpSocket, transactions_socket: UdpSocket,
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
gossip: UdpSocket, gossip: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
@ -37,7 +37,7 @@ impl Tpu {
let packet_recycler = packet::PacketRecycler::default(); let packet_recycler = packet::PacketRecycler::default();
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver( let t_receiver = streamer::receiver(
events_socket, transactions_socket,
exit.clone(), exit.clone(),
packet_recycler.clone(), packet_recycler.clone(),
packet_sender, packet_sender,

View File

@ -132,7 +132,7 @@ use std::time::Duration;
pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let transactions_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
@ -145,9 +145,9 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
gossip.local_addr().unwrap(), gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(), replicate.local_addr().unwrap(),
requests_socket.local_addr().unwrap(), requests_socket.local_addr().unwrap(),
events_socket.local_addr().unwrap(), transactions_socket.local_addr().unwrap(),
); );
(d, gossip, replicate, requests_socket, events_socket) (d, gossip, replicate, requests_socket, transactions_socket)
} }
#[cfg(test)] #[cfg(test)]
@ -307,7 +307,7 @@ pub mod tests {
pub gossip: UdpSocket, pub gossip: UdpSocket,
pub requests: UdpSocket, pub requests: UdpSocket,
pub replicate: UdpSocket, pub replicate: UdpSocket,
pub event: UdpSocket, pub transaction: UdpSocket,
pub respond: UdpSocket, pub respond: UdpSocket,
pub broadcast: UdpSocket, pub broadcast: UdpSocket,
} }
@ -319,7 +319,7 @@ pub mod tests {
pub fn new() -> TestNode { pub fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap(); let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let event = UdpSocket::bind("0.0.0.0:0").unwrap(); let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -329,7 +329,7 @@ pub mod tests {
gossip.local_addr().unwrap(), gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(), replicate.local_addr().unwrap(),
requests.local_addr().unwrap(), requests.local_addr().unwrap(),
event.local_addr().unwrap(), transaction.local_addr().unwrap(),
); );
TestNode { TestNode {
data: data, data: data,
@ -337,7 +337,7 @@ pub mod tests {
gossip, gossip,
requests, requests,
replicate, replicate,
event, transaction,
respond, respond,
broadcast, broadcast,
}, },