diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 2044743264..1fd1f25283 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -132,6 +132,7 @@ pub struct ReplayStageConfig { pub cache_block_meta_sender: Option, pub bank_notification_sender: Option, pub wait_for_vote_to_start_leader: bool, + pub disable_epoch_boundary_optimization: bool, } #[derive(Default)] @@ -333,6 +334,7 @@ impl ReplayStage { cache_block_meta_sender, bank_notification_sender, wait_for_vote_to_start_leader, + disable_epoch_boundary_optimization, } = config; trace!("replay stage"); @@ -697,6 +699,7 @@ impl ReplayStage { &retransmit_slots_sender, &mut skipped_slots_info, has_new_vote_been_rooted, + disable_epoch_boundary_optimization, ); let poh_bank = poh_recorder.lock().unwrap().bank(); @@ -1087,6 +1090,7 @@ impl ReplayStage { retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, has_new_vote_been_rooted: bool, + disable_epoch_boundary_optimization: bool, ) { // all the individual calls to poh_recorder.lock() are designed to // increase granularity, decrease contention @@ -1204,7 +1208,10 @@ impl ReplayStage { root_slot, my_pubkey, rpc_subscriptions, - NewBankOptions { vote_only_bank }, + NewBankOptions { + vote_only_bank, + disable_epoch_boundary_optimization, + }, ); let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 242cc41674..3812547a93 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -98,6 +98,7 @@ pub struct TvuConfig { pub rocksdb_max_compaction_jitter: Option, pub wait_for_vote_to_start_leader: bool, pub accounts_shrink_ratio: AccountShrinkThreshold, + pub disable_epoch_boundary_optimization: bool, } impl Tvu { @@ -281,6 +282,7 @@ impl Tvu { cache_block_meta_sender, bank_notification_sender, wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, + disable_epoch_boundary_optimization: tvu_config.disable_epoch_boundary_optimization, }; let (voting_sender, voting_receiver) = channel(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 60874f1a52..98ec7001f9 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -154,6 +154,7 @@ pub struct ValidatorConfig { pub validator_exit: Arc>, pub no_wait_for_vote_to_start_leader: bool, pub accounts_shrink_ratio: AccountShrinkThreshold, + pub disable_epoch_boundary_optimization: bool, } impl Default for ValidatorConfig { @@ -212,6 +213,7 @@ impl Default for ValidatorConfig { validator_exit: Arc::new(RwLock::new(Exit::default())), no_wait_for_vote_to_start_leader: true, accounts_shrink_ratio: AccountShrinkThreshold::default(), + disable_epoch_boundary_optimization: false, } } } @@ -808,6 +810,7 @@ impl Validator { rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval, wait_for_vote_to_start_leader, accounts_shrink_ratio: config.accounts_shrink_ratio, + disable_epoch_boundary_optimization: config.disable_epoch_boundary_optimization, }, &max_slots, &cost_model, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index cd93d7f6b7..1eecd8c5a8 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -60,6 +60,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { poh_hashes_per_batch: config.poh_hashes_per_batch, no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader, accounts_shrink_ratio: config.accounts_shrink_ratio, + disable_epoch_boundary_optimization: config.disable_epoch_boundary_optimization, } } diff --git a/programs/stake/src/stake_state.rs b/programs/stake/src/stake_state.rs index c50ca1c504..2e2f334b01 100644 --- a/programs/stake/src/stake_state.rs +++ b/programs/stake/src/stake_state.rs @@ -1095,6 +1095,81 @@ fn stake_weighted_credits_observed( } } +// utility function, used by runtime +// returns a tuple of (stakers_reward,voters_reward) +#[doc(hidden)] +#[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")] +pub fn redeem_rewards_slow( + rewarded_epoch: Epoch, + stake_account: &mut AccountSharedData, + vote_account: &mut AccountSharedData, + vote_state: &VoteState, + point_value: &PointValue, + stake_history: Option<&StakeHistory>, + inflation_point_calc_tracer: Option, + fix_activating_credits_observed: bool, +) -> Result<(u64, u64), InstructionError> { + if let StakeState::Stake(meta, mut stake) = stake_account.state()? { + if let Some(inflation_point_calc_tracer) = inflation_point_calc_tracer.as_ref() { + inflation_point_calc_tracer( + &InflationPointCalculationEvent::EffectiveStakeAtRewardedEpoch( + stake.stake(rewarded_epoch, stake_history), + ), + ); + inflation_point_calc_tracer(&InflationPointCalculationEvent::RentExemptReserve( + meta.rent_exempt_reserve, + )); + inflation_point_calc_tracer(&InflationPointCalculationEvent::Commission( + vote_state.commission, + )); + } + + if let Some((stakers_reward, voters_reward)) = redeem_stake_rewards( + rewarded_epoch, + &mut stake, + point_value, + vote_state, + stake_history, + inflation_point_calc_tracer, + fix_activating_credits_observed, + ) { + stake_account.checked_add_lamports(stakers_reward)?; + vote_account.checked_add_lamports(voters_reward)?; + + stake_account.set_state(&StakeState::Stake(meta, stake))?; + + Ok((stakers_reward, voters_reward)) + } else { + Err(StakeError::NoCreditsToRedeem.into()) + } + } else { + Err(InstructionError::InvalidAccountData) + } +} + +// utility function, used by runtime +#[doc(hidden)] +#[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")] +pub fn calculate_points_slow( + stake_account: &AccountSharedData, + vote_account: &AccountSharedData, + stake_history: Option<&StakeHistory>, +) -> Result { + if let StakeState::Stake(_meta, stake) = stake_account.state()? { + let vote_state: VoteState = + StateMut::::state(vote_account)?.convert_to_current(); + + Ok(calculate_stake_points( + &stake, + &vote_state, + stake_history, + null_tracer(), + )) + } else { + Err(InstructionError::InvalidAccountData) + } +} + // utility function, used by runtime // returns a tuple of (stakers_reward,voters_reward) #[doc(hidden)] diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 38132355c5..7345ec78b7 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1113,6 +1113,7 @@ struct VoteWithStakeDelegations { #[derive(Debug, Default)] pub struct NewBankOptions { pub vote_only_bank: bool, + pub disable_epoch_boundary_optimization: bool, } impl Bank { @@ -1268,7 +1269,10 @@ impl Bank { new_bank_options: NewBankOptions, ) -> Self { let mut time = Measure::start("bank::new_from_parent"); - let NewBankOptions { vote_only_bank } = new_bank_options; + let NewBankOptions { + vote_only_bank, + disable_epoch_boundary_optimization, + } = new_bank_options; parent.freeze(); assert_ne!(slot, parent.slot()); @@ -1374,32 +1378,55 @@ impl Bank { // Following code may touch AccountsDb, requiring proper ancestors let parent_epoch = parent.epoch(); if parent_epoch < new.epoch() { - let thread_pool = ThreadPoolBuilder::new().build().unwrap(); - new.apply_feature_activations(false, false); - - // Add new entry to stakes.stake_history, set appropriate epoch and - // update vote accounts with warmed up stakes before saving a - // snapshot of stakes in epoch stakes - new.stakes - .write() - .unwrap() - .activate_epoch(epoch, &thread_pool); - - // Save a snapshot of stakes for use in consensus and stake weighted networking - let leader_schedule_epoch = epoch_schedule.get_leader_schedule_epoch(slot); - new.update_epoch_stakes(leader_schedule_epoch); - - // After saving a snapshot of stakes, apply stake rewards and commission - new.update_rewards_with_thread_pool(parent_epoch, reward_calc_tracer, &thread_pool); - } else { - // Save a snapshot of stakes for use in consensus and stake weighted networking - let leader_schedule_epoch = epoch_schedule.get_leader_schedule_epoch(slot); - new.update_epoch_stakes(leader_schedule_epoch); } - // Update sysvars before processing transactions + let optimize_epoch_boundary_updates = !disable_epoch_boundary_optimization + && new + .feature_set + .is_active(&feature_set::optimize_epoch_boundary_updates::id()); + + if optimize_epoch_boundary_updates { + if parent_epoch < new.epoch() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + + // Add new entry to stakes.stake_history, set appropriate epoch and + // update vote accounts with warmed up stakes before saving a + // snapshot of stakes in epoch stakes + new.stakes + .write() + .unwrap() + .activate_epoch(epoch, &thread_pool); + + // Save a snapshot of stakes for use in consensus and stake weighted networking + let leader_schedule_epoch = epoch_schedule.get_leader_schedule_epoch(slot); + new.update_epoch_stakes(leader_schedule_epoch); + + // After saving a snapshot of stakes, apply stake rewards and commission + new.update_rewards_with_thread_pool(parent_epoch, reward_calc_tracer, &thread_pool); + } else { + // Save a snapshot of stakes for use in consensus and stake weighted networking + let leader_schedule_epoch = epoch_schedule.get_leader_schedule_epoch(slot); + new.update_epoch_stakes(leader_schedule_epoch); + } + + // Update sysvars before processing transactions + new.update_slot_hashes(); + new.update_stake_history(Some(parent_epoch)); + new.update_clock(Some(parent_epoch)); + new.update_fees(); + + return new; + } + + #[allow(deprecated)] + let cloned = new.stakes.read().unwrap().clone_with_epoch(epoch); + *new.stakes.write().unwrap() = cloned; + + let leader_schedule_epoch = epoch_schedule.get_leader_schedule_epoch(slot); + new.update_epoch_stakes(leader_schedule_epoch); new.update_slot_hashes(); + new.update_rewards(parent_epoch, reward_calc_tracer); new.update_stake_history(Some(parent_epoch)); new.update_clock(Some(parent_epoch)); new.update_fees(); @@ -1966,6 +1993,111 @@ impl Bank { num_slots as f64 / self.slots_per_year } + // update rewards based on the previous epoch + fn update_rewards( + &mut self, + prev_epoch: Epoch, + reward_calc_tracer: Option, + ) { + if prev_epoch == self.epoch() { + return; + } + let mut timing = Measure::start("epoch_rewards"); + // if I'm the first Bank in an epoch, count, claim, disburse rewards from Inflation + + let slot_in_year = self.slot_in_year_for_inflation(); + let epoch_duration_in_years = self.epoch_duration_in_years(prev_epoch); + + let (validator_rate, foundation_rate) = { + let inflation = self.inflation.read().unwrap(); + ( + (*inflation).validator(slot_in_year), + (*inflation).foundation(slot_in_year), + ) + }; + + let capitalization = self.capitalization(); + let validator_rewards = + (validator_rate * capitalization as f64 * epoch_duration_in_years) as u64; + + let old_vote_balance_and_staked = self.stakes.read().unwrap().vote_balance_and_staked(); + + #[allow(deprecated)] + let validator_point_value = self.pay_validator_rewards( + prev_epoch, + validator_rewards, + reward_calc_tracer, + self.stake_program_advance_activating_credits_observed(), + ); + + if !self + .feature_set + .is_active(&feature_set::deprecate_rewards_sysvar::id()) + { + // this sysvar can be retired once `pico_inflation` is enabled on all clusters + self.update_sysvar_account(&sysvar::rewards::id(), |account| { + create_account( + &sysvar::rewards::Rewards::new(validator_point_value), + self.inherit_specially_retained_account_fields(account), + ) + }); + } + + let new_vote_balance_and_staked = self.stakes.read().unwrap().vote_balance_and_staked(); + let validator_rewards_paid = new_vote_balance_and_staked - old_vote_balance_and_staked; + assert_eq!( + validator_rewards_paid, + u64::try_from( + self.rewards + .read() + .unwrap() + .iter() + .map(|(_address, reward_info)| { + match reward_info.reward_type { + RewardType::Voting | RewardType::Staking => reward_info.lamports, + _ => 0, + } + }) + .sum::() + ) + .unwrap() + ); + + // verify that we didn't pay any more than we expected to + assert!(validator_rewards >= validator_rewards_paid); + + info!( + "distributed inflation: {} (rounded from: {})", + validator_rewards_paid, validator_rewards + ); + + self.capitalization + .fetch_add(validator_rewards_paid, Relaxed); + + let active_stake = if let Some(stake_history_entry) = + self.stakes.read().unwrap().history().get(&prev_epoch) + { + stake_history_entry.effective + } else { + 0 + }; + timing.stop(); + + datapoint_warn!( + "epoch_rewards", + ("slot", self.slot, i64), + ("epoch", prev_epoch, i64), + ("validator_rate", validator_rate, f64), + ("foundation_rate", foundation_rate, f64), + ("epoch_duration_in_years", epoch_duration_in_years, f64), + ("validator_rewards", validator_rewards_paid, i64), + ("active_stake", active_stake, i64), + ("pre_capitalization", capitalization, i64), + ("post_capitalization", self.capitalization(), i64), + ("time_us", timing.as_us(), i64) + ); + } + // update rewards based on the previous epoch fn update_rewards_with_thread_pool( &mut self, @@ -2064,6 +2196,69 @@ impl Bank { ); } + /// map stake delegations into resolved (pubkey, account) pairs + /// returns a map (has to be copied) of loaded + /// ( Vec<(staker info)> (voter account) ) keyed by voter pubkey + /// + /// Filters out invalid pairs + #[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")] + fn stake_delegation_accounts( + &self, + reward_calc_tracer: Option, + ) -> HashMap, AccountSharedData)> { + let mut accounts = HashMap::new(); + + self.stakes + .read() + .unwrap() + .stake_delegations() + .iter() + .for_each(|(stake_pubkey, delegation)| { + match ( + self.get_account_with_fixed_root(stake_pubkey), + self.get_account_with_fixed_root(&delegation.voter_pubkey), + ) { + (Some(stake_account), Some(vote_account)) => { + // call tracer to catch any illegal data if any + if let Some(reward_calc_tracer) = reward_calc_tracer.as_ref() { + reward_calc_tracer(&RewardCalculationEvent::Staking( + stake_pubkey, + &InflationPointCalculationEvent::Delegation( + *delegation, + *vote_account.owner(), + ), + )); + } + if self + .feature_set + .is_active(&feature_set::filter_stake_delegation_accounts::id()) + && (stake_account.owner() != &solana_stake_program::id() + || vote_account.owner() != &solana_vote_program::id()) + { + datapoint_warn!( + "bank-stake_delegation_accounts-invalid-account", + ("slot", self.slot() as i64, i64), + ("stake-address", format!("{:?}", stake_pubkey), String), + ( + "vote-address", + format!("{:?}", delegation.voter_pubkey), + String + ), + ); + return; + } + let entry = accounts + .entry(delegation.voter_pubkey) + .or_insert((Vec::new(), vote_account)); + entry.0.push((*stake_pubkey, stake_account)); + } + (_, _) => {} + } + }); + + accounts + } + /// map stake delegations into resolved (pubkey, account) pairs /// returns a map (has to be copied) of loaded /// ( Vec<(staker info)> (voter account) ) keyed by voter pubkey @@ -2171,6 +2366,128 @@ impl Bank { accounts } + /// iterate over all stakes, redeem vote credits for each stake we can + /// successfully load and parse, return the lamport value of one point + #[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")] + fn pay_validator_rewards( + &mut self, + rewarded_epoch: Epoch, + rewards: u64, + reward_calc_tracer: Option, + fix_activating_credits_observed: bool, + ) -> f64 { + let stake_history = self.stakes.read().unwrap().history().clone(); + + #[allow(deprecated)] + let mut stake_delegation_accounts = + self.stake_delegation_accounts(reward_calc_tracer.as_ref()); + + let points: u128 = stake_delegation_accounts + .iter() + .flat_map(|(_vote_pubkey, (stake_group, vote_account))| { + stake_group + .iter() + .map(move |(_stake_pubkey, stake_account)| (stake_account, vote_account)) + }) + .map(|(stake_account, vote_account)| { + #[allow(deprecated)] + stake_state::calculate_points_slow( + stake_account, + vote_account, + Some(&stake_history), + ) + .unwrap_or(0) + }) + .sum(); + + if points == 0 { + return 0.0; + } + + let point_value = PointValue { rewards, points }; + + let mut rewards = vec![]; + // pay according to point value + for (vote_pubkey, (stake_group, vote_account)) in stake_delegation_accounts.iter_mut() { + let mut vote_account_changed = false; + let voters_account_pre_balance = vote_account.lamports(); + let vote_state: VoteState = match StateMut::::state(vote_account) { + Ok(vote_state) => vote_state.convert_to_current(), + Err(err) => { + debug!( + "failed to deserialize vote account {}: {}", + vote_pubkey, err + ); + continue; + } + }; + let commission = Some(vote_state.commission); + + for (stake_pubkey, stake_account) in stake_group.iter_mut() { + // curry closure to add the contextual stake_pubkey + let reward_calc_tracer = reward_calc_tracer.as_ref().map(|outer| { + let stake_pubkey = *stake_pubkey; + // inner + move |inner_event: &_| { + outer(&RewardCalculationEvent::Staking(&stake_pubkey, inner_event)) + } + }); + #[allow(deprecated)] + let redeemed = stake_state::redeem_rewards_slow( + rewarded_epoch, + stake_account, + vote_account, + &vote_state, + &point_value, + Some(&stake_history), + reward_calc_tracer, + fix_activating_credits_observed, + ); + if let Ok((stakers_reward, _voters_reward)) = redeemed { + self.store_account(stake_pubkey, stake_account); + vote_account_changed = true; + + if stakers_reward > 0 { + rewards.push(( + *stake_pubkey, + RewardInfo { + reward_type: RewardType::Staking, + lamports: stakers_reward as i64, + post_balance: stake_account.lamports(), + commission, + }, + )); + } + } else { + debug!( + "stake_state::redeem_rewards() failed for {}: {:?}", + stake_pubkey, redeemed + ); + } + } + + if vote_account_changed { + let post_balance = vote_account.lamports(); + let lamports = (post_balance - voters_account_pre_balance) as i64; + if lamports != 0 { + rewards.push(( + *vote_pubkey, + RewardInfo { + reward_type: RewardType::Voting, + lamports, + post_balance, + commission, + }, + )); + } + self.store_account(vote_pubkey, vote_account); + } + } + self.rewards.write().unwrap().append(&mut rewards); + + point_value.rewards as f64 / point_value.points as f64 + } + /// iterate over all stakes, redeem vote credits for each stake we can /// successfully load and parse, return the lamport value of one point fn pay_validator_rewards_with_thread_pool( diff --git a/runtime/src/stakes.rs b/runtime/src/stakes.rs index 9afb80811b..4df4eb3da0 100644 --- a/runtime/src/stakes.rs +++ b/runtime/src/stakes.rs @@ -44,6 +44,49 @@ impl Stakes { &self.stake_history } + #[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")] + pub fn clone_with_epoch(&self, next_epoch: Epoch) -> Self { + let prev_epoch = self.epoch; + if prev_epoch == next_epoch { + self.clone() + } else { + // wrap up the prev epoch by adding new stake history entry for the prev epoch + let mut stake_history_upto_prev_epoch = self.stake_history.clone(); + stake_history_upto_prev_epoch.add( + prev_epoch, + stake_state::new_stake_history_entry( + prev_epoch, + self.stake_delegations + .iter() + .map(|(_pubkey, stake_delegation)| stake_delegation), + Some(&self.stake_history), + ), + ); + + // refresh the stake distribution of vote accounts for the next epoch, using new stake history + let vote_accounts_for_next_epoch = self + .vote_accounts + .iter() + .map(|(pubkey, (_ /*stake*/, account))| { + let stake = self.calculate_stake( + pubkey, + next_epoch, + Some(&stake_history_upto_prev_epoch), + ); + (*pubkey, (stake, account.clone())) + }) + .collect(); + + Stakes { + stake_delegations: self.stake_delegations.clone(), + unused: self.unused, + epoch: next_epoch, + stake_history: stake_history_upto_prev_epoch, + vote_accounts: vote_accounts_for_next_epoch, + } + } + } + pub fn activate_epoch(&mut self, next_epoch: Epoch, thread_pool: &ThreadPool) { let prev_epoch = self.epoch; self.epoch = next_epoch; @@ -541,6 +584,35 @@ pub mod tests { } } + #[test] + fn test_clone_with_epoch() { + let mut stakes = Stakes::default(); + + let ((vote_pubkey, vote_account), (stake_pubkey, stake_account)) = + create_staked_node_accounts(10); + + stakes.store(&vote_pubkey, &vote_account, true); + stakes.store(&stake_pubkey, &stake_account, true); + let stake = stake_state::stake_from(&stake_account).unwrap(); + + { + let vote_accounts = stakes.vote_accounts(); + assert_eq!( + vote_accounts.get(&vote_pubkey).unwrap().0, + stake.stake(stakes.epoch, Some(&stakes.stake_history)) + ); + } + #[allow(deprecated)] + let stakes = stakes.clone_with_epoch(3); + { + let vote_accounts = stakes.vote_accounts(); + assert_eq!( + vote_accounts.get(&vote_pubkey).unwrap().0, + stake.stake(stakes.epoch, Some(&stakes.stake_history)) + ); + } + } + #[test] fn test_activate_epoch() { let mut stakes = Stakes::default(); diff --git a/validator/src/main.rs b/validator/src/main.rs index 7665135594..30a5bbb24b 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1991,6 +1991,14 @@ pub fn main() { .help("Allow contacting private ip addresses") .hidden(true), ) + .arg( + Arg::with_name("disable_epoch_boundary_optimization") + .long("disable-epoch-boundary-optimization") + .takes_value(false) + .help("Disables epoch boundary optimization and overrides the \ + optimize_epoch_boundary_updates feature switch if enabled.") + .hidden(true), + ) .after_help("The default subcommand is run") .subcommand( SubCommand::with_name("exit") @@ -2512,6 +2520,8 @@ pub fn main() { tpu_coalesce_ms, no_wait_for_vote_to_start_leader: matches.is_present("no_wait_for_vote_to_start_leader"), accounts_shrink_ratio, + disable_epoch_boundary_optimization: matches + .is_present("disable_epoch_boundary_optimization"), ..ValidatorConfig::default() };