Cleanup stakes for gossip (#2860)

This commit is contained in:
Sagar Dhawan
2019-02-20 20:02:47 -08:00
committed by GitHub
parent 1cd88968cf
commit 3c62e2332e
6 changed files with 82 additions and 71 deletions

View File

@ -8,6 +8,7 @@ use crate::last_id_queue::LastIdQueue;
use crate::runtime::{self, RuntimeError}; use crate::runtime::{self, RuntimeError};
use crate::status_cache::StatusCache; use crate::status_cache::StatusCache;
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use hashbrown::HashMap;
use log::{debug, info, Level}; use log::{debug, info, Level};
use solana_metrics::counter::Counter; use solana_metrics::counter::Counter;
use solana_sdk::account::Account; use solana_sdk::account::Account;
@ -596,6 +597,16 @@ impl Bank {
.collect() .collect()
} }
/// Collect all the stakes into a Map keyed on the Node id.
pub fn get_stakes(&self) -> HashMap<Pubkey, u64> {
let map: HashMap<_, _> = self
.vote_states(|_| true)
.iter()
.map(|state| (state.node_id, self.get_balance(&state.staker_id)))
.collect();
map
}
pub fn tick_height(&self) -> u64 { pub fn tick_height(&self) -> u64 {
self.last_id_queue.read().unwrap().tick_height self.last_id_queue.read().unwrap().tick_height
} }

View File

@ -174,16 +174,16 @@ impl ClusterInfo {
let id = node_info.id; let id = node_info.id;
me.gossip.set_self(id); me.gossip.set_self(id);
me.insert_info(node_info); me.insert_info(node_info);
me.push_self(None); me.push_self(&HashMap::new());
me me
} }
pub fn push_self(&mut self, bank: Option<&Arc<Bank>>) { pub fn push_self(&mut self, stakes: &HashMap<Pubkey, u64>) {
let mut my_data = self.my_data(); let mut my_data = self.my_data();
let now = timestamp(); let now = timestamp();
my_data.wallclock = now; my_data.wallclock = now;
let mut entry = CrdsValue::ContactInfo(my_data); let mut entry = CrdsValue::ContactInfo(my_data);
entry.sign(&self.keypair); entry.sign(&self.keypair);
self.gossip.refresh_push_active_set(bank); self.gossip.refresh_push_active_set(stakes);
self.gossip.process_push_message(&[entry], now); self.gossip.process_push_message(&[entry], now);
} }
pub fn insert_info(&mut self, node_info: NodeInfo) { pub fn insert_info(&mut self, node_info: NodeInfo) {
@ -756,11 +756,11 @@ impl ClusterInfo {
Ok((addr, out)) Ok((addr, out))
} }
fn new_pull_requests(&mut self, bank: Option<&Arc<Bank>>) -> Vec<(SocketAddr, Protocol)> { fn new_pull_requests(&mut self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let now = timestamp(); let now = timestamp();
let pulls: Vec<_> = self let pulls: Vec<_> = self
.gossip .gossip
.new_pull_request(now, bank) .new_pull_request(now, stakes)
.ok() .ok()
.into_iter() .into_iter()
.collect(); .collect();
@ -800,8 +800,8 @@ impl ClusterInfo {
.collect() .collect()
} }
fn gossip_request(&mut self, bank: Option<&Arc<Bank>>) -> Vec<(SocketAddr, Protocol)> { fn gossip_request(&mut self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let pulls: Vec<_> = self.new_pull_requests(bank); let pulls: Vec<_> = self.new_pull_requests(stakes);
let pushes: Vec<_> = self.new_push_requests(); let pushes: Vec<_> = self.new_push_requests();
vec![pulls, pushes].into_iter().flat_map(|x| x).collect() vec![pulls, pushes].into_iter().flat_map(|x| x).collect()
} }
@ -809,10 +809,10 @@ impl ClusterInfo {
/// At random pick a node and try to get updated changes from them /// At random pick a node and try to get updated changes from them
fn run_gossip( fn run_gossip(
obj: &Arc<RwLock<Self>>, obj: &Arc<RwLock<Self>>,
bank: Option<&Arc<Bank>>, stakes: &HashMap<Pubkey, u64>,
blob_sender: &BlobSender, blob_sender: &BlobSender,
) -> Result<()> { ) -> Result<()> {
let reqs = obj.write().unwrap().gossip_request(bank); let reqs = obj.write().unwrap().gossip_request(&stakes);
let blobs = reqs let blobs = reqs
.into_iter() .into_iter()
.filter_map(|(remote_gossip_addr, req)| to_shared_blob(req, remote_gossip_addr).ok()) .filter_map(|(remote_gossip_addr, req)| to_shared_blob(req, remote_gossip_addr).ok())
@ -864,7 +864,11 @@ impl ClusterInfo {
let mut last_push = timestamp(); let mut last_push = timestamp();
loop { loop {
let start = timestamp(); let start = timestamp();
let _ = Self::run_gossip(&obj, bank.as_ref(), &blob_sender); let stakes: HashMap<_, _> = match bank {
Some(ref bank) => bank.get_stakes(),
None => HashMap::new(),
};
let _ = Self::run_gossip(&obj, &stakes, &blob_sender);
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return; return;
} }
@ -872,7 +876,7 @@ impl ClusterInfo {
//TODO: possibly tune this parameter //TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep //we saw a deadlock passing an obj.read().unwrap().timeout into sleep
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
obj.write().unwrap().push_self(bank.as_ref()); obj.write().unwrap().push_self(&stakes);
last_push = timestamp(); last_push = timestamp();
} }
let elapsed = timestamp() - start; let elapsed = timestamp() - start;
@ -1461,8 +1465,11 @@ mod tests {
.write() .write()
.unwrap() .unwrap()
.gossip .gossip
.refresh_push_active_set(None); .refresh_push_active_set(&HashMap::new());
let reqs = cluster_info.write().unwrap().gossip_request(None); let reqs = cluster_info
.write()
.unwrap()
.gossip_request(&HashMap::new());
//assert none of the addrs are invalid. //assert none of the addrs are invalid.
reqs.iter().all(|(addr, _)| { reqs.iter().all(|(addr, _)| {
let res = ContactInfo::is_valid_address(addr); let res = ContactInfo::is_valid_address(addr);
@ -1740,7 +1747,7 @@ mod tests {
let (_, _, val) = cluster_info let (_, _, val) = cluster_info
.gossip .gossip
.new_pull_request(timestamp(), None) .new_pull_request(timestamp(), &HashMap::new())
.ok() .ok()
.unwrap(); .unwrap();
assert!(val.verify()); assert!(val.verify());

View File

@ -8,11 +8,10 @@ use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CrdsGossipPull; use crate::crds_gossip_pull::CrdsGossipPull;
use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}; use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE};
use crate::crds_value::CrdsValue; use crate::crds_value::CrdsValue;
use solana_runtime::bank::Bank; use hashbrown::HashMap;
use solana_runtime::bloom::Bloom; use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::sync::Arc;
///The min size for bloom filters ///The min size for bloom filters
pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000;
@ -94,10 +93,10 @@ impl CrdsGossip {
/// refresh the push active set /// refresh the push active set
/// * ratio - number of actives to rotate /// * ratio - number of actives to rotate
pub fn refresh_push_active_set(&mut self, bank: Option<&Arc<Bank>>) { pub fn refresh_push_active_set(&mut self, stakes: &HashMap<Pubkey, u64>) {
self.push.refresh_push_active_set( self.push.refresh_push_active_set(
&self.crds, &self.crds,
bank, stakes,
self.id, self.id,
self.pull.pull_request_time.len(), self.pull.pull_request_time.len(),
CRDS_GOSSIP_NUM_ACTIVE, CRDS_GOSSIP_NUM_ACTIVE,
@ -108,9 +107,9 @@ impl CrdsGossip {
pub fn new_pull_request( pub fn new_pull_request(
&self, &self,
now: u64, now: u64,
bank: Option<&Arc<Bank>>, stakes: &HashMap<Pubkey, u64>,
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> { ) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> {
self.pull.new_pull_request(&self.crds, self.id, now, bank) self.pull.new_pull_request(&self.crds, self.id, now, stakes)
} }
/// time when a request to `from` was initiated /// time when a request to `from` was initiated
@ -160,16 +159,11 @@ impl CrdsGossip {
} }
} }
/// Computes a normalized(log of bank balance) stake /// Computes a normalized(log of actual stake) stake
pub fn get_stake(id: &Pubkey, bank: Option<&Arc<Bank>>) -> f32 { pub fn get_stake<S: std::hash::BuildHasher>(id: &Pubkey, stakes: &HashMap<Pubkey, u64, S>) -> f32 {
match bank { // cap the max balance to u32 max (it should be plenty)
Some(bank) => { let bal = f64::from(u32::max_value()).min(*stakes.get(id).unwrap_or(&0) as f64);
// cap the max balance to u32 max (it should be plenty) 1_f32.max((bal as f32).ln())
let bal = f64::from(u32::max_value()).min(bank.get_balance(id) as f64);
1_f32.max((bal as f32).ln())
}
_ => 1.0,
}
} }
/// Computes bounded weight given some max, a time since last selected, and a stake value /// Computes bounded weight given some max, a time since last selected, and a stake value
@ -200,7 +194,7 @@ mod test {
.crds .crds
.insert(CrdsValue::ContactInfo(ci.clone()), 0) .insert(CrdsValue::ContactInfo(ci.clone()), 0)
.unwrap(); .unwrap();
crds_gossip.refresh_push_active_set(None); crds_gossip.refresh_push_active_set(&HashMap::new());
let now = timestamp(); let now = timestamp();
//incorrect dest //incorrect dest
let mut res = crds_gossip.process_prune_msg( let mut res = crds_gossip.process_prune_msg(

View File

@ -19,13 +19,11 @@ use bincode::serialized_size;
use hashbrown::HashMap; use hashbrown::HashMap;
use rand; use rand;
use rand::distributions::{Distribution, WeightedIndex}; use rand::distributions::{Distribution, WeightedIndex};
use solana_runtime::bank::Bank;
use solana_runtime::bloom::Bloom; use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::cmp; use std::cmp;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::Arc;
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
@ -57,7 +55,7 @@ impl CrdsGossipPull {
crds: &Crds, crds: &Crds,
self_id: Pubkey, self_id: Pubkey,
now: u64, now: u64,
bank: Option<&Arc<Bank>>, stakes: &HashMap<Pubkey, u64>,
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> { ) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> {
let options: Vec<_> = crds let options: Vec<_> = crds
.table .table
@ -68,7 +66,7 @@ impl CrdsGossipPull {
let max_weight = f32::from(u16::max_value()) - 1.0; let max_weight = f32::from(u16::max_value()) - 1.0;
let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0); let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
let since = ((now - req_time) / 1024) as u32; let since = ((now - req_time) / 1024) as u32;
let stake = get_stake(&item.id, bank); let stake = get_stake(&item.id, stakes);
let weight = get_weight(max_weight, since, stake); let weight = get_weight(max_weight, since, stake);
(weight, item) (weight, item)
}) })
@ -203,15 +201,13 @@ mod test {
use super::*; use super::*;
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use crate::crds_value::LeaderId; use crate::crds_value::LeaderId;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::f32::consts::E; use std::f32::consts::E;
#[test] #[test]
fn test_new_pull_with_bank() { fn test_new_pull_with_stakes() {
let (block, mint_keypair) = GenesisBlock::new(500_000);
let bank = Arc::new(Bank::new(&block));
let mut crds = Crds::default(); let mut crds = Crds::default();
let mut stakes = HashMap::new();
let node = CrdsGossipPull::default(); let node = CrdsGossipPull::default();
let me = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); let me = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
crds.insert(me.clone(), 0).unwrap(); crds.insert(me.clone(), 0).unwrap();
@ -220,8 +216,7 @@ mod test {
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
let id = entry.label().pubkey(); let id = entry.label().pubkey();
crds.insert(entry.clone(), 0).unwrap(); crds.insert(entry.clone(), 0).unwrap();
bank.transfer(i * 100, &mint_keypair, id, bank.last_id()) stakes.insert(id, i * 100);
.unwrap();
} }
// The min balance of the heaviest nodes is at least ln(3000) - 0.5 // The min balance of the heaviest nodes is at least ln(3000) - 0.5
// This is because the heaviest nodes will have very similar weights // This is because the heaviest nodes will have very similar weights
@ -230,9 +225,9 @@ mod test {
// try upto 10 times because of rng // try upto 10 times because of rng
for _ in 0..10 { for _ in 0..10 {
let msg = node let msg = node
.new_pull_request(&crds, me.label().pubkey(), now, Some(&bank)) .new_pull_request(&crds, me.label().pubkey(), now, &stakes)
.unwrap(); .unwrap();
if bank.get_balance(&msg.0) >= min_balance.round() as u64 { if *stakes.get(&msg.0).unwrap_or(&0) >= min_balance.round() as u64 {
return; return;
} }
} }
@ -246,19 +241,19 @@ mod test {
let id = entry.label().pubkey(); let id = entry.label().pubkey();
let node = CrdsGossipPull::default(); let node = CrdsGossipPull::default();
assert_eq!( assert_eq!(
node.new_pull_request(&crds, id, 0, None), node.new_pull_request(&crds, id, 0, &HashMap::new()),
Err(CrdsGossipError::NoPeers) Err(CrdsGossipError::NoPeers)
); );
crds.insert(entry.clone(), 0).unwrap(); crds.insert(entry.clone(), 0).unwrap();
assert_eq!( assert_eq!(
node.new_pull_request(&crds, id, 0, None), node.new_pull_request(&crds, id, 0, &HashMap::new()),
Err(CrdsGossipError::NoPeers) Err(CrdsGossipError::NoPeers)
); );
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
crds.insert(new.clone(), 0).unwrap(); crds.insert(new.clone(), 0).unwrap();
let req = node.new_pull_request(&crds, id, 0, None); let req = node.new_pull_request(&crds, id, 0, &HashMap::new());
let (to, _, self_info) = req.unwrap(); let (to, _, self_info) = req.unwrap();
assert_eq!(to, new.label().pubkey()); assert_eq!(to, new.label().pubkey());
assert_eq!(self_info, entry); assert_eq!(self_info, entry);
@ -281,7 +276,7 @@ mod test {
// odds of getting the other request should be 1 in u64::max_value() // odds of getting the other request should be 1 in u64::max_value()
for _ in 0..10 { for _ in 0..10 {
let req = node.new_pull_request(&crds, node_id, u64::max_value(), None); let req = node.new_pull_request(&crds, node_id, u64::max_value(), &HashMap::new());
let (to, _, self_info) = req.unwrap(); let (to, _, self_info) = req.unwrap();
assert_eq!(to, old.label().pubkey()); assert_eq!(to, old.label().pubkey());
assert_eq!(self_info, entry); assert_eq!(self_info, entry);
@ -297,7 +292,7 @@ mod test {
node_crds.insert(entry.clone(), 0).unwrap(); node_crds.insert(entry.clone(), 0).unwrap();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
node_crds.insert(new.clone(), 0).unwrap(); node_crds.insert(new.clone(), 0).unwrap();
let req = node.new_pull_request(&node_crds, node_id, 0, None); let req = node.new_pull_request(&node_crds, node_id, 0, &HashMap::new());
let mut dest_crds = Crds::default(); let mut dest_crds = Crds::default();
let mut dest = CrdsGossipPull::default(); let mut dest = CrdsGossipPull::default();
@ -349,7 +344,7 @@ mod test {
let mut done = false; let mut done = false;
for _ in 0..30 { for _ in 0..30 {
// there is a chance of a false positive with bloom filters // there is a chance of a false positive with bloom filters
let req = node.new_pull_request(&node_crds, node_id, 0, None); let req = node.new_pull_request(&node_crds, node_id, 0, &HashMap::new());
let (_, filter, caller) = req.unwrap(); let (_, filter, caller) = req.unwrap();
let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0); let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0);
// if there is a false positive this is empty // if there is a false positive this is empty

View File

@ -20,13 +20,11 @@ use indexmap::map::IndexMap;
use rand; use rand;
use rand::distributions::{Distribution, WeightedIndex}; use rand::distributions::{Distribution, WeightedIndex};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use solana_runtime::bank::Bank;
use solana_runtime::bloom::Bloom; use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp; use solana_sdk::timing::timestamp;
use std::cmp; use std::cmp;
use std::sync::Arc;
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
@ -164,7 +162,7 @@ impl CrdsGossipPush {
pub fn refresh_push_active_set( pub fn refresh_push_active_set(
&mut self, &mut self,
crds: &Crds, crds: &Crds,
bank: Option<&Arc<Bank>>, stakes: &HashMap<Pubkey, u64>,
self_id: Pubkey, self_id: Pubkey,
network_size: usize, network_size: usize,
ratio: usize, ratio: usize,
@ -182,7 +180,7 @@ impl CrdsGossipPush {
let max_weight = f32::from(u16::max_value()) - 1.0; let max_weight = f32::from(u16::max_value()) - 1.0;
let last_updated: u64 = value.local_timestamp; let last_updated: u64 = value.local_timestamp;
let since = ((timestamp() - last_updated) / 1024) as u32; let since = ((timestamp() - last_updated) / 1024) as u32;
let stake = get_stake(&info.id, bank); let stake = get_stake(&info.id, stakes);
let weight = get_weight(max_weight, since, stake); let weight = get_weight(max_weight, since, stake);
(weight, info) (weight, info)
}) })
@ -260,7 +258,6 @@ impl CrdsGossipPush {
mod test { mod test {
use super::*; use super::*;
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::f32::consts::E; use std::f32::consts::E;
@ -365,14 +362,14 @@ mod test {
let value1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); let value1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(crds.insert(value1.clone(), 0), Ok(None)); assert_eq!(crds.insert(value1.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);
assert!(push.active_set.get(&value1.label().pubkey()).is_some()); assert!(push.active_set.get(&value1.label().pubkey()).is_some());
let value2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); let value2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert!(push.active_set.get(&value2.label().pubkey()).is_none()); assert!(push.active_set.get(&value2.label().pubkey()).is_none());
assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
for _ in 0..30 { for _ in 0..30 {
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);
if push.active_set.get(&value2.label().pubkey()).is_some() { if push.active_set.get(&value2.label().pubkey()).is_some() {
break; break;
} }
@ -384,32 +381,30 @@ mod test {
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
} }
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);
assert_eq!(push.active_set.len(), push.num_active); assert_eq!(push.active_set.len(), push.num_active);
} }
#[test] #[test]
fn test_active_set_refresh_with_bank() { fn test_active_set_refresh_with_bank() {
let (block, mint_keypair) = GenesisBlock::new(100_000_000);
let bank = Arc::new(Bank::new(&block));
let time = timestamp() - 1024; //make sure there's at least a 1 second delay let time = timestamp() - 1024; //make sure there's at least a 1 second delay
let mut crds = Crds::default(); let mut crds = Crds::default();
let mut push = CrdsGossipPush::default(); let mut push = CrdsGossipPush::default();
let mut stakes = HashMap::new();
for i in 1..=100 { for i in 1..=100 {
let peer = let peer =
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), time)); CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), time));
let id = peer.label().pubkey(); let id = peer.label().pubkey();
crds.insert(peer.clone(), time).unwrap(); crds.insert(peer.clone(), time).unwrap();
bank.transfer(i * 100, &mint_keypair, id, bank.last_id()) stakes.insert(id, i * 100);
.unwrap();
} }
let min_balance = E.powf(7000_f32.ln() - 0.5); let min_balance = E.powf(7000_f32.ln() - 0.5);
// try upto 10 times because of rng // try upto 10 times because of rng
for _ in 0..10 { for _ in 0..10 {
push.refresh_push_active_set(&crds, Some(&bank), Pubkey::default(), 100, 30); push.refresh_push_active_set(&crds, &stakes, Pubkey::default(), 100, 30);
let mut num_correct = 0; let mut num_correct = 0;
let mut num_wrong = 0; let mut num_wrong = 0;
push.active_set.iter().for_each(|peer| { push.active_set.iter().for_each(|peer| {
if bank.get_balance(peer.0) >= min_balance as u64 { if *stakes.get(peer.0).unwrap_or(&0) >= min_balance as u64 {
num_correct += 1; num_correct += 1;
} else { } else {
num_wrong += 1; num_wrong += 1;
@ -431,7 +426,7 @@ mod test {
let mut push = CrdsGossipPush::default(); let mut push = CrdsGossipPush::default();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);
let new_msg = let new_msg =
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
@ -451,7 +446,7 @@ mod test {
let mut push = CrdsGossipPush::default(); let mut push = CrdsGossipPush::default();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);
let new_msg = let new_msg =
CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
@ -471,7 +466,7 @@ mod test {
let mut push = CrdsGossipPush::default(); let mut push = CrdsGossipPush::default();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1);
let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0); let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
ci.wallclock = 1; ci.wallclock = 1;

View File

@ -1,4 +1,5 @@
use bincode::serialized_size; use bincode::serialized_size;
use hashbrown::HashMap;
use log::trace; use log::trace;
use rayon::prelude::*; use rayon::prelude::*;
use solana::cluster_info::NodeInfo; use solana::cluster_info::NodeInfo;
@ -12,7 +13,6 @@ use solana_sdk::hash::hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::timestamp; use solana_sdk::timing::timestamp;
use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
type Node = Arc<Mutex<CrdsGossip>>; type Node = Arc<Mutex<CrdsGossip>>;
@ -112,7 +112,9 @@ fn network_simulator(network: &mut Network) {
// make sure there is someone in the active set // make sure there is someone in the active set
let network_values: Vec<Node> = network.values().cloned().collect(); let network_values: Vec<Node> = network.values().cloned().collect();
network_values.par_iter().for_each(|node| { network_values.par_iter().for_each(|node| {
node.lock().unwrap().refresh_push_active_set(None); node.lock()
.unwrap()
.refresh_push_active_set(&HashMap::new());
}); });
let mut total_bytes = bytes_tx; let mut total_bytes = bytes_tx;
for second in 1..num { for second in 1..num {
@ -211,7 +213,9 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
} }
if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 { if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
network_values.par_iter().for_each(|node| { network_values.par_iter().for_each(|node| {
node.lock().unwrap().refresh_push_active_set(None); node.lock()
.unwrap()
.refresh_push_active_set(&HashMap::new());
}); });
} }
total = network_values total = network_values
@ -249,7 +253,12 @@ fn network_run_pull(
let requests: Vec<_> = { let requests: Vec<_> = {
network_values network_values
.par_iter() .par_iter()
.filter_map(|from| from.lock().unwrap().new_pull_request(now, None).ok()) .filter_map(|from| {
from.lock()
.unwrap()
.new_pull_request(now, &HashMap::new())
.ok()
})
.collect() .collect()
}; };
let transfered: Vec<_> = requests let transfered: Vec<_> = requests
@ -372,7 +381,7 @@ fn test_prune_errors() {
.crds .crds
.insert(CrdsValue::ContactInfo(ci.clone()), 0) .insert(CrdsValue::ContactInfo(ci.clone()), 0)
.unwrap(); .unwrap();
crds_gossip.refresh_push_active_set(None); crds_gossip.refresh_push_active_set(&HashMap::new());
let now = timestamp(); let now = timestamp();
//incorrect dest //incorrect dest
let mut res = crds_gossip.process_prune_msg( let mut res = crds_gossip.process_prune_msg(