diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index dfe57830f0..3c232a441e 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -72,9 +72,11 @@ impl Broadcast { } let bank_epoch = bank.get_stakers_epoch(bank.slot()); - let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers( - &staking_utils::delegated_stakes_at_epoch(&bank, bank_epoch).unwrap(), - ); + let mut broadcast_table = cluster_info + .read() + .unwrap() + .sorted_tvu_peers(staking_utils::staked_nodes_at_epoch(&bank, bank_epoch).as_ref()); + inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); // Layer 1, leader nodes are limited to the fanout size. broadcast_table.truncate(DATA_PLANE_FANOUT); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 0c9f91cea0..58548d5b74 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -424,11 +424,16 @@ impl ClusterInfo { fn sort_by_stake( peers: &[ContactInfo], - stakes: &HashMap, + stakes: Option<&HashMap>, ) -> Vec<(u64, ContactInfo)> { let mut peers_with_stakes: Vec<_> = peers .iter() - .map(|c| (*stakes.get(&c.id).unwrap_or(&0), c.clone())) + .map(|c| { + ( + stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0)), + c.clone(), + ) + }) .collect(); peers_with_stakes.sort_unstable_by(|(l_stake, l_info), (r_stake, r_info)| { if r_stake == l_stake { @@ -444,7 +449,7 @@ impl ClusterInfo { /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list fn sorted_peers_and_index( &self, - stakes: &HashMap, + stakes: Option<&HashMap>, ) -> (usize, Vec) { let mut peers = self.retransmit_peers(); peers.push(self.lookup(&self.id()).unwrap().clone()); @@ -465,7 +470,7 @@ impl ClusterInfo { (index, peers) } - pub fn sorted_tvu_peers(&self, stakes: &HashMap) -> Vec { + pub fn sorted_tvu_peers(&self, stakes: Option<&HashMap>) -> Vec { let peers = self.tvu_peers(); let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes); peers_with_stakes @@ -945,9 +950,9 @@ impl ClusterInfo { loop { let start = timestamp(); let stakes: HashMap<_, _> = match bank_forks { - Some(ref bank_forks) => staking_utils::delegated_stakes( - &bank_forks.read().unwrap().working_bank(), - ), + Some(ref bank_forks) => { + staking_utils::staked_nodes(&bank_forks.read().unwrap().working_bank()) + } None => HashMap::new(), }; let _ = Self::run_gossip(&obj, &stakes, &blob_sender); @@ -1421,7 +1426,7 @@ impl ClusterInfo { /// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance) pub fn compute_retransmit_peers( - stakes: &HashMap, + stakes: Option<&HashMap>, cluster_info: &Arc>, fanout: usize, ) -> (Vec, Vec) { diff --git a/core/src/leader_schedule_utils.rs b/core/src/leader_schedule_utils.rs index dbadccda24..dd7e3f72d8 100644 --- a/core/src/leader_schedule_utils.rs +++ b/core/src/leader_schedule_utils.rs @@ -6,7 +6,7 @@ use solana_sdk::timing::NUM_CONSECUTIVE_LEADER_SLOTS; /// Return the leader schedule for the given epoch. pub fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option { - staking_utils::delegated_stakes_at_epoch(bank, epoch_height).map(|stakes| { + staking_utils::staked_nodes_at_epoch(bank, epoch_height).map(|stakes| { let mut seed = [0u8; 32]; seed[0..8].copy_from_slice(&epoch_height.to_le_bytes()); let mut stakes: Vec<_> = stakes.into_iter().collect(); @@ -68,7 +68,7 @@ mod tests { create_genesis_block_with_leader(0, &pubkey, BOOTSTRAP_LEADER_LAMPORTS).0; let bank = Bank::new(&genesis_block); - let ids_and_stakes: Vec<_> = staking_utils::delegated_stakes(&bank).into_iter().collect(); + let ids_and_stakes: Vec<_> = staking_utils::staked_nodes(&bank).into_iter().collect(); let seed = [0u8; 32]; let leader_schedule = LeaderSchedule::new( &ids_and_stakes, diff --git a/core/src/locktower.rs b/core/src/locktower.rs index c7c33c434d..8e8a6a1b7b 100644 --- a/core/src/locktower.rs +++ b/core/src/locktower.rs @@ -360,7 +360,7 @@ impl Locktower { fn initialize_lockouts_from_bank(bank: &Bank, current_epoch: u64) -> VoteState { let mut lockouts = VoteState::default(); - if let Some(iter) = staking_utils::node_staked_accounts_at_epoch(&bank, current_epoch) { + if let Some(iter) = bank.epoch_vote_accounts(current_epoch) { for (delegate_id, (_, account)) in iter { if *delegate_id == bank.collector_id() { let state = VoteState::deserialize(&account.data).expect("votes"); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 53f8083176..26c29372dd 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -39,7 +39,7 @@ fn retransmit( let r_bank = bank_forks.read().unwrap().working_bank(); let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot()); let (neighbors, children) = compute_retransmit_peers( - &staking_utils::delegated_stakes_at_epoch(&r_bank, bank_epoch).unwrap(), + staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(), cluster_info, DATA_PLANE_FANOUT, ); diff --git a/core/src/staking_utils.rs b/core/src/staking_utils.rs index fae7532c8c..94cd80f611 100644 --- a/core/src/staking_utils.rs +++ b/core/src/staking_utils.rs @@ -18,18 +18,15 @@ pub fn get_supermajority_slot(bank: &Bank, epoch_height: u64) -> Option { } pub fn vote_account_stakes(bank: &Bank) -> HashMap { - let node_staked_accounts = node_staked_accounts(bank); - node_staked_accounts + bank.vote_accounts() + .into_iter() .map(|(id, (stake, _))| (id, stake)) .collect() } -/// Collect the delegate account balance and vote states for delegates have non-zero balance in -/// any of their managed staking accounts -pub fn delegated_stakes(bank: &Bank) -> HashMap { - let node_staked_accounts = node_staked_accounts(bank); - let node_staked_vote_states = to_vote_state(node_staked_accounts); - to_delegated_stakes(node_staked_vote_states) +/// Collect the staked nodes, as named by staked vote accounts from the given bank +pub fn staked_nodes(bank: &Bank) -> HashMap { + to_staked_nodes(to_vote_states(bank.vote_accounts().into_iter())) } /// At the specified epoch, collect the node account balance and vote states for nodes that @@ -38,43 +35,23 @@ pub fn vote_account_stakes_at_epoch( bank: &Bank, epoch_height: u64, ) -> Option> { - let node_staked_accounts = node_staked_accounts_at_epoch(bank, epoch_height); - node_staked_accounts - .map(|epoch_state| epoch_state.map(|(id, (stake, _))| (*id, *stake)).collect()) + bank.epoch_vote_accounts(epoch_height).map(|accounts| { + accounts + .iter() + .map(|(id, (stake, _))| (*id, *stake)) + .collect() + }) } /// At the specified epoch, collect the delegate account balance and vote states for delegates /// that have non-zero balance in any of their managed staking accounts -pub fn delegated_stakes_at_epoch(bank: &Bank, epoch_height: u64) -> Option> { - let node_staked_accounts = node_staked_accounts_at_epoch(bank, epoch_height); - let node_staked_vote_states = node_staked_accounts.map(to_vote_state); - node_staked_vote_states.map(to_delegated_stakes) +pub fn staked_nodes_at_epoch(bank: &Bank, epoch_height: u64) -> Option> { + bank.epoch_vote_accounts(epoch_height) + .map(|vote_accounts| to_staked_nodes(to_vote_states(vote_accounts.into_iter()))) } -/// Collect the node account balance and vote states for nodes have non-zero balance in -/// their corresponding staking accounts -fn node_staked_accounts(bank: &Bank) -> impl Iterator { - bank.vote_accounts().into_iter() -} - -pub fn node_staked_accounts_at_epoch( - bank: &Bank, - epoch_height: u64, -) -> Option> { - bank.epoch_vote_accounts(epoch_height).map(|vote_accounts| { - vote_accounts - .into_iter() - .filter(|(account_id, (_, account))| filter_no_delegate(account_id, account)) - }) -} - -fn filter_no_delegate(account_id: &Pubkey, account: &Account) -> bool { - VoteState::deserialize(&account.data) - .map(|vote_state| vote_state.node_id != *account_id) - .unwrap_or(false) -} - -fn to_vote_state( +// input (vote_id, (stake, vote_account)) => (stake, vote_state) +fn to_vote_states( node_staked_accounts: impl Iterator, impl Borrow<(u64, Account)>)>, ) -> impl Iterator { node_staked_accounts.filter_map(|(_, stake_account)| { @@ -84,13 +61,13 @@ fn to_vote_state( }) } -fn to_delegated_stakes( +// (stake, vote_state) => (node, stake) +fn to_staked_nodes( node_staked_accounts: impl Iterator, ) -> HashMap { let mut map: HashMap = HashMap::new(); node_staked_accounts.for_each(|(stake, state)| { - let delegate = &state.node_id; - map.entry(*delegate) + map.entry(state.node_id) .and_modify(|s| *s += stake) .or_insert(stake); }); @@ -98,10 +75,12 @@ fn to_delegated_stakes( } fn epoch_stakes_and_lockouts(bank: &Bank, epoch_height: u64) -> Vec<(u64, Option)> { - let node_staked_accounts = - node_staked_accounts_at_epoch(bank, epoch_height).expect("Bank state for epoch is missing"); - let node_staked_vote_states = to_vote_state(node_staked_accounts); - node_staked_vote_states + let node_staked_accounts = bank + .epoch_vote_accounts(epoch_height) + .expect("Bank state for epoch is missing") + .into_iter(); + + to_vote_states(node_staked_accounts) .map(|(stake, states)| (stake, states.root_slot)) .collect() } @@ -148,7 +127,7 @@ pub mod tests { } #[test] - fn test_bank_staked_nodes_at_epoch() { + fn test_vote_account_stakes_at_epoch() { let (genesis_block, _mint_keypair, voting_keypair) = create_genesis_block_with_leader(1, &Pubkey::new_rand(), BOOTSTRAP_LEADER_LAMPORTS); @@ -263,22 +242,22 @@ pub mod tests { } #[test] - fn test_to_delegated_stakes() { + fn test_to_staked_nodes() { let mut stakes = Vec::new(); - let delegate1 = Pubkey::new_rand(); - let delegate2 = Pubkey::new_rand(); + let node1 = Pubkey::new_rand(); + let node2 = Pubkey::new_rand(); - // Delegate 1 has stake of 3 + // Node 1 has stake of 3 for i in 0..3 { - stakes.push((i, VoteState::new(&Pubkey::new_rand(), &delegate1, 0))); + stakes.push((i, VoteState::new(&Pubkey::new_rand(), &node1, 0))); } - // Delegate 1 has stake of 5 - stakes.push((5, VoteState::new(&Pubkey::new_rand(), &delegate2, 0))); + // Node 1 has stake of 5 + stakes.push((5, VoteState::new(&Pubkey::new_rand(), &node2, 0))); - let result = to_delegated_stakes(stakes.into_iter()); + let result = to_staked_nodes(stakes.into_iter()); assert_eq!(result.len(), 2); - assert_eq!(result[&delegate1], 3); - assert_eq!(result[&delegate2], 5); + assert_eq!(result[&node1], 3); + assert_eq!(result[&node2], 5); } } diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index a9f958240d..f26f975cbf 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -72,7 +72,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) { let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect(); // pretend to broadcast from leader - cluster_info::create_broadcast_orders - let mut broadcast_table = cluster_info.sorted_tvu_peers(&staked_nodes); + let mut broadcast_table = cluster_info.sorted_tvu_peers(Some(&staked_nodes)); broadcast_table.truncate(fanout); let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table); @@ -106,7 +106,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) { cluster.gossip.set_self(&*id); if !mapped_peers.contains_key(id) { let (neighbors, children) = compute_retransmit_peers( - &staked_nodes, + Some(&staked_nodes), &Arc::new(RwLock::new(cluster.clone())), fanout, ); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index a679c9a662..fa8617ccdc 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -34,7 +34,7 @@ use std::time::Instant; #[derive(Default, Clone)] pub struct Stakes { /// vote accounts - pub vote_accounts: HashMap, + vote_accounts: HashMap, /// stake_accounts stake_accounts: HashMap, @@ -45,7 +45,7 @@ impl Stakes { solana_vote_api::check_id(&account.owner) || solana_stake_api::check_id(&account.owner) } - pub fn update(&mut self, pubkey: &Pubkey, account: &Account) { + pub fn store(&mut self, pubkey: &Pubkey, account: &Account) { if solana_vote_api::check_id(&account.owner) { if account.lamports != 0 { self.vote_accounts @@ -805,7 +805,7 @@ impl Bank { self.accounts.store_slow(self.slot(), pubkey, account); if Stakes::is_stake(account) { - self.stakes.write().unwrap().update(pubkey, account); + self.stakes.write().unwrap().store(pubkey, account); } } @@ -937,17 +937,19 @@ impl Bank { .zip(acc.0.iter()) .filter(|(_, account)| Stakes::is_stake(account)) { - self.stakes.write().unwrap().update(pubkey, account); + self.stakes.write().unwrap().store(pubkey, account); } } } - /// current vote accounts for this bank + /// current vote accounts for this bank along with the stake + /// attributed to each account pub fn vote_accounts(&self) -> HashMap { self.stakes.read().unwrap().vote_accounts.clone() } - /// vote accounts for the specific epoch + /// vote accounts for the specific epoch along with the stake + /// attributed to each account pub fn epoch_vote_accounts(&self, epoch: u64) -> Option<&HashMap> { self.epoch_stakes .get(&epoch)