diff --git a/runtime/src/stakes.rs b/runtime/src/stakes.rs index 8f72e64f54..34b993a7d3 100644 --- a/runtime/src/stakes.rs +++ b/runtime/src/stakes.rs @@ -3,16 +3,13 @@ use { crate::{ stake_history::StakeHistory, - vote_account::{VoteAccount, VoteAccounts, VoteAccountsHashMap}, + vote_account::{VoteAccount, VoteAccounts}, }, dashmap::DashMap, im::HashMap as ImHashMap, num_derive::ToPrimitive, num_traits::ToPrimitive, - rayon::{ - iter::{IntoParallelRefIterator, ParallelIterator}, - ThreadPool, - }, + rayon::{prelude::*, ThreadPool}, solana_sdk::{ account::{AccountSharedData, ReadableAccount}, clock::{Epoch, Slot}, @@ -26,6 +23,7 @@ use { solana_vote_program::vote_state::VoteState, std::{ collections::HashMap, + ops::Add, sync::{Arc, RwLock, RwLockReadGuard}, }, }; @@ -168,71 +166,54 @@ impl Stakes { &self.stake_history } - pub fn activate_epoch(&mut self, next_epoch: Epoch, thread_pool: &ThreadPool) { - let prev_epoch = self.epoch; - self.epoch = next_epoch; - - thread_pool.install(|| { - let stake_delegations = &self.stake_delegations; - let stake_history = &mut self.stake_history; - let vote_accounts: &VoteAccountsHashMap = self.vote_accounts.as_ref(); - - // construct map of vote pubkey -> list of stake delegations - let vote_delegations: HashMap> = { - let mut vote_delegations = HashMap::with_capacity(vote_accounts.len()); - stake_delegations - .iter() - .for_each(|(_stake_pubkey, delegation)| { - let vote_pubkey = &delegation.voter_pubkey; - vote_delegations - .entry(*vote_pubkey) - .and_modify(|delegations: &mut Vec<_>| delegations.push(delegation)) - .or_insert_with(|| vec![delegation]); - }); - vote_delegations - }; - - // wrap up the prev epoch by adding new stake history entry for the prev epoch - { - let stake_history_entry = vote_delegations - .par_iter() - .map(|(_vote_pubkey, delegations)| { - delegations - .par_iter() - .map(|delegation| { - delegation.stake_activating_and_deactivating( - prev_epoch, - Some(stake_history), - ) - }) - .reduce(StakeActivationStatus::default, |a, b| a + b) - }) - .reduce(StakeActivationStatus::default, |a, b| a + b); - - stake_history.add(prev_epoch, stake_history_entry); + fn activate_epoch(&mut self, next_epoch: Epoch, thread_pool: &ThreadPool) { + type StakesHashMap = HashMap; + fn merge(mut acc: StakesHashMap, other: StakesHashMap) -> StakesHashMap { + if acc.len() < other.len() { + return merge(other, acc); } - - // refresh the stake distribution of vote accounts for the next epoch, using new stake history - let vote_accounts_for_next_epoch: VoteAccountsHashMap = vote_accounts + for (key, stake) in other { + *acc.entry(key).or_default() += stake; + } + acc + } + let stake_delegations: Vec<_> = self.stake_delegations.values().collect(); + // Wrap up the prev epoch by adding new stake history entry for the + // prev epoch. + let stake_history_entry = thread_pool.install(|| { + stake_delegations .par_iter() - .map(|(vote_pubkey, (_stake, vote_account))| { - let delegated_stake = vote_delegations - .get(vote_pubkey) - .map(|delegations| { - delegations - .par_iter() - .map(|delegation| delegation.stake(next_epoch, Some(stake_history))) - .sum() - }) - .unwrap_or_default(); - - (*vote_pubkey, (delegated_stake, vote_account.clone())) + .fold(StakeActivationStatus::default, |acc, delegation| { + acc + delegation + .stake_activating_and_deactivating(self.epoch, Some(&self.stake_history)) }) - .collect(); - - // overwrite vote accounts so that staked nodes singleton is reset - self.vote_accounts = VoteAccounts::from(Arc::new(vote_accounts_for_next_epoch)); + .reduce(StakeActivationStatus::default, Add::add) }); + self.stake_history.add(self.epoch, stake_history_entry); + self.epoch = next_epoch; + // Refresh the stake distribution of vote accounts for the next epoch, + // using new stake history. + let delegated_stakes = thread_pool.install(|| { + stake_delegations + .par_iter() + .fold(HashMap::default, |mut delegated_stakes, delegation| { + let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default(); + *entry += delegation.stake(self.epoch, Some(&self.stake_history)); + delegated_stakes + }) + .reduce(HashMap::default, merge) + }); + self.vote_accounts = self + .vote_accounts + .iter() + .map(|(&vote_pubkey, (_ /*stake*/, vote_account))| { + let delegated_stake = delegated_stakes + .get(&vote_pubkey) + .copied() + .unwrap_or_default(); + (vote_pubkey, (delegated_stake, vote_account.clone())) + }) + .collect(); } /// Sum the stakes that point to the given voter_pubkey