From 2030dfa435860d65e03d6da0f380cad627ec30b7 Mon Sep 17 00:00:00 2001
From: Carl
Date: Sat, 15 Sep 2018 23:46:16 -0700
Subject: [PATCH] Implement PR comments, tidy up

---
 src/banking_stage.rs   | 11 +++---
 src/broadcast_stage.rs | 76 +++++++++++++++++++++---------------------
 src/crdt.rs            | 11 ++++++
 src/fullnode.rs        |  8 ++---
 src/record_stage.rs    |  3 +-
 src/request_stage.rs   |  3 +-
 src/rpc.rs             |  3 +-
 src/write_stage.rs     | 72 +++++++++++++++++++--------------------
 tests/multinode.rs     | 35 +++++--------------
 9 files changed, 104 insertions(+), 118 deletions(-)

diff --git a/src/banking_stage.rs b/src/banking_stage.rs
index dd569a017c..ab72cc4137 100644
--- a/src/banking_stage.rs
+++ b/src/banking_stage.rs
@@ -49,10 +49,8 @@ impl BankingStage {
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                     Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
-                    Error::SendError => {
-                        break;
-                    }
-                    _ => println!("BANKING ERROR {:?}", e),
+                    Error::SendError => break,
+                    _ => error!("{:?}", e),
                 }
             }
         }).unwrap();
@@ -111,9 +109,8 @@ impl BankingStage {
         debug!("process_transactions");
         let results = bank.process_transactions(transactions);
         let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
-        match signal_sender.send(Signal::Transactions(transactions)) {
-            Err(_) => return Err(Error::SendError),
-            _ => (),
+        if let Err(_) = signal_sender.send(Signal::Transactions(transactions)) {
+            return Err(Error::SendError);
         }
         debug!("done process_transactions");
 
diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs
index ecfb5b565a..4324bc7160 100644
--- a/src/broadcast_stage.rs
+++ b/src/broadcast_stage.rs
@@ -255,10 +255,9 @@ impl BroadcastStage {
             .spawn(move || {
                 let _exit = Finalizer::new(exit_sender);
                 Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver)
-            })
-            .unwrap();
+            }).unwrap();
 
-        (BroadcastStage { thread_hdl })
+        BroadcastStage { thread_hdl }
     }
 }
 
@@ -286,18 +285,20 @@ mod tests {
     use std::sync::{Arc, RwLock};
     use window::{new_window_from_entries, SharedWindow};
 
-    fn setup_dummy_broadcast_stage() -> (
-        Pubkey,
-        Pubkey,
-        BroadcastStage,
-        SharedWindow,
-        Sender<Vec<Entry>>,
-        Arc<RwLock<Crdt>>,
-        Vec<Entry>,
-    ) {
+    struct DummyBroadcastStage {
+        my_id: Pubkey,
+        buddy_id: Pubkey,
+        broadcast_stage: BroadcastStage,
+        shared_window: SharedWindow,
+        entry_sender: Sender<Vec<Entry>>,
+        crdt: Arc<RwLock<Crdt>>,
+        entries: Vec<Entry>,
+    }
+
+    fn setup_dummy_broadcast_stage() -> DummyBroadcastStage {
         // Setup dummy leader info
         let leader_keypair = Keypair::new();
-        let id = leader_keypair.pubkey();
+        let my_id = leader_keypair.pubkey();
         let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
 
         // Give the leader somebody to broadcast to so he isn't lonely
@@ -335,15 +336,15 @@
             exit_sender,
         );
 
-        (
-            id,
+        DummyBroadcastStage {
+            my_id,
             buddy_id,
             broadcast_stage,
             shared_window,
             entry_sender,
             crdt,
             entries,
-        )
+        }
     }
 
     fn find_highest_window_index(shared_window: &SharedWindow) -> u64 {
@@ -359,25 +360,20 @@
     #[test]
     fn test_broadcast_stage_leader_rotation_exit() {
-        let (
-            id,
-            buddy_id,
-            broadcast_stage,
-            shared_window,
-            entry_sender,
-            crdt,
-            entries,
-        ) = setup_dummy_broadcast_stage();
+        let broadcast_info = setup_dummy_broadcast_stage();
+
         {
-            let mut wcrdt = crdt.write().unwrap();
-            // Set leader to myself
-            wcrdt.set_leader(id);
-            // Set the leader for the next rotation to also be myself
-            wcrdt.set_scheduled_leader(LEADER_ROTATION_INTERVAL, id);
+            let mut wcrdt = broadcast_info.crdt.write().unwrap();
+            // Set the leader for the next rotation to be myself
+            wcrdt.set_scheduled_leader(LEADER_ROTATION_INTERVAL, broadcast_info.my_id);
         }
 
-        let genesis_len = entries.len() as u64;
-        let last_entry_hash = entries.last().expect("Ledger should not be empty").id;
+        let genesis_len = broadcast_info.entries.len() as u64;
+        let last_entry_hash = broadcast_info
+            .entries
+            .last()
+            .expect("Ledger should not be empty")
+            .id;
 
         // Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will
         // trigger a check for leader rotation. Because the next scheduled leader
@@ -386,20 +382,22 @@
         for _ in genesis_len..LEADER_ROTATION_INTERVAL {
             let new_entry = recorder.record(vec![]);
-            entry_sender.send(new_entry).unwrap();
+            broadcast_info.entry_sender.send(new_entry).unwrap();
         }
 
         // Set the scheduled next leader in the crdt to the other buddy on the network
-        crdt.write()
+        broadcast_info
+            .crdt
+            .write()
             .unwrap()
-            .set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, buddy_id);
+            .set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, broadcast_info.buddy_id);
 
         // Input another LEADER_ROTATION_INTERVAL dummy entries, which will take us
         // past the point of the leader rotation. The broadcast_stage will see that
         // it's no longer the leader after checking the crdt, and exit
         for _ in 0..LEADER_ROTATION_INTERVAL {
             let new_entry = recorder.record(vec![]);
-            match entry_sender.send(new_entry) {
+            match broadcast_info.entry_sender.send(new_entry) {
                 // We disconnected, break out of loop and check the results
                 Err(_) => break,
                 _ => (),
             }
@@ -408,11 +406,13 @@
         }
 
         // Make sure the threads closed cleanly
         assert_eq!(
-            broadcast_stage.join().unwrap(),
+            broadcast_info.broadcast_stage.join().unwrap(),
             BroadcastStageReturnType::LeaderRotation
         );
 
-        let highest_index = find_highest_window_index(&shared_window);
+        let highest_index = find_highest_window_index(&broadcast_info.shared_window);
+        // The blob index is zero indexed, so it will always be one behind the entry height,
+        // which starts at one.
         assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1);
     }
 }
diff --git a/src/crdt.rs b/src/crdt.rs
index 53b73b3de0..dd55f93177 100644
--- a/src/crdt.rs
+++ b/src/crdt.rs
@@ -319,6 +319,17 @@ impl Crdt {
         self.scheduled_leaders.insert(entry_height, new_leader_id);
     }
 
+    pub fn get_valid_peers(&self) -> Vec<NodeInfo> {
+        let me = self.my_data().id;
+        self.table
+            .values()
+            .into_iter()
+            .filter(|x| x.id != me)
+            .filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
+            .cloned()
+            .collect()
+    }
+
     pub fn get_external_liveness_entry(&self, key: &Pubkey) -> Option<&HashMap<Pubkey, u64>> {
         self.external_liveness.get(key)
     }
diff --git a/src/fullnode.rs b/src/fullnode.rs
index c915ccdf6d..9e04f54c49 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -377,6 +377,8 @@ impl Fullnode {
             }
         }
 
+        // Tell the RPU to serve requests out of the new bank we've created
+        // instead of the old one
         self.rpu.set_new_bank(self.bank.clone());
         let tvu = Tvu::new(
             self.keypair.clone(),
@@ -476,9 +478,6 @@ impl Service for Fullnode {
             _ => (),
         }
 
-        // TODO: Case on join values above to determine if
-        // a leader rotation was in order, and propagate up a
-        // signal to reflect that
         Ok(None)
     }
 }
@@ -535,8 +534,7 @@ mod tests {
                     &validator_ledger_path,
                     false,
                 )
-            })
-            .collect();
+            }).collect();
 
         //each validator can exit in parallel to speed many sequential calls to `join`
         vals.iter().for_each(|v| v.exit());
diff --git a/src/record_stage.rs b/src/record_stage.rs
index e7ccf58adc..3edbb7a660 100644
--- a/src/record_stage.rs
+++ b/src/record_stage.rs
@@ -146,8 +146,7 @@ impl Service for RecordStage {
     type JoinReturnType = ();
 
     fn join(self) -> thread::Result<()> {
-        self.thread_hdl.join()?;
-        Ok(())
+        self.thread_hdl.join()
     }
 }
diff --git a/src/request_stage.rs b/src/request_stage.rs
index 720a4ba07b..c25d1867d0 100644
--- a/src/request_stage.rs
+++ b/src/request_stage.rs
@@ -116,7 +116,6 @@ impl Service for RequestStage {
     type JoinReturnType = ();
 
     fn join(self) -> thread::Result<()> {
-        self.thread_hdl.join()?;
-        Ok(())
+        self.thread_hdl.join()
     }
 }
diff --git a/src/rpc.rs b/src/rpc.rs
index ba07b964d9..375f7790c8 100644
--- a/src/rpc.rs
+++ b/src/rpc.rs
@@ -71,8 +71,7 @@ impl Service for JsonRpcService {
     type JoinReturnType = ();
 
     fn join(self) -> thread::Result<()> {
-        self.thread_hdl.join()?;
-        Ok(())
+        self.thread_hdl.join()
     }
 }
diff --git a/src/write_stage.rs b/src/write_stage.rs
index 890a694d11..98baa90119 100644
--- a/src/write_stage.rs
+++ b/src/write_stage.rs
@@ -199,8 +199,8 @@ impl WriteStage {
         loop {
             // Note that entry height is not zero indexed, it starts at 1, so the
             // old leader is in power up to and including entry height
-            // n * LEADER_ROTATION_INTERVAL, so once we've forwarded that last block,
-            // check for the next leader.
+            // n * LEADER_ROTATION_INTERVAL for some "n". Once we've forwarded
+            // that last block, check for the next scheduled leader.
             if entry_height % (LEADER_ROTATION_INTERVAL as u64) == 0 {
                 let rcrdt = crdt.read().unwrap();
                 let my_id = rcrdt.my_data().id;
@@ -294,6 +294,17 @@ mod tests {
     use std::sync::{Arc, RwLock};
     use write_stage::{WriteStage, WriteStageReturnType};
 
+    struct DummyWriteStage {
+        my_id: Pubkey,
+        write_stage: WriteStage,
+        entry_sender: Sender<Vec<Entry>>,
+        write_stage_entry_receiver: Receiver<Vec<Entry>>,
+        crdt: Arc<RwLock<Crdt>>,
+        bank: Arc<Bank>,
+        leader_ledger_path: String,
+        ledger_tail: Vec<Entry>,
+    }
+
     fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec<Entry>) {
         let entries = read_ledger(ledger_path, true).expect("opening ledger");
@@ -304,19 +315,10 @@
         bank.process_ledger(entries).expect("process_ledger")
     }
 
-    fn setup_dummy_write_stage() -> (
-        Pubkey,
-        WriteStage,
-        Sender<Vec<Entry>>,
-        Receiver<Vec<Entry>>,
-        Arc<RwLock<Crdt>>,
-        Arc<Bank>,
-        String,
-        Vec<Entry>,
-    ) {
+    fn setup_dummy_write_stage() -> DummyWriteStage {
         // Setup leader info
         let leader_keypair = Arc::new(Keypair::new());
-        let id = leader_keypair.pubkey();
+        let my_id = leader_keypair.pubkey();
         let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
 
         let crdt = Arc::new(RwLock::new(Crdt::new(leader_info.info).expect("Crdt::new")));
@@ -343,8 +345,8 @@
             entry_height,
         );
 
-        (
-            id,
+        DummyWriteStage {
+            my_id,
             write_stage,
             entry_sender,
             write_stage_entry_receiver,
@@ -352,29 +354,26 @@
             bank,
             leader_ledger_path,
             ledger_tail,
-        )
+        }
     }
 
     #[test]
     fn test_write_stage_leader_rotation_exit() {
-        let (
-            id,
-            write_stage,
-            entry_sender,
-            _write_stage_entry_receiver,
-            crdt,
-            bank,
-            leader_ledger_path,
-            ledger_tail,
-        ) = setup_dummy_write_stage();
+        let write_stage_info = setup_dummy_write_stage();
 
-        crdt.write()
+        write_stage_info
+            .crdt
+            .write()
             .unwrap()
-            .set_scheduled_leader(LEADER_ROTATION_INTERVAL, id);
+            .set_scheduled_leader(LEADER_ROTATION_INTERVAL, write_stage_info.my_id);
 
-        let last_entry_hash = ledger_tail.last().expect("Ledger should not be empty").id;
+        let last_entry_hash = write_stage_info
+            .ledger_tail
+            .last()
+            .expect("Ledger should not be empty")
+            .id;
 
-        let genesis_entry_height = ledger_tail.len() as u64;
+        let genesis_entry_height = write_stage_info.ledger_tail.len() as u64;
 
         // Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will
         // trigger a check for leader rotation. Because the next scheduled leader
@@ -382,7 +381,7 @@
         let mut recorder = Recorder::new(last_entry_hash);
         for _ in genesis_entry_height..LEADER_ROTATION_INTERVAL {
             let new_entry = recorder.record(vec![]);
-            entry_sender.send(new_entry).unwrap();
+            write_stage_info.entry_sender.send(new_entry).unwrap();
         }
 
         // Set the scheduled next leader in the crdt to some other node
@@ -390,7 +389,7 @@
         let leader2_keypair = Keypair::new();
         let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey());
 
         {
-            let mut wcrdt = crdt.write().unwrap();
+            let mut wcrdt = write_stage_info.crdt.write().unwrap();
             wcrdt.insert(&leader2_info.info);
             wcrdt.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, leader2_keypair.pubkey());
         }
 
@@ -401,17 +400,18 @@
         // Input another LEADER_ROTATION_INTERVAL dummy entries, which will take us
         // past the point of the leader rotation. The write_stage will see that
         // it's no longer the leader after
         // checking the schedule, and exit
         for _ in 0..LEADER_ROTATION_INTERVAL {
             let new_entry = recorder.record(vec![]);
-            entry_sender.send(new_entry).unwrap();
+            write_stage_info.entry_sender.send(new_entry).unwrap();
         }
 
         assert_eq!(
-            write_stage.join().unwrap(),
+            write_stage_info.write_stage.join().unwrap(),
             WriteStageReturnType::LeaderRotation
         );
 
         // Make sure the ledger contains exactly LEADER_ROTATION_INTERVAL entries
-        let (entry_height, _) = process_ledger(&leader_ledger_path, &bank);
-        remove_dir_all(leader_ledger_path).unwrap();
+        let (entry_height, _) =
+            process_ledger(&write_stage_info.leader_ledger_path, &write_stage_info.bank);
+        remove_dir_all(write_stage_info.leader_ledger_path).unwrap();
         assert_eq!(entry_height, 2 * LEADER_ROTATION_INTERVAL);
     }
 }
diff --git a/tests/multinode.rs b/tests/multinode.rs
index 28a434bce8..3e75f2f76d 100644
--- a/tests/multinode.rs
+++ b/tests/multinode.rs
@@ -56,23 +56,14 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<Crdt>>, Pubkey) {
 
 fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
     //lets spy on the network
-    let (ncp, spy_ref, me) = make_spy_node(leader);
+    let (ncp, spy_ref, _) = make_spy_node(leader);
 
     //wait for the network to converge
     let mut converged = false;
     let mut rv = vec![];
     for _ in 0..30 {
         let num = spy_ref.read().unwrap().convergence();
-        let mut v: Vec<NodeInfo> = spy_ref
-            .read()
-            .unwrap()
-            .table
-            .values()
-            .into_iter()
-            .filter(|x| x.id != me)
-            .filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
-            .cloned()
-            .collect();
+        let mut v = spy_ref.read().unwrap().get_valid_peers();
         if num >= num_nodes as u64 && v.len() >= num_nodes {
             rv.append(&mut v);
             converged = true;
@@ -753,22 +744,13 @@ fn test_leader_to_validator_transition() {
 
     // Make an extra node for our leader to broadcast to,
     // who won't vote and mess with our leader's entry count
-    let (ncp, spy_node, me) = make_spy_node(&leader_info);
+    let (ncp, spy_node, _) = make_spy_node(&leader_info);
 
     // Wait for the leader to see the spy node
    let mut converged = false;
     for _ in 0..30 {
         let num = spy_node.read().unwrap().convergence();
-        let mut v: Vec<NodeInfo> = spy_node
-            .read()
-            .unwrap()
-            .table
-            .values()
-            .into_iter()
-            .filter(|x| x.id != me)
-            .filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
-            .cloned()
-            .collect();
+        let mut v: Vec<NodeInfo> = spy_node.read().unwrap().get_valid_peers();
         // There's only one person excluding the spy node (the leader) who should see
         // two nodes on the network
         if num >= 2 as u64 && v.len() >= 1 {
@@ -780,15 +762,16 @@
 
     assert!(converged);
 
-    let extra_transactions = std::cmp::max(LEADER_ROTATION_INTERVAL / 4, 1);
+    let extra_transactions = std::cmp::max(LEADER_ROTATION_INTERVAL / 20, 1);
 
-    // Push leader "extra_transactions" past LEADER_ROTATION_INTERVAL entry height, 
+    // Push leader "extra_transactions" past LEADER_ROTATION_INTERVAL entry height,
     // make sure the leader stops.
     assert!(genesis_height < LEADER_ROTATION_INTERVAL);
     for i in genesis_height..(LEADER_ROTATION_INTERVAL + extra_transactions) {
         let expected_balance = std::cmp::min(
             LEADER_ROTATION_INTERVAL - genesis_height,
-            i - genesis_height);
+            i - genesis_height,
+        );
 
         send_tx_and_retry_get_balance(
             &leader_info,
@@ -806,7 +789,7 @@
     }
 
     // Query now validator to make sure that he has the proper balances in his bank
-    // after the transition, even though we submitted "extra_transactions" 
+    // after the transition, even though we submitted "extra_transactions"
     // transactions earlier
     let mut leader_client = mk_client(&leader_info);
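
A short, self-contained sketch of the rotation arithmetic these hunks rely on.
Here LEADER_ROTATION_INTERVAL = 100 is only an illustrative value, and
should_check_rotation is a hypothetical helper standing in for the check that
WriteStage and BroadcastStage perform inline:

    // Entry height starts at 1, so a leader is in power up to and including
    // entry height n * LEADER_ROTATION_INTERVAL; the rotation check fires
    // exactly on those boundaries.
    const LEADER_ROTATION_INTERVAL: u64 = 100; // illustrative, not from the patch

    fn should_check_rotation(entry_height: u64) -> bool {
        entry_height % LEADER_ROTATION_INTERVAL == 0
    }

    fn main() {
        assert!(!should_check_rotation(1));
        assert!(should_check_rotation(LEADER_ROTATION_INTERVAL));
        assert!(should_check_rotation(2 * LEADER_ROTATION_INTERVAL));

        // Blob indexes are zero indexed while entry height starts at one, so
        // after broadcasting 2 * LEADER_ROTATION_INTERVAL entries the highest
        // window index is one behind the entry height. This is the relationship
        // the tests assert: highest_index is 2 * LEADER_ROTATION_INTERVAL - 1
        // while the ledger's entry_height is 2 * LEADER_ROTATION_INTERVAL.
        let entry_height: u64 = 2 * LEADER_ROTATION_INTERVAL;
        let highest_blob_index = entry_height - 1;
        assert_eq!(highest_blob_index, 2 * LEADER_ROTATION_INTERVAL - 1);
    }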