diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs index 20a3bd0b48..889e8709d9 100644 --- a/src/choose_gossip_peer_strategy.rs +++ b/src/choose_gossip_peer_strategy.rs @@ -1,6 +1,6 @@ use crdt::ReplicatedData; -use rand::thread_rng; use rand::distributions::{IndependentSample, Weighted, WeightedChoice}; +use rand::thread_rng; use result::{Error, Result}; use signature::PublicKey; use std; @@ -9,8 +9,7 @@ use std::collections::HashMap; pub const DEFAULT_WEIGHT: u32 = 1; pub trait ChooseGossipPeerStrategy { - fn choose_peer(&self, options: Vec<&ReplicatedData>) -> - Result; + fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result; } pub struct ChooseRandomPeerStrategy<'a> { @@ -18,7 +17,7 @@ pub struct ChooseRandomPeerStrategy<'a> { } impl<'a> ChooseRandomPeerStrategy<'a> { - pub fn new(random: &'a Fn() -> u64,) -> Self { + pub fn new(random: &'a Fn() -> u64) -> Self { ChooseRandomPeerStrategy { random } } } @@ -45,9 +44,12 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { remote: &'a HashMap, external_liveness: &'a HashMap>, get_stake: &'a Fn(PublicKey) -> f64, - ) -> Self - { - ChooseWeightedPeerStrategy { remote, external_liveness, get_stake } + ) -> Self { + ChooseWeightedPeerStrategy { + remote, + external_liveness, + get_stake, + } } fn calculate_weighted_remote_index(&self, peer_id: PublicKey) -> u32 { @@ -60,7 +62,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { } let liveness_entry = self.external_liveness.get(&peer_id); - if liveness_entry.is_none(){ + if liveness_entry.is_none() { return DEFAULT_WEIGHT; } @@ -73,61 +75,55 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { // Calculate the weighted average of the rumors let mut relevant_votes = vec![]; - let total_stake = votes.iter().fold( - 0.0, - |total_stake, (&id, &vote)| { - let stake = (self.get_stake)(id); - // If the total stake is going to overflow u64, pick - // the larger of either the current total_stake, or the - // new stake, this way we are guaranteed to get at least u64/2 - // sample of stake in our weighted calculation - if std::f64::MAX - total_stake < stake { - if stake > total_stake { - relevant_votes = vec![(stake, vote)]; - stake - } else { - total_stake - } + let total_stake = votes.iter().fold(0.0, |total_stake, (&id, &vote)| { + let stake = (self.get_stake)(id); + // If the total stake is going to overflow u64, pick + // the larger of either the current total_stake, or the + // new stake, this way we are guaranteed to get at least u64/2 + // sample of stake in our weighted calculation + if std::f64::MAX - total_stake < stake { + if stake > total_stake { + relevant_votes = vec![(stake, vote)]; + stake } else { - relevant_votes.push((stake, vote)); - total_stake + stake + total_stake } + } else { + relevant_votes.push((stake, vote)); + total_stake + stake } - ); + }); - let weighted_vote = relevant_votes.iter().fold( - 0.0, - |sum, &(stake, vote)| { - if vote < last_seen_index { - // This should never happen b/c we maintain the invariant that the indexes - // in the external_liveness table are always greater than the corresponding - // indexes in the remote table, if the index exists in the remote table at all. - - // Case 1: Attempt to insert bigger index into the "external_liveness" table - // happens after an insertion into the "remote" table. In this case, - // (see apply_updates()) function, we prevent the insertion if the entry - // in the remote table >= the atempted insertion into the "external" liveness - // table. - - // Case 2: Bigger index in the "external_liveness" table inserted before - // a smaller insertion into the "remote" table. We clear the corresponding - // "external_liveness" table entry on all insertions into the "remote" table - // See apply_updates() function. + let weighted_vote = relevant_votes.iter().fold(0.0, |sum, &(stake, vote)| { + if vote < last_seen_index { + // This should never happen b/c we maintain the invariant that the indexes + // in the external_liveness table are always greater than the corresponding + // indexes in the remote table, if the index exists in the remote table at all. - warn!("weighted peer index was smaller than local entry in remote table"); - return sum; - } + // Case 1: Attempt to insert bigger index into the "external_liveness" table + // happens after an insertion into the "remote" table. In this case, + // (see apply_updates()) function, we prevent the insertion if the entry + // in the remote table >= the atempted insertion into the "external" liveness + // table. - let vote_difference = (vote - last_seen_index) as f64; - let new_weight = vote_difference * (stake / total_stake); + // Case 2: Bigger index in the "external_liveness" table inserted before + // a smaller insertion into the "remote" table. We clear the corresponding + // "external_liveness" table entry on all insertions into the "remote" table + // See apply_updates() function. - if std::f64::MAX - sum < new_weight { - return f64::max(new_weight, sum); - } + warn!("weighted peer index was smaller than local entry in remote table"); + return sum; + } - sum + new_weight - }, - ); + let vote_difference = (vote - last_seen_index) as f64; + let new_weight = vote_difference * (stake / total_stake); + + if std::f64::MAX - sum < new_weight { + return f64::max(new_weight, sum); + } + + sum + new_weight + }); // Return u32 b/c the weighted sampling API from rand::distributions // only takes u32 for weights @@ -136,8 +132,8 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { } // If the weighted rumors we've heard about aren't any greater than - // what we've directly learned from the last time we communicated with the - // peer (i.e. weighted_vote == 0), then return a weight of 1. + // what we've directly learned from the last time we communicated with the + // peer (i.e. weighted_vote == 0), then return a weight of 1. // Otherwise, return the calculated weight. weighted_vote as u32 + DEFAULT_WEIGHT } @@ -159,17 +155,19 @@ impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { } let mut rng = thread_rng(); - Ok(WeightedChoice::new(&mut weighted_peers).ind_sample(&mut rng).clone()) + Ok(WeightedChoice::new(&mut weighted_peers) + .ind_sample(&mut rng) + .clone()) } } #[cfg(test)] mod tests { + use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT}; use logger; use signature::{KeyPair, KeyPairUtil, PublicKey}; use std; use std::collections::HashMap; - use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT}; fn get_stake(id: PublicKey) -> f64 { return 1.0; @@ -185,13 +183,10 @@ mod tests { let remote: HashMap = HashMap::new(); let external_liveness: HashMap> = HashMap::new(); - let weighted_strategy = ChooseWeightedPeerStrategy::new( - &remote, - &external_liveness, - &get_stake, - ); + let weighted_strategy = + ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake); - // If external_liveness table doesn't contain this entry, + // If external_liveness table doesn't contain this entry, // return the default weight let result = weighted_strategy.calculate_weighted_remote_index(key1); assert_eq!(result, DEFAULT_WEIGHT); @@ -209,17 +204,14 @@ mod tests { let mut external_liveness: HashMap> = HashMap::new(); // If only the liveness table contains the entry, should return the - // weighted liveness entries - let test_value : u32 = 5; + // weighted liveness entries + let test_value: u32 = 5; let mut rumors: HashMap = HashMap::new(); rumors.insert(key2, test_value as u64); external_liveness.insert(key1, rumors); - let weighted_strategy = ChooseWeightedPeerStrategy::new( - &remote, - &external_liveness, - &get_stake, - ); + let weighted_strategy = + ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake); let result = weighted_strategy.calculate_weighted_remote_index(key1); assert_eq!(result, test_value + DEFAULT_WEIGHT); @@ -242,11 +234,8 @@ mod tests { rumors.insert(key2, test_value); external_liveness.insert(key1, rumors); - let weighted_strategy = ChooseWeightedPeerStrategy::new( - &remote, - &external_liveness, - &get_stake, - ); + let weighted_strategy = + ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake); let result = weighted_strategy.calculate_weighted_remote_index(key1); assert_eq!(result, std::u32::MAX); @@ -262,7 +251,7 @@ mod tests { let mut remote: HashMap = HashMap::new(); let mut external_liveness: HashMap> = HashMap::new(); - // Test many validators' rumors in external_liveness + // Test many validators' rumors in external_liveness let num_peers = 10; let mut rumors: HashMap = HashMap::new(); @@ -275,14 +264,11 @@ mod tests { external_liveness.insert(key1, rumors); - let weighted_strategy = ChooseWeightedPeerStrategy::new( - &remote, - &external_liveness, - &get_stake, - ); + let weighted_strategy = + ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake); let result = weighted_strategy.calculate_weighted_remote_index(key1); - assert_eq!(result, (num_peers/2) as u32); + assert_eq!(result, (num_peers / 2) as u32); } #[test] @@ -295,7 +281,7 @@ mod tests { let mut remote: HashMap = HashMap::new(); let mut external_liveness: HashMap> = HashMap::new(); - // Test many validators' rumors in external_liveness + // Test many validators' rumors in external_liveness let num_peers = 10; let old_index = 20; let mut rumors: HashMap = HashMap::new(); @@ -309,15 +295,12 @@ mod tests { external_liveness.insert(key1, rumors); - let weighted_strategy = ChooseWeightedPeerStrategy::new( - &remote, - &external_liveness, - &get_stake, - ); + let weighted_strategy = + ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake); let result = weighted_strategy.calculate_weighted_remote_index(key1); // If nobody has seen a newer update then rever to default assert_eq!(result, DEFAULT_WEIGHT); } -} \ No newline at end of file +} diff --git a/src/crdt.rs b/src/crdt.rs index bb2031503b..a9b125444c 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -15,11 +15,8 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt}; -use choose_gossip_peer_strategy::{ - ChooseGossipPeerStrategy, - ChooseRandomPeerStrategy, - ChooseWeightedPeerStrategy, -}; +use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseRandomPeerStrategy, + ChooseWeightedPeerStrategy}; use hash::Hash; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use pnet_datalink as datalink; @@ -241,11 +238,7 @@ impl Crdt { self.insert(&me); } - pub fn get_external_liveness_entry( - &self, - key: &PublicKey, - ) -> Option<&HashMap> - { + pub fn get_external_liveness_entry(&self, key: &PublicKey) -> Option<&HashMap> { self.external_liveness.get(key) } @@ -309,6 +302,9 @@ impl Crdt { self.remote.remove(id); self.local.remove(id); self.external_liveness.remove(id); + for map in self.external_liveness.values_mut() { + map.remove(id); + } } } @@ -555,7 +551,7 @@ impl Crdt { self.table.len() ); return Err(Error::CrdtTooSmall); - }, + } Err(e) => return Err(e), }; @@ -627,11 +623,12 @@ impl Crdt { /// * `update_index` - the number of updates that `from` has completed and this set of `data` represents /// * `data` - the update data fn apply_updates( - &mut self, from: PublicKey, + &mut self, + from: PublicKey, update_index: u64, data: &[ReplicatedData], external_liveness: &[(PublicKey, u64)], - ){ + ) { trace!("got updates {}", data.len()); // TODO we need to punish/spam resist here // sig verify the whole update and slash anyone who sends a bad update @@ -640,12 +637,11 @@ impl Crdt { } for (pk, external_remote_index) in external_liveness.iter() { - let remote_entry = - if let Some(v) = self.remote.get(pk) { - *v - } else { - 0 - }; + let remote_entry = if let Some(v) = self.remote.get(pk) { + *v + } else { + 0 + }; if remote_entry >= *external_remote_index { continue; @@ -757,7 +753,10 @@ impl Crdt { let me = obj.read().unwrap(); // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = me.get_updates_since(v); - let external_liveness = me.remote.iter().map(|(k, v)| (k.clone(), v.clone())).collect(); + let external_liveness = me.remote + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); drop(me); trace!("get updates since response {} {}", v, data.len()); let len = data.len(); diff --git a/src/lib.rs b/src/lib.rs index d551c68b73..7f17a0f66f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ pub mod bank; pub mod banking_stage; pub mod blob_fetch_stage; pub mod budget; +mod choose_gossip_peer_strategy; pub mod crdt; pub mod drone; pub mod entry; @@ -47,7 +48,6 @@ pub mod transaction; pub mod tvu; pub mod window_stage; pub mod write_stage; -mod choose_gossip_peer_strategy; extern crate bincode; extern crate byteorder; extern crate chrono;