diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 3a37a8219b..1a1c4ae37f 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -8,6 +8,7 @@ use crate::last_id_queue::LastIdQueue; use crate::runtime::{self, RuntimeError}; use crate::status_cache::StatusCache; use bincode::{deserialize, serialize}; +use hashbrown::HashMap; use log::{debug, info, Level}; use solana_metrics::counter::Counter; use solana_sdk::account::Account; @@ -596,6 +597,16 @@ impl Bank { .collect() } + /// Collect all the stakes into a Map keyed on the Node id. + pub fn get_stakes(&self) -> HashMap { + let map: HashMap<_, _> = self + .vote_states(|_| true) + .iter() + .map(|state| (state.node_id, self.get_balance(&state.staker_id))) + .collect(); + map + } + pub fn tick_height(&self) -> u64 { self.last_id_queue.read().unwrap().tick_height } diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 7210055087..bedcbf4cd5 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -174,16 +174,16 @@ impl ClusterInfo { let id = node_info.id; me.gossip.set_self(id); me.insert_info(node_info); - me.push_self(None); + me.push_self(&HashMap::new()); me } - pub fn push_self(&mut self, bank: Option<&Arc>) { + pub fn push_self(&mut self, stakes: &HashMap) { let mut my_data = self.my_data(); let now = timestamp(); my_data.wallclock = now; let mut entry = CrdsValue::ContactInfo(my_data); entry.sign(&self.keypair); - self.gossip.refresh_push_active_set(bank); + self.gossip.refresh_push_active_set(stakes); self.gossip.process_push_message(&[entry], now); } pub fn insert_info(&mut self, node_info: NodeInfo) { @@ -756,11 +756,11 @@ impl ClusterInfo { Ok((addr, out)) } - fn new_pull_requests(&mut self, bank: Option<&Arc>) -> Vec<(SocketAddr, Protocol)> { + fn new_pull_requests(&mut self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); let pulls: Vec<_> = self .gossip - .new_pull_request(now, bank) + .new_pull_request(now, stakes) .ok() .into_iter() .collect(); @@ -800,8 +800,8 @@ impl ClusterInfo { .collect() } - fn gossip_request(&mut self, bank: Option<&Arc>) -> Vec<(SocketAddr, Protocol)> { - let pulls: Vec<_> = self.new_pull_requests(bank); + fn gossip_request(&mut self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { + let pulls: Vec<_> = self.new_pull_requests(stakes); let pushes: Vec<_> = self.new_push_requests(); vec![pulls, pushes].into_iter().flat_map(|x| x).collect() } @@ -809,10 +809,10 @@ impl ClusterInfo { /// At random pick a node and try to get updated changes from them fn run_gossip( obj: &Arc>, - bank: Option<&Arc>, + stakes: &HashMap, blob_sender: &BlobSender, ) -> Result<()> { - let reqs = obj.write().unwrap().gossip_request(bank); + let reqs = obj.write().unwrap().gossip_request(&stakes); let blobs = reqs .into_iter() .filter_map(|(remote_gossip_addr, req)| to_shared_blob(req, remote_gossip_addr).ok()) @@ -864,7 +864,11 @@ impl ClusterInfo { let mut last_push = timestamp(); loop { let start = timestamp(); - let _ = Self::run_gossip(&obj, bank.as_ref(), &blob_sender); + let stakes: HashMap<_, _> = match bank { + Some(ref bank) => bank.get_stakes(), + None => HashMap::new(), + }; + let _ = Self::run_gossip(&obj, &stakes, &blob_sender); if exit.load(Ordering::Relaxed) { return; } @@ -872,7 +876,7 @@ impl ClusterInfo { //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { - obj.write().unwrap().push_self(bank.as_ref()); + obj.write().unwrap().push_self(&stakes); last_push = timestamp(); } let elapsed = timestamp() - start; @@ -1461,8 +1465,11 @@ mod tests { .write() .unwrap() .gossip - .refresh_push_active_set(None); - let reqs = cluster_info.write().unwrap().gossip_request(None); + .refresh_push_active_set(&HashMap::new()); + let reqs = cluster_info + .write() + .unwrap() + .gossip_request(&HashMap::new()); //assert none of the addrs are invalid. reqs.iter().all(|(addr, _)| { let res = ContactInfo::is_valid_address(addr); @@ -1740,7 +1747,7 @@ mod tests { let (_, _, val) = cluster_info .gossip - .new_pull_request(timestamp(), None) + .new_pull_request(timestamp(), &HashMap::new()) .ok() .unwrap(); assert!(val.verify()); diff --git a/src/crds_gossip.rs b/src/crds_gossip.rs index b5cc55574f..84ffb0fdf2 100644 --- a/src/crds_gossip.rs +++ b/src/crds_gossip.rs @@ -8,11 +8,10 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CrdsGossipPull; use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}; use crate::crds_value::CrdsValue; -use solana_runtime::bank::Bank; +use hashbrown::HashMap; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; -use std::sync::Arc; ///The min size for bloom filters pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; @@ -94,10 +93,10 @@ impl CrdsGossip { /// refresh the push active set /// * ratio - number of actives to rotate - pub fn refresh_push_active_set(&mut self, bank: Option<&Arc>) { + pub fn refresh_push_active_set(&mut self, stakes: &HashMap) { self.push.refresh_push_active_set( &self.crds, - bank, + stakes, self.id, self.pull.pull_request_time.len(), CRDS_GOSSIP_NUM_ACTIVE, @@ -108,9 +107,9 @@ impl CrdsGossip { pub fn new_pull_request( &self, now: u64, - bank: Option<&Arc>, + stakes: &HashMap, ) -> Result<(Pubkey, Bloom, CrdsValue), CrdsGossipError> { - self.pull.new_pull_request(&self.crds, self.id, now, bank) + self.pull.new_pull_request(&self.crds, self.id, now, stakes) } /// time when a request to `from` was initiated @@ -160,16 +159,11 @@ impl CrdsGossip { } } -/// Computes a normalized(log of bank balance) stake -pub fn get_stake(id: &Pubkey, bank: Option<&Arc>) -> f32 { - match bank { - Some(bank) => { - // cap the max balance to u32 max (it should be plenty) - let bal = f64::from(u32::max_value()).min(bank.get_balance(id) as f64); - 1_f32.max((bal as f32).ln()) - } - _ => 1.0, - } +/// Computes a normalized(log of actual stake) stake +pub fn get_stake(id: &Pubkey, stakes: &HashMap) -> f32 { + // cap the max balance to u32 max (it should be plenty) + let bal = f64::from(u32::max_value()).min(*stakes.get(id).unwrap_or(&0) as f64); + 1_f32.max((bal as f32).ln()) } /// Computes bounded weight given some max, a time since last selected, and a stake value @@ -200,7 +194,7 @@ mod test { .crds .insert(CrdsValue::ContactInfo(ci.clone()), 0) .unwrap(); - crds_gossip.refresh_push_active_set(None); + crds_gossip.refresh_push_active_set(&HashMap::new()); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( diff --git a/src/crds_gossip_pull.rs b/src/crds_gossip_pull.rs index 406a05448e..8364c7088d 100644 --- a/src/crds_gossip_pull.rs +++ b/src/crds_gossip_pull.rs @@ -19,13 +19,11 @@ use bincode::serialized_size; use hashbrown::HashMap; use rand; use rand::distributions::{Distribution, WeightedIndex}; -use solana_runtime::bank::Bank; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::cmp; use std::collections::VecDeque; -use std::sync::Arc; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; @@ -57,7 +55,7 @@ impl CrdsGossipPull { crds: &Crds, self_id: Pubkey, now: u64, - bank: Option<&Arc>, + stakes: &HashMap, ) -> Result<(Pubkey, Bloom, CrdsValue), CrdsGossipError> { let options: Vec<_> = crds .table @@ -68,7 +66,7 @@ impl CrdsGossipPull { let max_weight = f32::from(u16::max_value()) - 1.0; let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0); let since = ((now - req_time) / 1024) as u32; - let stake = get_stake(&item.id, bank); + let stake = get_stake(&item.id, stakes); let weight = get_weight(max_weight, since, stake); (weight, item) }) @@ -203,15 +201,13 @@ mod test { use super::*; use crate::contact_info::ContactInfo; use crate::crds_value::LeaderId; - use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::f32::consts::E; #[test] - fn test_new_pull_with_bank() { - let (block, mint_keypair) = GenesisBlock::new(500_000); - let bank = Arc::new(Bank::new(&block)); + fn test_new_pull_with_stakes() { let mut crds = Crds::default(); + let mut stakes = HashMap::new(); let node = CrdsGossipPull::default(); let me = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); crds.insert(me.clone(), 0).unwrap(); @@ -220,8 +216,7 @@ mod test { CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); let id = entry.label().pubkey(); crds.insert(entry.clone(), 0).unwrap(); - bank.transfer(i * 100, &mint_keypair, id, bank.last_id()) - .unwrap(); + stakes.insert(id, i * 100); } // The min balance of the heaviest nodes is at least ln(3000) - 0.5 // This is because the heaviest nodes will have very similar weights @@ -230,9 +225,9 @@ mod test { // try upto 10 times because of rng for _ in 0..10 { let msg = node - .new_pull_request(&crds, me.label().pubkey(), now, Some(&bank)) + .new_pull_request(&crds, me.label().pubkey(), now, &stakes) .unwrap(); - if bank.get_balance(&msg.0) >= min_balance.round() as u64 { + if *stakes.get(&msg.0).unwrap_or(&0) >= min_balance.round() as u64 { return; } } @@ -246,19 +241,19 @@ mod test { let id = entry.label().pubkey(); let node = CrdsGossipPull::default(); assert_eq!( - node.new_pull_request(&crds, id, 0, None), + node.new_pull_request(&crds, id, 0, &HashMap::new()), Err(CrdsGossipError::NoPeers) ); crds.insert(entry.clone(), 0).unwrap(); assert_eq!( - node.new_pull_request(&crds, id, 0, None), + node.new_pull_request(&crds, id, 0, &HashMap::new()), Err(CrdsGossipError::NoPeers) ); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&crds, id, 0, None); + let req = node.new_pull_request(&crds, id, 0, &HashMap::new()); let (to, _, self_info) = req.unwrap(); assert_eq!(to, new.label().pubkey()); assert_eq!(self_info, entry); @@ -281,7 +276,7 @@ mod test { // odds of getting the other request should be 1 in u64::max_value() for _ in 0..10 { - let req = node.new_pull_request(&crds, node_id, u64::max_value(), None); + let req = node.new_pull_request(&crds, node_id, u64::max_value(), &HashMap::new()); let (to, _, self_info) = req.unwrap(); assert_eq!(to, old.label().pubkey()); assert_eq!(self_info, entry); @@ -297,7 +292,7 @@ mod test { node_crds.insert(entry.clone(), 0).unwrap(); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); node_crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&node_crds, node_id, 0, None); + let req = node.new_pull_request(&node_crds, node_id, 0, &HashMap::new()); let mut dest_crds = Crds::default(); let mut dest = CrdsGossipPull::default(); @@ -349,7 +344,7 @@ mod test { let mut done = false; for _ in 0..30 { // there is a chance of a false positive with bloom filters - let req = node.new_pull_request(&node_crds, node_id, 0, None); + let req = node.new_pull_request(&node_crds, node_id, 0, &HashMap::new()); let (_, filter, caller) = req.unwrap(); let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0); // if there is a false positive this is empty diff --git a/src/crds_gossip_push.rs b/src/crds_gossip_push.rs index 8568549137..88a1b29a7a 100644 --- a/src/crds_gossip_push.rs +++ b/src/crds_gossip_push.rs @@ -20,13 +20,11 @@ use indexmap::map::IndexMap; use rand; use rand::distributions::{Distribution, WeightedIndex}; use rand::seq::SliceRandom; -use solana_runtime::bank::Bank; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::cmp; -use std::sync::Arc; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; @@ -164,7 +162,7 @@ impl CrdsGossipPush { pub fn refresh_push_active_set( &mut self, crds: &Crds, - bank: Option<&Arc>, + stakes: &HashMap, self_id: Pubkey, network_size: usize, ratio: usize, @@ -182,7 +180,7 @@ impl CrdsGossipPush { let max_weight = f32::from(u16::max_value()) - 1.0; let last_updated: u64 = value.local_timestamp; let since = ((timestamp() - last_updated) / 1024) as u32; - let stake = get_stake(&info.id, bank); + let stake = get_stake(&info.id, stakes); let weight = get_weight(max_weight, since, stake); (weight, info) }) @@ -260,7 +258,6 @@ impl CrdsGossipPush { mod test { use super::*; use crate::contact_info::ContactInfo; - use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::f32::consts::E; @@ -365,14 +362,14 @@ mod test { let value1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert_eq!(crds.insert(value1.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1); assert!(push.active_set.get(&value1.label().pubkey()).is_some()); let value2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert!(push.active_set.get(&value2.label().pubkey()).is_none()); assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); for _ in 0..30 { - push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1); if push.active_set.get(&value2.label().pubkey()).is_some() { break; } @@ -384,32 +381,30 @@ mod test { CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); } - push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1); assert_eq!(push.active_set.len(), push.num_active); } #[test] fn test_active_set_refresh_with_bank() { - let (block, mint_keypair) = GenesisBlock::new(100_000_000); - let bank = Arc::new(Bank::new(&block)); let time = timestamp() - 1024; //make sure there's at least a 1 second delay let mut crds = Crds::default(); let mut push = CrdsGossipPush::default(); + let mut stakes = HashMap::new(); for i in 1..=100 { let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), time)); let id = peer.label().pubkey(); crds.insert(peer.clone(), time).unwrap(); - bank.transfer(i * 100, &mint_keypair, id, bank.last_id()) - .unwrap(); + stakes.insert(id, i * 100); } let min_balance = E.powf(7000_f32.ln() - 0.5); // try upto 10 times because of rng for _ in 0..10 { - push.refresh_push_active_set(&crds, Some(&bank), Pubkey::default(), 100, 30); + push.refresh_push_active_set(&crds, &stakes, Pubkey::default(), 100, 30); let mut num_correct = 0; let mut num_wrong = 0; push.active_set.iter().for_each(|peer| { - if bank.get_balance(peer.0) >= min_balance as u64 { + if *stakes.get(peer.0).unwrap_or(&0) >= min_balance as u64 { num_correct += 1; } else { num_wrong += 1; @@ -431,7 +426,7 @@ mod test { let mut push = CrdsGossipPush::default(); let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1); let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); @@ -451,7 +446,7 @@ mod test { let mut push = CrdsGossipPush::default(); let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1); let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); @@ -471,7 +466,7 @@ mod test { let mut push = CrdsGossipPush::default(); let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(Keypair::new().pubkey(), 0)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, None, Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), Pubkey::default(), 1, 1); let mut ci = ContactInfo::new_localhost(Keypair::new().pubkey(), 0); ci.wallclock = 1; diff --git a/tests/crds_gossip.rs b/tests/crds_gossip.rs index c236c94df4..440a0bf138 100644 --- a/tests/crds_gossip.rs +++ b/tests/crds_gossip.rs @@ -1,4 +1,5 @@ use bincode::serialized_size; +use hashbrown::HashMap; use log::trace; use rayon::prelude::*; use solana::cluster_info::NodeInfo; @@ -12,7 +13,6 @@ use solana_sdk::hash::hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::timing::timestamp; -use std::collections::HashMap; use std::sync::{Arc, Mutex}; type Node = Arc>; @@ -112,7 +112,9 @@ fn network_simulator(network: &mut Network) { // make sure there is someone in the active set let network_values: Vec = network.values().cloned().collect(); network_values.par_iter().for_each(|node| { - node.lock().unwrap().refresh_push_active_set(None); + node.lock() + .unwrap() + .refresh_push_active_set(&HashMap::new()); }); let mut total_bytes = bytes_tx; for second in 1..num { @@ -211,7 +213,9 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, } if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 { network_values.par_iter().for_each(|node| { - node.lock().unwrap().refresh_push_active_set(None); + node.lock() + .unwrap() + .refresh_push_active_set(&HashMap::new()); }); } total = network_values @@ -249,7 +253,12 @@ fn network_run_pull( let requests: Vec<_> = { network_values .par_iter() - .filter_map(|from| from.lock().unwrap().new_pull_request(now, None).ok()) + .filter_map(|from| { + from.lock() + .unwrap() + .new_pull_request(now, &HashMap::new()) + .ok() + }) .collect() }; let transfered: Vec<_> = requests @@ -372,7 +381,7 @@ fn test_prune_errors() { .crds .insert(CrdsValue::ContactInfo(ci.clone()), 0) .unwrap(); - crds_gossip.refresh_push_active_set(None); + crds_gossip.refresh_push_active_set(&HashMap::new()); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg(