Implement PR comments, tidy up

This commit is contained in:
Carl
2018-09-15 23:46:16 -07:00
committed by Greg Fitzgerald
parent bfe64f5f6e
commit 2030dfa435
9 changed files with 104 additions and 118 deletions

View File

@ -49,10 +49,8 @@ impl BankingStage {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => { Error::SendError => break,
break; _ => error!("{:?}", e),
}
_ => println!("BANKING ERROR {:?}", e),
} }
} }
}).unwrap(); }).unwrap();
@ -111,9 +109,8 @@ impl BankingStage {
debug!("process_transactions"); debug!("process_transactions");
let results = bank.process_transactions(transactions); let results = bank.process_transactions(transactions);
let transactions = results.into_iter().filter_map(|x| x.ok()).collect(); let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
match signal_sender.send(Signal::Transactions(transactions)) { if let Err(_) = signal_sender.send(Signal::Transactions(transactions)) {
Err(_) => return Err(Error::SendError), return Err(Error::SendError);
_ => (),
} }
debug!("done process_transactions"); debug!("done process_transactions");

View File

@ -255,10 +255,9 @@ impl BroadcastStage {
.spawn(move || { .spawn(move || {
let _exit = Finalizer::new(exit_sender); let _exit = Finalizer::new(exit_sender);
Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver) Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver)
}) }).unwrap();
.unwrap();
(BroadcastStage { thread_hdl }) BroadcastStage { thread_hdl }
} }
} }
@ -286,18 +285,20 @@ mod tests {
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use window::{new_window_from_entries, SharedWindow}; use window::{new_window_from_entries, SharedWindow};
fn setup_dummy_broadcast_stage() -> ( struct DummyBroadcastStage {
Pubkey, my_id: Pubkey,
Pubkey, buddy_id: Pubkey,
BroadcastStage, broadcast_stage: BroadcastStage,
SharedWindow, shared_window: SharedWindow,
Sender<Vec<Entry>>, entry_sender: Sender<Vec<Entry>>,
Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
Vec<Entry>, entries: Vec<Entry>,
) { }
fn setup_dummy_broadcast_stage() -> DummyBroadcastStage {
// Setup dummy leader info // Setup dummy leader info
let leader_keypair = Keypair::new(); let leader_keypair = Keypair::new();
let id = leader_keypair.pubkey(); let my_id = leader_keypair.pubkey();
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
// Give the leader somebody to broadcast to so he isn't lonely // Give the leader somebody to broadcast to so he isn't lonely
@ -335,15 +336,15 @@ mod tests {
exit_sender, exit_sender,
); );
( DummyBroadcastStage {
id, my_id,
buddy_id, buddy_id,
broadcast_stage, broadcast_stage,
shared_window, shared_window,
entry_sender, entry_sender,
crdt, crdt,
entries, entries,
) }
} }
fn find_highest_window_index(shared_window: &SharedWindow) -> u64 { fn find_highest_window_index(shared_window: &SharedWindow) -> u64 {
@ -359,25 +360,20 @@ mod tests {
#[test] #[test]
fn test_broadcast_stage_leader_rotation_exit() { fn test_broadcast_stage_leader_rotation_exit() {
let ( let broadcast_info = setup_dummy_broadcast_stage();
id,
buddy_id,
broadcast_stage,
shared_window,
entry_sender,
crdt,
entries,
) = setup_dummy_broadcast_stage();
{ {
let mut wcrdt = crdt.write().unwrap(); let mut wcrdt = broadcast_info.crdt.write().unwrap();
// Set leader to myself // Set the leader for the next rotation to be myself
wcrdt.set_leader(id); wcrdt.set_scheduled_leader(LEADER_ROTATION_INTERVAL, broadcast_info.my_id);
// Set the leader for the next rotation to also be myself
wcrdt.set_scheduled_leader(LEADER_ROTATION_INTERVAL, id);
} }
let genesis_len = entries.len() as u64; let genesis_len = broadcast_info.entries.len() as u64;
let last_entry_hash = entries.last().expect("Ledger should not be empty").id; let last_entry_hash = broadcast_info
.entries
.last()
.expect("Ledger should not be empty")
.id;
// Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will // Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will
// trigger a check for leader rotation. Because the next scheduled leader // trigger a check for leader rotation. Because the next scheduled leader
@ -386,20 +382,22 @@ mod tests {
for _ in genesis_len..LEADER_ROTATION_INTERVAL { for _ in genesis_len..LEADER_ROTATION_INTERVAL {
let new_entry = recorder.record(vec![]); let new_entry = recorder.record(vec![]);
entry_sender.send(new_entry).unwrap(); broadcast_info.entry_sender.send(new_entry).unwrap();
} }
// Set the scheduled next leader in the crdt to the other buddy on the network // Set the scheduled next leader in the crdt to the other buddy on the network
crdt.write() broadcast_info
.crdt
.write()
.unwrap() .unwrap()
.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, buddy_id); .set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, broadcast_info.buddy_id);
// Input another LEADER_ROTATION_INTERVAL dummy entries, which will take us // Input another LEADER_ROTATION_INTERVAL dummy entries, which will take us
// past the point of the leader rotation. The write_stage will see that // past the point of the leader rotation. The write_stage will see that
// it's no longer the leader after checking the crdt, and exit // it's no longer the leader after checking the crdt, and exit
for _ in 0..LEADER_ROTATION_INTERVAL { for _ in 0..LEADER_ROTATION_INTERVAL {
let new_entry = recorder.record(vec![]); let new_entry = recorder.record(vec![]);
match entry_sender.send(new_entry) { match broadcast_info.entry_sender.send(new_entry) {
// We disconnected, break out of loop and check the results // We disconnected, break out of loop and check the results
Err(_) => break, Err(_) => break,
_ => (), _ => (),
@ -408,11 +406,13 @@ mod tests {
// Make sure the threads closed cleanly // Make sure the threads closed cleanly
assert_eq!( assert_eq!(
broadcast_stage.join().unwrap(), broadcast_info.broadcast_stage.join().unwrap(),
BroadcastStageReturnType::LeaderRotation BroadcastStageReturnType::LeaderRotation
); );
let highest_index = find_highest_window_index(&shared_window); let highest_index = find_highest_window_index(&broadcast_info.shared_window);
// The blob index is zero indexed, so it will always be one behind the entry height
// which starts at one.
assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1); assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1);
} }
} }

View File

@ -319,6 +319,17 @@ impl Crdt {
self.scheduled_leaders.insert(entry_height, new_leader_id); self.scheduled_leaders.insert(entry_height, new_leader_id);
} }
pub fn get_valid_peers(&self) -> Vec<NodeInfo> {
let me = self.my_data().id;
self.table
.values()
.into_iter()
.filter(|x| x.id != me)
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
.cloned()
.collect()
}
pub fn get_external_liveness_entry(&self, key: &Pubkey) -> Option<&HashMap<Pubkey, u64>> { pub fn get_external_liveness_entry(&self, key: &Pubkey) -> Option<&HashMap<Pubkey, u64>> {
self.external_liveness.get(key) self.external_liveness.get(key)
} }

View File

@ -377,6 +377,8 @@ impl Fullnode {
} }
} }
// Tell the RPU to serve requests out of the new bank we've created
// instead of the old one
self.rpu.set_new_bank(self.bank.clone()); self.rpu.set_new_bank(self.bank.clone());
let tvu = Tvu::new( let tvu = Tvu::new(
self.keypair.clone(), self.keypair.clone(),
@ -476,9 +478,6 @@ impl Service for Fullnode {
_ => (), _ => (),
} }
// TODO: Case on join values above to determine if
// a leader rotation was in order, and propagate up a
// signal to reflect that
Ok(None) Ok(None)
} }
} }
@ -535,8 +534,7 @@ mod tests {
&validator_ledger_path, &validator_ledger_path,
false, false,
) )
}) }).collect();
.collect();
//each validator can exit in parallel to speed many sequential calls to `join` //each validator can exit in parallel to speed many sequential calls to `join`
vals.iter().for_each(|v| v.exit()); vals.iter().for_each(|v| v.exit());

View File

@ -146,8 +146,7 @@ impl Service for RecordStage {
type JoinReturnType = (); type JoinReturnType = ();
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
self.thread_hdl.join()?; self.thread_hdl.join()
Ok(())
} }
} }

View File

@ -116,7 +116,6 @@ impl Service for RequestStage {
type JoinReturnType = (); type JoinReturnType = ();
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
self.thread_hdl.join()?; self.thread_hdl.join()
Ok(())
} }
} }

View File

@ -71,8 +71,7 @@ impl Service for JsonRpcService {
type JoinReturnType = (); type JoinReturnType = ();
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {
self.thread_hdl.join()?; self.thread_hdl.join()
Ok(())
} }
} }

View File

@ -199,8 +199,8 @@ impl WriteStage {
loop { loop {
// Note that entry height is not zero indexed, it starts at 1, so the // Note that entry height is not zero indexed, it starts at 1, so the
// old leader is in power up to and including entry height // old leader is in power up to and including entry height
// n * LEADER_ROTATION_INTERVAL, so once we've forwarded that last block, // n * LEADER_ROTATION_INTERVAL for some "n". Once we've forwarded
// check for the next leader. // that last block, check for the next scheduled leader.
if entry_height % (LEADER_ROTATION_INTERVAL as u64) == 0 { if entry_height % (LEADER_ROTATION_INTERVAL as u64) == 0 {
let rcrdt = crdt.read().unwrap(); let rcrdt = crdt.read().unwrap();
let my_id = rcrdt.my_data().id; let my_id = rcrdt.my_data().id;
@ -294,6 +294,17 @@ mod tests {
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use write_stage::{WriteStage, WriteStageReturnType}; use write_stage::{WriteStage, WriteStageReturnType};
struct DummyWriteStage {
my_id: Pubkey,
write_stage: WriteStage,
entry_sender: Sender<Vec<Entry>>,
write_stage_entry_receiver: Receiver<Vec<Entry>>,
crdt: Arc<RwLock<Crdt>>,
bank: Arc<Bank>,
leader_ledger_path: String,
ledger_tail: Vec<Entry>,
}
fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec<Entry>) { fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec<Entry>) {
let entries = read_ledger(ledger_path, true).expect("opening ledger"); let entries = read_ledger(ledger_path, true).expect("opening ledger");
@ -304,19 +315,10 @@ mod tests {
bank.process_ledger(entries).expect("process_ledger") bank.process_ledger(entries).expect("process_ledger")
} }
fn setup_dummy_write_stage() -> ( fn setup_dummy_write_stage() -> DummyWriteStage {
Pubkey,
WriteStage,
Sender<Vec<Entry>>,
Receiver<Vec<Entry>>,
Arc<RwLock<Crdt>>,
Arc<Bank>,
String,
Vec<Entry>,
) {
// Setup leader info // Setup leader info
let leader_keypair = Arc::new(Keypair::new()); let leader_keypair = Arc::new(Keypair::new());
let id = leader_keypair.pubkey(); let my_id = leader_keypair.pubkey();
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let crdt = Arc::new(RwLock::new(Crdt::new(leader_info.info).expect("Crdt::new"))); let crdt = Arc::new(RwLock::new(Crdt::new(leader_info.info).expect("Crdt::new")));
@ -343,8 +345,8 @@ mod tests {
entry_height, entry_height,
); );
( DummyWriteStage {
id, my_id,
write_stage, write_stage,
entry_sender, entry_sender,
write_stage_entry_receiver, write_stage_entry_receiver,
@ -352,29 +354,26 @@ mod tests {
bank, bank,
leader_ledger_path, leader_ledger_path,
ledger_tail, ledger_tail,
) }
} }
#[test] #[test]
fn test_write_stage_leader_rotation_exit() { fn test_write_stage_leader_rotation_exit() {
let ( let write_stage_info = setup_dummy_write_stage();
id,
write_stage,
entry_sender,
_write_stage_entry_receiver,
crdt,
bank,
leader_ledger_path,
ledger_tail,
) = setup_dummy_write_stage();
crdt.write() write_stage_info
.crdt
.write()
.unwrap() .unwrap()
.set_scheduled_leader(LEADER_ROTATION_INTERVAL, id); .set_scheduled_leader(LEADER_ROTATION_INTERVAL, write_stage_info.my_id);
let last_entry_hash = ledger_tail.last().expect("Ledger should not be empty").id; let last_entry_hash = write_stage_info
.ledger_tail
.last()
.expect("Ledger should not be empty")
.id;
let genesis_entry_height = ledger_tail.len() as u64; let genesis_entry_height = write_stage_info.ledger_tail.len() as u64;
// Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will // Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will
// trigger a check for leader rotation. Because the next scheduled leader // trigger a check for leader rotation. Because the next scheduled leader
@ -382,7 +381,7 @@ mod tests {
let mut recorder = Recorder::new(last_entry_hash); let mut recorder = Recorder::new(last_entry_hash);
for _ in genesis_entry_height..LEADER_ROTATION_INTERVAL { for _ in genesis_entry_height..LEADER_ROTATION_INTERVAL {
let new_entry = recorder.record(vec![]); let new_entry = recorder.record(vec![]);
entry_sender.send(new_entry).unwrap(); write_stage_info.entry_sender.send(new_entry).unwrap();
} }
// Set the scheduled next leader in the crdt to some other node // Set the scheduled next leader in the crdt to some other node
@ -390,7 +389,7 @@ mod tests {
let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey()); let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey());
{ {
let mut wcrdt = crdt.write().unwrap(); let mut wcrdt = write_stage_info.crdt.write().unwrap();
wcrdt.insert(&leader2_info.info); wcrdt.insert(&leader2_info.info);
wcrdt.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, leader2_keypair.pubkey()); wcrdt.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, leader2_keypair.pubkey());
} }
@ -401,17 +400,18 @@ mod tests {
// checking the schedule, and exit // checking the schedule, and exit
for _ in 0..LEADER_ROTATION_INTERVAL { for _ in 0..LEADER_ROTATION_INTERVAL {
let new_entry = recorder.record(vec![]); let new_entry = recorder.record(vec![]);
entry_sender.send(new_entry).unwrap(); write_stage_info.entry_sender.send(new_entry).unwrap();
} }
assert_eq!( assert_eq!(
write_stage.join().unwrap(), write_stage_info.write_stage.join().unwrap(),
WriteStageReturnType::LeaderRotation WriteStageReturnType::LeaderRotation
); );
// Make sure the ledger contains exactly LEADER_ROTATION_INTERVAL entries // Make sure the ledger contains exactly LEADER_ROTATION_INTERVAL entries
let (entry_height, _) = process_ledger(&leader_ledger_path, &bank); let (entry_height, _) =
remove_dir_all(leader_ledger_path).unwrap(); process_ledger(&write_stage_info.leader_ledger_path, &write_stage_info.bank);
remove_dir_all(write_stage_info.leader_ledger_path).unwrap();
assert_eq!(entry_height, 2 * LEADER_ROTATION_INTERVAL); assert_eq!(entry_height, 2 * LEADER_ROTATION_INTERVAL);
} }
} }

View File

@ -56,23 +56,14 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<Crdt>>, Pubkey) {
fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> { fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
//lets spy on the network //lets spy on the network
let (ncp, spy_ref, me) = make_spy_node(leader); let (ncp, spy_ref, _) = make_spy_node(leader);
//wait for the network to converge //wait for the network to converge
let mut converged = false; let mut converged = false;
let mut rv = vec![]; let mut rv = vec![];
for _ in 0..30 { for _ in 0..30 {
let num = spy_ref.read().unwrap().convergence(); let num = spy_ref.read().unwrap().convergence();
let mut v: Vec<NodeInfo> = spy_ref let mut v = spy_ref.read().unwrap().get_valid_peers();
.read()
.unwrap()
.table
.values()
.into_iter()
.filter(|x| x.id != me)
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
.cloned()
.collect();
if num >= num_nodes as u64 && v.len() >= num_nodes { if num >= num_nodes as u64 && v.len() >= num_nodes {
rv.append(&mut v); rv.append(&mut v);
converged = true; converged = true;
@ -753,22 +744,13 @@ fn test_leader_to_validator_transition() {
// Make an extra node for our leader to broadcast to, // Make an extra node for our leader to broadcast to,
// who won't vote and mess with our leader's entry count // who won't vote and mess with our leader's entry count
let (ncp, spy_node, me) = make_spy_node(&leader_info); let (ncp, spy_node, _) = make_spy_node(&leader_info);
// Wait for the leader to see the spy node // Wait for the leader to see the spy node
let mut converged = false; let mut converged = false;
for _ in 0..30 { for _ in 0..30 {
let num = spy_node.read().unwrap().convergence(); let num = spy_node.read().unwrap().convergence();
let mut v: Vec<NodeInfo> = spy_node let mut v: Vec<NodeInfo> = spy_node.read().unwrap().get_valid_peers();
.read()
.unwrap()
.table
.values()
.into_iter()
.filter(|x| x.id != me)
.filter(|x| Crdt::is_valid_address(&x.contact_info.rpu))
.cloned()
.collect();
// There's only one person excluding the spy node (the leader) who should see // There's only one person excluding the spy node (the leader) who should see
// two nodes on the network // two nodes on the network
if num >= 2 as u64 && v.len() >= 1 { if num >= 2 as u64 && v.len() >= 1 {
@ -780,7 +762,7 @@ fn test_leader_to_validator_transition() {
assert!(converged); assert!(converged);
let extra_transactions = std::cmp::max(LEADER_ROTATION_INTERVAL / 4, 1); let extra_transactions = std::cmp::max(LEADER_ROTATION_INTERVAL / 20, 1);
// Push leader "extra_transactions" past LEADER_ROTATION_INTERVAL entry height, // Push leader "extra_transactions" past LEADER_ROTATION_INTERVAL entry height,
// make sure the leader stops. // make sure the leader stops.
@ -788,7 +770,8 @@ fn test_leader_to_validator_transition() {
for i in genesis_height..(LEADER_ROTATION_INTERVAL + extra_transactions) { for i in genesis_height..(LEADER_ROTATION_INTERVAL + extra_transactions) {
let expected_balance = std::cmp::min( let expected_balance = std::cmp::min(
LEADER_ROTATION_INTERVAL - genesis_height, LEADER_ROTATION_INTERVAL - genesis_height,
i - genesis_height); i - genesis_height,
);
send_tx_and_retry_get_balance( send_tx_and_retry_get_balance(
&leader_info, &leader_info,