diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 83c3bf7c0c..22d4dfe035 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -358,11 +358,12 @@ impl Tower { pub(crate) fn check_switch_threshold( &self, - _slot: u64, + _slot: Slot, _ancestors: &HashMap>, _descendants: &HashMap>, _progress: &ProgressMap, - _total_stake: u64, + _total_epoch_stake: u64, + _epoch_vote_accounts: &HashMap, ) -> bool { true } @@ -647,7 +648,6 @@ pub mod test { bank_forks, progress, &None, - &mut 0, &mut HashSet::new(), ); } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5d8fb56d36..f4a1d08724 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -177,11 +177,6 @@ impl ReplayStage { let mut current_leader = None; let mut last_reset = Hash::default(); let mut partition = false; - let mut earliest_vote_on_fork = { - let slots = tower.last_vote().slots; - slots.last().cloned().unwrap_or(0) - }; - let mut switch_threshold = false; let mut skipped_slots_info = SkippedSlotsInfo::default(); loop { let allocated = thread_mem_usage::Allocatedp::default(); @@ -220,7 +215,7 @@ impl ReplayStage { Self::report_memory(&allocated, "replay_active_banks", start); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); - let descendants = Arc::new(HashMap::new()); + let descendants = HashMap::new(); let start = allocated.get(); let mut frozen_banks: Vec<_> = bank_forks .read() @@ -259,7 +254,7 @@ impl ReplayStage { } } - let (heaviest_bank, votable_bank_on_same_fork) = + let (heaviest_bank, heaviest_bank_on_same_fork) = Self::select_forks(&frozen_banks, &tower, &progress, &ancestors); Self::report_memory(&allocated, "select_fork", start); @@ -267,9 +262,7 @@ impl ReplayStage { let (vote_bank, reset_bank, failure_reasons) = Self::select_vote_and_reset_forks( &heaviest_bank, - &votable_bank_on_same_fork, - earliest_vote_on_fork, - &mut switch_threshold, + &heaviest_bank_on_same_fork, &ancestors, &descendants, &progress, @@ -311,46 +304,36 @@ impl ReplayStage { let start = allocated.get(); // Vote on a fork - let voted_on_different_fork = { - if let Some(ref vote_bank) = vote_bank { - subscriptions.notify_subscribers(block_commitment_cache.read().unwrap().slot(), &bank_forks); - if let Some(votable_leader) = leader_schedule_cache - .slot_leader_at(vote_bank.slot(), Some(vote_bank)) - { - Self::log_leader_change( - &my_pubkey, - vote_bank.slot(), - &mut current_leader, - &votable_leader, - ); - } - - Self::handle_votable_bank( - &vote_bank, - &bank_forks, - &mut tower, - &mut progress, - &vote_account, - &authorized_voter_keypairs, - &cluster_info, - &blockstore, - &leader_schedule_cache, - &root_bank_sender, - &lockouts_sender, - &accounts_hash_sender, - &latest_root_senders, - &mut earliest_vote_on_fork, - &mut all_pubkeys, - &subscriptions, - )?; - - ancestors - .get(&vote_bank.slot()) - .unwrap() - .contains(&earliest_vote_on_fork) - } else { - false + if let Some(ref vote_bank) = vote_bank { + subscriptions.notify_subscribers(block_commitment_cache.read().unwrap().slot(), &bank_forks); + if let Some(votable_leader) = leader_schedule_cache + .slot_leader_at(vote_bank.slot(), Some(vote_bank)) + { + Self::log_leader_change( + &my_pubkey, + vote_bank.slot(), + &mut current_leader, + &votable_leader, + ); } + + Self::handle_votable_bank( + &vote_bank, + &bank_forks, + &mut tower, + &mut progress, + &vote_account, + &authorized_voter_keypairs, + &cluster_info, + &blockstore, + &leader_schedule_cache, + &root_bank_sender, + &lockouts_sender, + &accounts_hash_sender, + &latest_root_senders, + &mut all_pubkeys, + &subscriptions, + )?; }; Self::report_memory(&allocated, "votable_bank", start); @@ -358,12 +341,7 @@ impl ReplayStage { // Reset onto a fork if let Some(reset_bank) = reset_bank { - let selected_same_fork = ancestors - .get(&reset_bank.slot()) - .unwrap() - .contains(&earliest_vote_on_fork); if last_reset != reset_bank.last_blockhash() - && (selected_same_fork || switch_threshold) { info!( "vote bank: {:?} reset bank: {:?}", @@ -413,17 +391,6 @@ impl ReplayStage { } Self::report_memory(&allocated, "reset_bank", start); - // If we voted on a different fork, update the earliest vote - // to this slot, clear the switch threshold - if voted_on_different_fork { - earliest_vote_on_fork = vote_bank - .expect("voted_on_different_fork only set if vote_bank.is_some()") - .slot(); - // Clear the thresholds after voting on different - // fork - switch_threshold = false; - } - let start = allocated.get(); if !tpu_has_bank { Self::maybe_start_leader( @@ -718,7 +685,6 @@ impl ReplayStage { lockouts_sender: &Sender, accounts_hash_sender: &Option, latest_root_senders: &[Sender], - earliest_vote_on_fork: &mut Slot, all_pubkeys: &mut HashSet>, subscriptions: &Arc, ) -> Result<()> { @@ -751,7 +717,6 @@ impl ReplayStage { &bank_forks, progress, accounts_hash_sender, - earliest_vote_on_fork, all_pubkeys, ); subscriptions.notify_roots(rooted_slots); @@ -1144,7 +1109,8 @@ impl ReplayStage { .count(); let last_vote = tower.last_vote().slots.last().cloned(); - let mut last_votable_on_same_fork = None; + let mut heaviest_bank_on_same_fork = None; + let mut heaviest_same_fork_weight = 0; let stats: Vec<&ForkStats> = frozen_banks .iter() .map(|bank| { @@ -1160,15 +1126,20 @@ impl ReplayStage { .get(&bank.slot()) .expect("Entry in frozen banks must exist in ancestors") .contains(&last_vote) - && stats.vote_threshold { // Descendant of last vote cannot be locked out assert!(!stats.is_locked_out); // ancestors(slot) should not contain the slot itself, - // so we shouldd never get the same bank as the last vote + // so we should never get the same bank as the last vote assert_ne!(bank.slot(), last_vote); - last_votable_on_same_fork = Some(bank.clone()); + // highest weight, lowest slot first. frozen_banks is sorted + // from least slot to greatest slot, so if two banks have + // the same fork weight, the lower slot will be picked + if stats.fork_weight > heaviest_same_fork_weight { + heaviest_bank_on_same_fork = Some(bank.clone()); + heaviest_same_fork_weight = stats.fork_weight; + } } } @@ -1219,17 +1190,15 @@ impl ReplayStage { ("tower_duration", ms as i64, i64), ); - (rv.map(|x| x.0.clone()), last_votable_on_same_fork) + (rv.map(|x| x.0.clone()), heaviest_bank_on_same_fork) } // Given a heaviest bank, `heaviest_bank` and the next votable bank - // `votable_bank_on_same_fork` as the validator's last vote, return + // `heaviest_bank_on_same_fork` as the validator's last vote, return // a bank to vote on, a bank to reset to, pub(crate) fn select_vote_and_reset_forks( heaviest_bank: &Option>, - votable_bank_on_same_fork: &Option>, - earliest_vote_on_fork: u64, - switch_threshold: &mut bool, + heaviest_bank_on_same_fork: &Option>, ancestors: &HashMap>, descendants: &HashMap>, progress: &ProgressMap, @@ -1255,52 +1224,40 @@ impl ReplayStage { let mut failure_reasons = vec![]; let selected_fork = { if let Some(bank) = heaviest_bank { - let selected_same_fork = ancestors - .get(&bank.slot()) - .unwrap() - .contains(&earliest_vote_on_fork); - if selected_same_fork { - // If the heaviest bank is on the same fork as the last - // vote, then there's no need to check the switch threshold. - // Just vote for the latest votable bank on the same fork, - // which is `votable_bank_on_same_fork`. - votable_bank_on_same_fork + let switch_threshold = tower.check_switch_threshold( + bank.slot(), + &ancestors, + &descendants, + &progress, + bank.total_epoch_stake(), + bank.epoch_vote_accounts(bank.epoch()).expect( + "Bank epoch vote accounts must contain entry for the bank's own epoch", + ), + ); + if !switch_threshold { + // If we can't switch, then reset to the the next votable + // bank on the same fork as our last vote, but don't vote + info!( + "Waiting to switch to {}, voting on {:?} on same fork for now", + bank.slot(), + heaviest_bank_on_same_fork.as_ref().map(|b| b.slot()) + ); + failure_reasons.push(HeaviestForkFailures::FailedSwitchThreshold(bank.slot())); + heaviest_bank_on_same_fork + .as_ref() + .map(|b| (b, switch_threshold)) } else { - if !*switch_threshold { - let total_staked = - progress.get_fork_stats(bank.slot()).unwrap().total_staked; - *switch_threshold = tower.check_switch_threshold( - earliest_vote_on_fork, - &ancestors, - &descendants, - &progress, - total_staked, - ); - } - if !*switch_threshold { - // If we can't switch, then vote on the the next votable - // bank on the same fork as our last vote - info!( - "Waiting to switch to {}, voting on {:?} on same fork for now", - bank.slot(), - votable_bank_on_same_fork.as_ref().map(|b| b.slot()) - ); - failure_reasons - .push(HeaviestForkFailures::FailedSwitchThreshold(bank.slot())); - votable_bank_on_same_fork - } else { - // If the switch threshold is observed, halt voting on - // the current fork and attempt to vote/reset Poh/switch to - // theh heaviest bank - heaviest_bank - } + // If the switch threshold is observed, halt voting on + // the current fork and attempt to vote/reset Poh to + // the heaviest bank + heaviest_bank.as_ref().map(|b| (b, switch_threshold)) } } else { - &None + None } }; - if let Some(bank) = selected_fork { + if let Some((bank, switch_threshold)) = selected_fork { let (is_locked_out, vote_threshold, is_leader_slot, fork_weight) = { let fork_stats = progress.get_fork_stats(bank.slot()).unwrap(); let propagated_stats = &progress.get_propagated_stats(bank.slot()).unwrap(); @@ -1323,16 +1280,15 @@ impl ReplayStage { if !propagation_confirmed { failure_reasons.push(HeaviestForkFailures::NoPropagatedConfirmation(bank.slot())); } + if !switch_threshold { + failure_reasons.push(HeaviestForkFailures::FailedSwitchThreshold(bank.slot())); + } - if !is_locked_out && vote_threshold && propagation_confirmed { + if !is_locked_out && vote_threshold && propagation_confirmed && switch_threshold { info!("voting: {} {}", bank.slot(), fork_weight); - ( - selected_fork.clone(), - selected_fork.clone(), - failure_reasons, - ) + (Some(bank.clone()), Some(bank.clone()), failure_reasons) } else { - (None, selected_fork.clone(), failure_reasons) + (None, Some(bank.clone()), failure_reasons) } } else { (None, None, failure_reasons) @@ -1508,7 +1464,6 @@ impl ReplayStage { bank_forks: &RwLock, progress: &mut ProgressMap, accounts_hash_sender: &Option, - earliest_vote_on_fork: &mut u64, all_pubkeys: &mut HashSet>, ) { let old_epoch = bank_forks.read().unwrap().root_bank().epoch(); @@ -1521,7 +1476,6 @@ impl ReplayStage { if old_epoch != new_epoch { all_pubkeys.retain(|x| Rc::strong_count(x) > 1); } - *earliest_vote_on_fork = std::cmp::max(new_root, *earliest_vote_on_fork); progress.handle_new_root(&r_bank_forks); } @@ -2108,30 +2062,10 @@ pub(crate) mod tests { for i in 0..=root { progress.insert(i, ForkProgress::new(Hash::default(), None, None)); } - let mut earliest_vote_on_fork = root - 1; - ReplayStage::handle_new_root( - root, - &bank_forks, - &mut progress, - &None, - &mut earliest_vote_on_fork, - &mut HashSet::new(), - ); + ReplayStage::handle_new_root(root, &bank_forks, &mut progress, &None, &mut HashSet::new()); assert_eq!(bank_forks.read().unwrap().root(), root); assert_eq!(progress.len(), 1); - assert_eq!(earliest_vote_on_fork, root); assert!(progress.get(&root).is_some()); - - earliest_vote_on_fork = root + 1; - ReplayStage::handle_new_root( - root, - &bank_forks, - &mut progress, - &None, - &mut earliest_vote_on_fork, - &mut HashSet::new(), - ); - assert_eq!(earliest_vote_on_fork, root + 1); } #[test]