From 5b74678e37e79f88391cd6da276a6478a0725f4e Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 23 Apr 2021 07:20:10 +0000 Subject: [PATCH] Ingest votes from gossip into fork choice (#16560) (#16724) (cherry picked from commit 4c94f8933f7e34448f57f8a1b213e6949c0e94b8) Co-authored-by: carllin --- core/src/bank_weight_fork_choice.rs | 154 ------ core/src/cluster_slot_state_verifier.rs | 2 +- core/src/consensus.rs | 80 +++- core/src/fork_choice.rs | 6 +- core/src/heaviest_subtree_fork_choice.rs | 43 +- ...latest_validator_votes_for_frozen_banks.rs | 351 ++++++++++++++ core/src/lib.rs | 2 + core/src/replay_stage.rs | 269 ++++++++--- .../unfrozen_gossip_verified_vote_hashes.rs | 132 +++++ local-cluster/tests/local_cluster.rs | 453 +++++++++++++++--- 10 files changed, 1156 insertions(+), 336 deletions(-) delete mode 100644 core/src/bank_weight_fork_choice.rs create mode 100644 core/src/latest_validator_votes_for_frozen_banks.rs create mode 100644 core/src/unfrozen_gossip_verified_vote_hashes.rs diff --git a/core/src/bank_weight_fork_choice.rs b/core/src/bank_weight_fork_choice.rs deleted file mode 100644 index 9b43c14d6d..0000000000 --- a/core/src/bank_weight_fork_choice.rs +++ /dev/null @@ -1,154 +0,0 @@ -use crate::{ - consensus::{ComputedBankState, Tower}, - fork_choice::ForkChoice, - progress_map::{ForkStats, ProgressMap}, -}; -use solana_runtime::{bank::Bank, bank_forks::BankForks}; -use solana_sdk::{clock::Slot, timing}; -use std::time::Instant; -use std::{ - collections::{HashMap, HashSet}, - sync::{Arc, RwLock}, -}; - -#[derive(Default)] -pub struct BankWeightForkChoice {} - -impl ForkChoice for BankWeightForkChoice { - fn compute_bank_stats( - &mut self, - bank: &Bank, - _tower: &Tower, - progress: &mut ProgressMap, - computed_bank_state: &ComputedBankState, - ) { - let bank_slot = bank.slot(); - // Only time progress map should be missing a bank slot - // is if this node was the leader for this slot as those banks - // are not replayed in replay_active_banks() - let parent_weight = bank - .parent() - .and_then(|b| progress.get(&b.slot())) - .map(|x| x.fork_stats.fork_weight) - .unwrap_or(0); - - let stats = progress - .get_fork_stats_mut(bank_slot) - .expect("All frozen banks must exist in the Progress map"); - - let ComputedBankState { bank_weight, .. 
} = computed_bank_state; - stats.weight = *bank_weight; - stats.fork_weight = stats.weight + parent_weight; - } - - // Returns: - // 1) The heaviest overall bank - // 2) The heaviest bank on the same fork as the last vote (doesn't require a - // switching proof to vote for) - fn select_forks( - &self, - frozen_banks: &[Arc], - tower: &Tower, - progress: &ProgressMap, - ancestors: &HashMap>, - _bank_forks: &RwLock, - ) -> (Arc, Option>) { - let tower_start = Instant::now(); - assert!(!frozen_banks.is_empty()); - let num_frozen_banks = frozen_banks.len(); - - trace!("frozen_banks {}", frozen_banks.len()); - let num_old_banks = frozen_banks - .iter() - .filter(|b| b.slot() < tower.root()) - .count(); - - let last_voted_slot = tower.last_voted_slot(); - let mut heaviest_bank_on_same_fork = None; - let mut heaviest_same_fork_weight = 0; - let stats: Vec<&ForkStats> = frozen_banks - .iter() - .map(|bank| { - // Only time progress map should be missing a bank slot - // is if this node was the leader for this slot as those banks - // are not replayed in replay_active_banks() - let stats = progress - .get_fork_stats(bank.slot()) - .expect("All frozen banks must exist in the Progress map"); - - if let Some(last_voted_slot) = last_voted_slot { - if ancestors - .get(&bank.slot()) - .expect("Entry in frozen banks must exist in ancestors") - .contains(&last_voted_slot) - { - // Descendant of last vote cannot be locked out - assert!(!stats.is_locked_out); - - // ancestors(slot) should not contain the slot itself, - // so we should never get the same bank as the last vote - assert_ne!(bank.slot(), last_voted_slot); - // highest weight, lowest slot first. frozen_banks is sorted - // from least slot to greatest slot, so if two banks have - // the same fork weight, the lower slot will be picked - if stats.fork_weight > heaviest_same_fork_weight { - heaviest_bank_on_same_fork = Some(bank.clone()); - heaviest_same_fork_weight = stats.fork_weight; - } - } - } - - stats - }) - .collect(); - let num_not_recent = stats.iter().filter(|s| !s.is_recent).count(); - let num_has_voted = stats.iter().filter(|s| s.has_voted).count(); - let num_empty = stats.iter().filter(|s| s.is_empty).count(); - let num_threshold_failure = stats.iter().filter(|s| !s.vote_threshold).count(); - let num_votable_threshold_failure = stats - .iter() - .filter(|s| s.is_recent && !s.has_voted && !s.vote_threshold) - .count(); - - let mut candidates: Vec<_> = frozen_banks.iter().zip(stats.iter()).collect(); - - //highest weight, lowest slot first - candidates.sort_by_key(|b| (b.1.fork_weight, 0i64 - b.0.slot() as i64)); - let rv = candidates - .last() - .expect("frozen banks was nonempty so candidates must also be nonempty"); - let ms = timing::duration_as_ms(&tower_start.elapsed()); - let weights: Vec<(u128, u64, u64)> = candidates - .iter() - .map(|x| (x.1.weight, x.0.slot(), x.1.block_height)) - .collect(); - debug!( - "@{:?} tower duration: {:?} len: {}/{} weights: {:?}", - timing::timestamp(), - ms, - candidates.len(), - stats.iter().filter(|s| !s.has_voted).count(), - weights, - ); - datapoint_debug!( - "replay_stage-select_forks", - ("frozen_banks", num_frozen_banks as i64, i64), - ("not_recent", num_not_recent as i64, i64), - ("has_voted", num_has_voted as i64, i64), - ("old_banks", num_old_banks as i64, i64), - ("empty_banks", num_empty as i64, i64), - ("threshold_failure", num_threshold_failure as i64, i64), - ( - "votable_threshold_failure", - num_votable_threshold_failure as i64, - i64 - ), - ("tower_duration", ms as i64, i64), - ); 
- - (rv.0.clone(), heaviest_bank_on_same_fork) - } - - fn mark_fork_invalid_candidate(&mut self, _invalid_slot: Slot) {} - fn mark_fork_valid_candidate(&mut self, _valid_slots: &[Slot]) {} -} diff --git a/core/src/cluster_slot_state_verifier.rs b/core/src/cluster_slot_state_verifier.rs index 51fe168a4f..2bc47e2a75 100644 --- a/core/src/cluster_slot_state_verifier.rs +++ b/core/src/cluster_slot_state_verifier.rs @@ -5,7 +5,7 @@ use crate::{ use solana_sdk::{clock::Slot, hash::Hash}; use std::collections::{BTreeMap, HashMap, HashSet}; -pub type GossipDuplicateConfirmedSlots = BTreeMap; +pub(crate) type GossipDuplicateConfirmedSlots = BTreeMap; type SlotStateHandler = fn(Slot, &Hash, Option<&Hash>, bool, bool) -> Vec; #[derive(PartialEq, Debug)] diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 9df438e8ee..9a69626588 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1,4 +1,7 @@ -use crate::progress_map::{LockoutIntervals, ProgressMap}; +use crate::{ + latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, + progress_map::{LockoutIntervals, ProgressMap}, +}; use chrono::prelude::*; use solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db}; use solana_measure::measure::Measure; @@ -95,7 +98,6 @@ pub(crate) struct ComputedBankState { // Tree of intervals of lockouts of the form [slot, slot + slot.lockout], // keyed by end of the range pub lockout_intervals: LockoutIntervals, - pub pubkey_votes: Arc, } #[frozen_abi(digest = "Eay84NBbJqiMBfE7HHH2o6e51wcvoU79g8zCi5sw6uj3")] @@ -219,6 +221,8 @@ impl Tower { bank_slot: Slot, vote_accounts: F, ancestors: &HashMap>, + get_frozen_hash: impl Fn(Slot) -> Option, + latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks, ) -> ComputedBankState where F: IntoIterator, @@ -230,7 +234,6 @@ impl Tower { // Tree of intervals of lockouts of the form [slot, slot + slot.lockout], // keyed by end of the range let mut lockout_intervals = LockoutIntervals::new(); - let mut pubkey_votes = vec![]; for (key, (voted_stake, account)) in vote_accounts { if voted_stake == 0 { continue; @@ -277,8 +280,12 @@ impl Tower { let start_root = vote_state.root_slot; // Add the last vote to update the `heaviest_subtree_fork_choice` - if let Some(last_voted_slot) = vote_state.last_voted_slot() { - pubkey_votes.push((key, last_voted_slot)); + if let Some(last_landed_voted_slot) = vote_state.last_voted_slot() { + latest_validator_votes_for_frozen_banks.check_add_vote( + key, + last_landed_voted_slot, + get_frozen_hash(last_landed_voted_slot), + ); } vote_state.process_slot_vote_unchecked(bank_slot); @@ -341,7 +348,6 @@ impl Tower { total_stake, bank_weight, lockout_intervals, - pubkey_votes: Arc::new(pubkey_votes), } } @@ -1272,11 +1278,13 @@ pub mod test { use super::*; use crate::{ cluster_info_vote_listener::VoteTracker, + cluster_slot_state_verifier::GossipDuplicateConfirmedSlots, cluster_slots::ClusterSlots, fork_choice::SelectVoteAndResetForkResult, - heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, + heaviest_subtree_fork_choice::{HeaviestSubtreeForkChoice, SlotHashKey}, progress_map::{DuplicateStats, ForkProgress}, replay_stage::{HeaviestForkFailures, ReplayStage}, + unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, }; use solana_ledger::{blockstore::make_slot_entries, get_tmp_ledger_path}; use solana_runtime::{ @@ -1300,7 +1308,7 @@ pub mod test { vote_transaction, }; use std::{ - collections::{BTreeMap, HashMap}, + 
collections::HashMap, fs::{remove_file, OpenOptions}, io::{Read, Seek, SeekFrom, Write}, sync::RwLock, @@ -1315,6 +1323,7 @@ pub mod test { pub bank_forks: RwLock, pub progress: ProgressMap, pub heaviest_subtree_fork_choice: HeaviestSubtreeForkChoice, + pub latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks, } impl VoteSimulator { @@ -1334,6 +1343,8 @@ pub mod test { bank_forks: RwLock::new(bank_forks), progress, heaviest_subtree_fork_choice, + latest_validator_votes_for_frozen_banks: + LatestValidatorVotesForFrozenBanks::default(), } } pub(crate) fn fill_bank_forks( @@ -1415,6 +1426,7 @@ pub mod test { &ClusterSlots::default(), &self.bank_forks, &mut self.heaviest_subtree_fork_choice, + &mut self.latest_validator_votes_for_frozen_banks, ); let vote_bank = self @@ -1461,8 +1473,8 @@ pub mod test { &AbsRequestSender::default(), None, &mut self.heaviest_subtree_fork_choice, - &mut BTreeMap::new(), - &mut BTreeMap::new(), + &mut GossipDuplicateConfirmedSlots::default(), + &mut UnfrozenGossipVerifiedVoteHashes::default(), &mut true, &mut Vec::new(), ) @@ -2130,24 +2142,34 @@ pub mod test { //two accounts voting for slot 0 with 1 token staked let mut accounts = gen_stakes(&[(1, &[0]), (1, &[0])]); accounts.sort_by_key(|(pk, _)| *pk); - let account_latest_votes: PubkeyVotes = - accounts.iter().map(|(pubkey, _)| (*pubkey, 0)).collect(); + let account_latest_votes: Vec<(Pubkey, SlotHashKey)> = accounts + .iter() + .map(|(pubkey, _)| (*pubkey, (0, Hash::default()))) + .collect(); let ancestors = vec![(1, vec![0].into_iter().collect()), (0, HashSet::new())] .into_iter() .collect(); + let mut latest_validator_votes_for_frozen_banks = + LatestValidatorVotesForFrozenBanks::default(); let ComputedBankState { voted_stakes, total_stake, bank_weight, - pubkey_votes, .. - } = Tower::collect_vote_lockouts(&Pubkey::default(), 1, accounts.into_iter(), &ancestors); + } = Tower::collect_vote_lockouts( + &Pubkey::default(), + 1, + accounts.into_iter(), + &ancestors, + |_| Some(Hash::default()), + &mut latest_validator_votes_for_frozen_banks, + ); assert_eq!(voted_stakes[&0], 2); assert_eq!(total_stake, 2); - let mut pubkey_votes = Arc::try_unwrap(pubkey_votes).unwrap(); - pubkey_votes.sort(); - assert_eq!(pubkey_votes, account_latest_votes); + let mut new_votes = latest_validator_votes_for_frozen_banks.take_votes_dirty_set(0); + new_votes.sort(); + assert_eq!(new_votes, account_latest_votes); // Each account has 1 vote in it. After simulating a vote in collect_vote_lockouts, // the account will have 2 votes, with lockout 2 + 4 = 6. So expected weight for @@ -2160,9 +2182,14 @@ pub mod test { //two accounts voting for slots 0..MAX_LOCKOUT_HISTORY with 1 token staked let mut accounts = gen_stakes(&[(1, &votes), (1, &votes)]); accounts.sort_by_key(|(pk, _)| *pk); - let account_latest_votes: PubkeyVotes = accounts + let account_latest_votes: Vec<(Pubkey, SlotHashKey)> = accounts .iter() - .map(|(pubkey, _)| (*pubkey, (MAX_LOCKOUT_HISTORY - 1) as Slot)) + .map(|(pubkey, _)| { + ( + *pubkey, + ((MAX_LOCKOUT_HISTORY - 1) as Slot, Hash::default()), + ) + }) .collect(); let mut tower = Tower::new_for_tests(0, 0.67); let mut ancestors = HashMap::new(); @@ -2184,16 +2211,19 @@ pub mod test { + root_weight; let expected_bank_weight = 2 * vote_account_expected_weight; assert_eq!(tower.lockouts.root_slot, Some(0)); + let mut latest_validator_votes_for_frozen_banks = + LatestValidatorVotesForFrozenBanks::default(); let ComputedBankState { voted_stakes, bank_weight, - pubkey_votes, .. 
} = Tower::collect_vote_lockouts( &Pubkey::default(), MAX_LOCKOUT_HISTORY as u64, accounts.into_iter(), &ancestors, + |_| Some(Hash::default()), + &mut latest_validator_votes_for_frozen_banks, ); for i in 0..MAX_LOCKOUT_HISTORY { assert_eq!(voted_stakes[&(i as u64)], 2); @@ -2201,9 +2231,9 @@ pub mod test { // should be the sum of all the weights for root assert_eq!(bank_weight, expected_bank_weight); - let mut pubkey_votes = Arc::try_unwrap(pubkey_votes).unwrap(); - pubkey_votes.sort(); - assert_eq!(pubkey_votes, account_latest_votes); + let mut new_votes = latest_validator_votes_for_frozen_banks.take_votes_dirty_set(root.slot); + new_votes.sort(); + assert_eq!(new_votes, account_latest_votes); } #[test] @@ -2500,6 +2530,8 @@ pub mod test { vote_to_evaluate, accounts.clone().into_iter(), &ancestors, + |_| None, + &mut LatestValidatorVotesForFrozenBanks::default(), ); assert!(tower.check_vote_stake_threshold(vote_to_evaluate, &voted_stakes, total_stake,)); @@ -2516,6 +2548,8 @@ pub mod test { vote_to_evaluate, accounts.into_iter(), &ancestors, + |_| None, + &mut LatestValidatorVotesForFrozenBanks::default(), ); assert!(!tower.check_vote_stake_threshold(vote_to_evaluate, &voted_stakes, total_stake,)); } diff --git a/core/src/fork_choice.rs b/core/src/fork_choice.rs index fde5d7da5d..b5e6524359 100644 --- a/core/src/fork_choice.rs +++ b/core/src/fork_choice.rs @@ -1,5 +1,6 @@ use crate::{ - consensus::{ComputedBankState, SwitchForkDecision, Tower}, + consensus::{SwitchForkDecision, Tower}, + latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, progress_map::ProgressMap, replay_stage::HeaviestForkFailures, }; @@ -21,8 +22,7 @@ pub(crate) trait ForkChoice { &mut self, bank: &Bank, tower: &Tower, - progress: &mut ProgressMap, - computed_bank_state: &ComputedBankState, + latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks, ); // Returns: diff --git a/core/src/heaviest_subtree_fork_choice.rs b/core/src/heaviest_subtree_fork_choice.rs index 3b48109012..538ed432c5 100644 --- a/core/src/heaviest_subtree_fork_choice.rs +++ b/core/src/heaviest_subtree_fork_choice.rs @@ -1,8 +1,7 @@ use crate::{ - consensus::{ComputedBankState, Tower}, - fork_choice::ForkChoice, - progress_map::ProgressMap, - tree_diff::TreeDiff, + consensus::Tower, fork_choice::ForkChoice, + latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, + progress_map::ProgressMap, tree_diff::TreeDiff, }; use solana_measure::measure::Measure; use solana_runtime::{bank::Bank, bank_forks::BankForks, epoch_stakes::EpochStakes}; @@ -22,7 +21,7 @@ use std::{ use trees::{Tree, TreeWalk}; pub type ForkWeight = u64; -type SlotHashKey = (Slot, Hash); +pub type SlotHashKey = (Slot, Hash); type UpdateOperations = BTreeMap<(SlotHashKey, UpdateLabel), UpdateOperation>; const MAX_ROOT_PRINT_SECONDS: u64 = 30; @@ -163,6 +162,10 @@ impl HeaviestSubtreeForkChoice { heaviest_subtree_fork_choice } + pub fn contains_block(&self, key: &SlotHashKey) -> bool { + self.fork_infos.contains_key(key) + } + pub fn best_slot(&self, key: &SlotHashKey) -> Option { self.fork_infos .get(key) @@ -529,6 +532,15 @@ impl HeaviestSubtreeForkChoice { let (pubkey, new_vote_slot_hash) = pubkey_vote.borrow(); let (new_vote_slot, new_vote_hash) = *new_vote_slot_hash; if new_vote_slot < self.root.0 { + // If the new vote is less than the root we can ignore it. This is because there + // are two cases. 
Either: + // 1) The validator's latest vote was bigger than the new vote, so we can ignore it + // 2) The validator's latest vote was less than the new vote, then the validator's latest + // vote was also less than root. This means either every node in the current tree has the + // validators stake counted toward it (if the latest vote was an ancestor of the current root), + // OR every node doesn't have this validator's vote counting toward it (if the latest vote + // was not an ancestor of the current root). Thus this validator is essentially a no-op + // and won't affect fork choice. continue; } @@ -744,29 +756,14 @@ impl ForkChoice for HeaviestSubtreeForkChoice { &mut self, bank: &Bank, _tower: &Tower, - progress: &mut ProgressMap, - computed_bank_state: &ComputedBankState, + latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks, ) { - let ComputedBankState { pubkey_votes, .. } = computed_bank_state; let mut start = Measure::start("compute_bank_stats_time"); // Update `heaviest_subtree_fork_choice` to find the best fork to build on let root = self.root.0; + let new_votes = latest_validator_votes_for_frozen_banks.take_votes_dirty_set(root); let (best_overall_slot, best_overall_hash) = self.add_votes( - pubkey_votes.iter().filter_map(|(pubkey, slot)| { - if *slot >= root { - Some(( - *pubkey, - ( - *slot, - progress - .get_hash(*slot) - .expect("Votes for ancestors must exist in progress map"), - ), - )) - } else { - None - } - }), + new_votes.into_iter(), bank.epoch_stakes_map(), bank.epoch_schedule(), ); diff --git a/core/src/latest_validator_votes_for_frozen_banks.rs b/core/src/latest_validator_votes_for_frozen_banks.rs new file mode 100644 index 0000000000..6b6b906dc4 --- /dev/null +++ b/core/src/latest_validator_votes_for_frozen_banks.rs @@ -0,0 +1,351 @@ +use crate::heaviest_subtree_fork_choice::SlotHashKey; +use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; +use std::collections::{hash_map::Entry, HashMap}; + +#[derive(Default)] +pub(crate) struct LatestValidatorVotesForFrozenBanks { + // TODO: Clean outdated/unstaked pubkeys from this list. 
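    // Editor's note, an illustrative sketch rather than part of the upstream patch: both maps
    // below are keyed by the validator's vote pubkey. Judging from how `check_add_vote` fills
    // them in (inserting `(vote_slot, vec![frozen_hash])`), the value type appears to be the
    // greatest frozen vote slot seen so far together with every frozen bank hash observed for
    // that slot, i.e. roughly:
    //
    //     max_frozen_votes: HashMap<Pubkey, (Slot, Vec<Hash>)>,
    //     fork_choice_dirty_set: HashMap<Pubkey, (Slot, Vec<Hash>)>,
    //
    // `fork_choice_dirty_set` holds only the entries that changed since fork choice last
    // drained them via `take_votes_dirty_set`.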
+ max_frozen_votes: HashMap)>, + // Pubkeys that had their `max_frozen_votes` updated since the last + // fork choice update + fork_choice_dirty_set: HashMap)>, +} + +impl LatestValidatorVotesForFrozenBanks { + // `frozen_hash.is_some()` if the bank with slot == `vote_slot` is frozen + // Returns whether the vote was actually added, and the latest voted frozen slot + pub(crate) fn check_add_vote( + &mut self, + vote_pubkey: Pubkey, + vote_slot: Slot, + frozen_hash: Option, + ) -> (bool, Option) { + let max_frozen_votes_entry = self.max_frozen_votes.entry(vote_pubkey); + if let Some(frozen_hash) = frozen_hash { + match max_frozen_votes_entry { + Entry::Occupied(mut occupied_entry) => { + let (latest_frozen_vote_slot, latest_frozen_vote_hashes) = + occupied_entry.get_mut(); + if vote_slot > *latest_frozen_vote_slot { + self.fork_choice_dirty_set + .insert(vote_pubkey, (vote_slot, vec![frozen_hash])); + *latest_frozen_vote_slot = vote_slot; + *latest_frozen_vote_hashes = vec![frozen_hash]; + return (true, Some(vote_slot)); + } else if vote_slot == *latest_frozen_vote_slot + && !latest_frozen_vote_hashes.contains(&frozen_hash) + { + let (_, dirty_frozen_hashes) = + self.fork_choice_dirty_set.entry(vote_pubkey).or_default(); + assert!(!dirty_frozen_hashes.contains(&frozen_hash)); + dirty_frozen_hashes.push(frozen_hash); + latest_frozen_vote_hashes.push(frozen_hash); + return (true, Some(vote_slot)); + } else { + // We have newer votes for this validator, we don't care about this vote + return (false, Some(*latest_frozen_vote_slot)); + } + } + + Entry::Vacant(vacant_entry) => { + vacant_entry.insert((vote_slot, vec![frozen_hash])); + self.fork_choice_dirty_set + .insert(vote_pubkey, (vote_slot, vec![frozen_hash])); + return (true, Some(vote_slot)); + } + } + } + + // Non-frozen banks are not inserted because we only track frozen votes in this + // struct + ( + false, + match max_frozen_votes_entry { + Entry::Occupied(occupied_entry) => Some(occupied_entry.get().0), + Entry::Vacant(_) => None, + }, + ) + } + + pub(crate) fn take_votes_dirty_set(&mut self, root: Slot) -> Vec<(Pubkey, SlotHashKey)> { + let new_votes = std::mem::take(&mut self.fork_choice_dirty_set); + new_votes + .into_iter() + .filter(|(_, (slot, _))| *slot >= root) + .flat_map(|(pk, (slot, hashes))| { + hashes + .into_iter() + .map(|hash| (pk, (slot, hash))) + .collect::>() + }) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_latest_validator_votes_for_frozen_banks_check_add_vote() { + let mut latest_validator_votes_for_frozen_banks = + LatestValidatorVotesForFrozenBanks::default(); + + // Case 1: Non-frozen banks shouldn't be added + let vote_pubkey = Pubkey::new_unique(); + let mut vote_slot = 1; + let frozen_hash = None; + assert_eq!( + latest_validator_votes_for_frozen_banks.check_add_vote( + vote_pubkey, + vote_slot, + frozen_hash, + ), + // Non-frozen bank isn't inserted, so should return None for + // the highest voted frozen slot + (false, None) + ); + assert!(latest_validator_votes_for_frozen_banks + .max_frozen_votes + .is_empty()); + assert!(latest_validator_votes_for_frozen_banks + .fork_choice_dirty_set + .is_empty()); + + // Case 2: Frozen vote should be added, but the same vote added again + // shouldn't update state + let num_repeated_iterations = 3; + let frozen_hash = Some(Hash::new_unique()); + for i in 0..num_repeated_iterations { + let expected_result = if i == 0 { + (true, Some(vote_slot)) + } else { + (false, Some(vote_slot)) + }; + assert_eq!( + 
latest_validator_votes_for_frozen_banks.check_add_vote( + vote_pubkey, + vote_slot, + frozen_hash, + ), + expected_result + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .max_frozen_votes + .get(&vote_pubkey) + .unwrap(), + (vote_slot, vec![frozen_hash.unwrap()]) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .fork_choice_dirty_set + .get(&vote_pubkey) + .unwrap(), + (vote_slot, vec![frozen_hash.unwrap()]) + ); + } + + // Case 3: Adding duplicate vote for same slot should update the state + let duplicate_frozen_hash = Some(Hash::new_unique()); + let all_frozen_hashes = vec![frozen_hash.unwrap(), duplicate_frozen_hash.unwrap()]; + assert_eq!( + latest_validator_votes_for_frozen_banks.check_add_vote( + vote_pubkey, + vote_slot, + duplicate_frozen_hash, + ), + (true, Some(vote_slot)) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .max_frozen_votes + .get(&vote_pubkey) + .unwrap(), + (vote_slot, all_frozen_hashes.clone()) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .fork_choice_dirty_set + .get(&vote_pubkey) + .unwrap(), + (vote_slot, all_frozen_hashes.clone()) + ); + + // Case 4: Adding duplicate vote that is not frozen should not update the state + let frozen_hash = None; + assert_eq!( + latest_validator_votes_for_frozen_banks.check_add_vote( + vote_pubkey, + vote_slot, + frozen_hash, + ), + (false, Some(vote_slot)) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .max_frozen_votes + .get(&vote_pubkey) + .unwrap(), + (vote_slot, all_frozen_hashes.clone()) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .fork_choice_dirty_set + .get(&vote_pubkey) + .unwrap(), + (vote_slot, all_frozen_hashes.clone()) + ); + + // Case 5: Adding a vote for a new higher slot that is not yet frozen + // should not update the state + let frozen_hash = None; + let old_vote_slot = vote_slot; + vote_slot += 1; + assert_eq!( + latest_validator_votes_for_frozen_banks.check_add_vote( + vote_pubkey, + vote_slot, + frozen_hash, + ), + (false, Some(old_vote_slot)) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .max_frozen_votes + .get(&vote_pubkey) + .unwrap(), + (old_vote_slot, all_frozen_hashes.clone()) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .fork_choice_dirty_set + .get(&vote_pubkey) + .unwrap(), + (old_vote_slot, all_frozen_hashes) + ); + + // Case 6: Adding a vote for a new higher slot that *is* frozen + // should upate the state + let frozen_hash = Some(Hash::new_unique()); + assert_eq!( + latest_validator_votes_for_frozen_banks.check_add_vote( + vote_pubkey, + vote_slot, + frozen_hash, + ), + (true, Some(vote_slot)) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .max_frozen_votes + .get(&vote_pubkey) + .unwrap(), + (vote_slot, vec![frozen_hash.unwrap()]) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .fork_choice_dirty_set + .get(&vote_pubkey) + .unwrap(), + (vote_slot, vec![frozen_hash.unwrap()]) + ); + + // Case 7: Adding a vote for a new pubkey should also update the state + vote_slot += 1; + let frozen_hash = Some(Hash::new_unique()); + let vote_pubkey = Pubkey::new_unique(); + assert_eq!( + latest_validator_votes_for_frozen_banks.check_add_vote( + vote_pubkey, + vote_slot, + frozen_hash, + ), + (true, Some(vote_slot)) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + .max_frozen_votes + .get(&vote_pubkey) + .unwrap(), + (vote_slot, vec![frozen_hash.unwrap()]) + ); + assert_eq!( + *latest_validator_votes_for_frozen_banks + 
.fork_choice_dirty_set + .get(&vote_pubkey) + .unwrap(), + (vote_slot, vec![frozen_hash.unwrap()]) + ); + } + + #[test] + fn test_latest_validator_votes_for_frozen_banks_take_votes_dirty_set() { + let mut latest_validator_votes_for_frozen_banks = + LatestValidatorVotesForFrozenBanks::default(); + let num_validators = 10; + + let setup_dirty_set = + |latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks| { + (0..num_validators) + .flat_map(|vote_slot| { + let vote_pubkey = Pubkey::new_unique(); + let frozen_hash1 = Hash::new_unique(); + assert_eq!( + latest_validator_votes_for_frozen_banks.check_add_vote( + vote_pubkey, + vote_slot, + Some(frozen_hash1), + ), + // This vote slot was frozen, and is the highest slot inserted thus far, + // so the highest vote should be Some(vote_slot) + (true, Some(vote_slot)) + ); + // Add a duplicate + let frozen_hash2 = Hash::new_unique(); + assert_eq!( + latest_validator_votes_for_frozen_banks.check_add_vote( + vote_pubkey, + vote_slot, + Some(frozen_hash2), + ), + // This vote slot was frozen, and is for a duplicate version of the highest slot + // inserted thus far, so the highest vote should be Some(vote_slot). + (true, Some(vote_slot)) + ); + vec![ + (vote_pubkey, (vote_slot, frozen_hash1)), + (vote_pubkey, (vote_slot, frozen_hash2)), + ] + }) + .collect() + }; + + // Taking all the dirty votes >= 0 will return everything + let root = 0; + let mut expected_dirty_set: Vec<(Pubkey, SlotHashKey)> = + setup_dirty_set(&mut latest_validator_votes_for_frozen_banks); + let mut votes_dirty_set_output = + latest_validator_votes_for_frozen_banks.take_votes_dirty_set(root); + votes_dirty_set_output.sort(); + expected_dirty_set.sort(); + assert_eq!(votes_dirty_set_output, expected_dirty_set); + assert!(latest_validator_votes_for_frozen_banks + .take_votes_dirty_set(0) + .is_empty()); + + // Taking all the firty votes >= num_validators - 1 will only return the last vote + let root = num_validators - 1; + let dirty_set = setup_dirty_set(&mut latest_validator_votes_for_frozen_banks); + let mut expected_dirty_set: Vec<(Pubkey, SlotHashKey)> = + dirty_set[dirty_set.len() - 2..dirty_set.len()].to_vec(); + let mut votes_dirty_set_output = + latest_validator_votes_for_frozen_banks.take_votes_dirty_set(root); + votes_dirty_set_output.sort(); + expected_dirty_set.sort(); + assert_eq!(votes_dirty_set_output, expected_dirty_set); + assert!(latest_validator_votes_for_frozen_banks + .take_votes_dirty_set(0) + .is_empty()); + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index a09a068bca..bb325a421f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -41,6 +41,7 @@ pub mod fork_choice; pub mod gen_keys; pub mod gossip_service; pub mod heaviest_subtree_fork_choice; +pub mod latest_validator_votes_for_frozen_banks; pub mod ledger_cleanup_service; pub mod non_circulating_supply; pub mod optimistic_confirmation_verifier; @@ -77,6 +78,7 @@ pub mod tpu; pub mod transaction_status_service; pub mod tree_diff; pub mod tvu; +pub mod unfrozen_gossip_verified_vote_hashes; pub mod validator; pub mod verified_vote_packets; pub mod vote_stake_tracker; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 341942359c..7d5689843d 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -15,6 +15,7 @@ use crate::{ }, fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, + latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, 
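    // Editor's illustrative sketch (not part of the upstream patch) of the dirty-set contract
    // exercised by the new tests in latest_validator_votes_for_frozen_banks.rs. After
    //     latest.check_add_vote(pk, 5, Some(hash_a)); // new highest frozen vote -> (true, Some(5))
    //     latest.check_add_vote(pk, 5, Some(hash_b)); // duplicate block for slot 5 -> (true, Some(5))
    //     latest.check_add_vote(pk, 4, Some(hash_c)); // older slot, ignored      -> (false, Some(5))
    // a single take_votes_dirty_set(0) yields [(pk, (5, hash_a)), (pk, (5, hash_b))] and a second
    // call yields an empty vec, while take_votes_dirty_set(6) would have discarded the slot-5
    // entries because they fall below the supplied root.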
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, progress_map::{DuplicateStats, ForkProgress, ProgressMap, PropagatedStats}, @@ -22,6 +23,7 @@ use crate::{ result::Result, rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, + unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, window_service::DuplicateSlotReceiver, }; use solana_client::rpc_response::SlotUpdate; @@ -68,8 +70,6 @@ const MAX_VOTE_SIGNATURES: usize = 200; pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1; pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD; -pub type GossipVerifiedVoteHashes = BTreeMap>>; - #[derive(PartialEq, Debug)] pub(crate) enum HeaviestForkFailures { LockedOut(u64), @@ -138,7 +138,7 @@ pub struct ReplayTiming { bank_count: u64, process_gossip_duplicate_confirmed_slots_elapsed: u64, process_duplicate_slots_elapsed: u64, - process_gossip_verified_vote_hashes_elapsed: u64, + process_unfrozen_gossip_verified_vote_hashes_elapsed: u64, } impl ReplayTiming { #[allow(clippy::too_many_arguments)] @@ -158,7 +158,7 @@ impl ReplayTiming { heaviest_fork_failures_elapsed: u64, bank_count: u64, process_gossip_duplicate_confirmed_slots_elapsed: u64, - process_gossip_verified_vote_hashes_elapsed: u64, + process_unfrozen_gossip_verified_vote_hashes_elapsed: u64, process_duplicate_slots_elapsed: u64, ) { self.collect_frozen_banks_elapsed += collect_frozen_banks_elapsed; @@ -176,8 +176,8 @@ impl ReplayTiming { self.bank_count += bank_count; self.process_gossip_duplicate_confirmed_slots_elapsed += process_gossip_duplicate_confirmed_slots_elapsed; - self.process_gossip_verified_vote_hashes_elapsed += - process_gossip_verified_vote_hashes_elapsed; + self.process_unfrozen_gossip_verified_vote_hashes_elapsed += + process_unfrozen_gossip_verified_vote_hashes_elapsed; self.process_duplicate_slots_elapsed += process_duplicate_slots_elapsed; let now = timestamp(); let elapsed_ms = now - self.last_print; @@ -233,8 +233,8 @@ impl ReplayTiming { i64 ), ( - "process_gossip_verified_vote_hashes_elapsed", - self.process_gossip_verified_vote_hashes_elapsed as i64, + "process_unfrozen_gossip_verified_vote_hashes_elapsed", + self.process_unfrozen_gossip_verified_vote_hashes_elapsed as i64, i64 ), ( @@ -329,8 +329,9 @@ impl ReplayStage { let mut partition_exists = false; let mut skipped_slots_info = SkippedSlotsInfo::default(); let mut replay_timing = ReplayTiming::default(); - let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = BTreeMap::new(); - let mut gossip_verified_vote_hashes: GossipVerifiedVoteHashes = BTreeMap::new(); + let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = GossipDuplicateConfirmedSlots::default(); + let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes = UnfrozenGossipVerifiedVoteHashes::default(); + let mut latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks = LatestValidatorVotesForFrozenBanks::default(); let mut voted_signatures = Vec::new(); let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader; loop { @@ -377,6 +378,8 @@ impl ReplayStage { &gossip_duplicate_confirmed_slots, &ancestors, &descendants, + &mut unfrozen_gossip_verified_vote_hashes, + &mut latest_validator_votes_for_frozen_banks, ); replay_active_banks_time.stop(); Self::report_memory(&allocated, "replay_active_banks", start); @@ -415,12 +418,15 @@ impl 
ReplayStage { // and switching proofs because these may be votes that haven't yet been // included in a block, so we may not have yet observed these votes just // by replaying blocks. - let mut process_gossip_verified_vote_hashes_time = Measure::start("process_gossip_duplicate_confirmed_slots"); - Self::process_gossip_verified_vote_hashes( + let mut process_unfrozen_gossip_verified_vote_hashes_time = Measure::start("process_gossip_duplicate_confirmed_slots"); + /*Self::process_gossip_verified_vote_hashes( &gossip_verified_vote_hash_receiver, - &mut gossip_verified_vote_hashes, - ); - process_gossip_verified_vote_hashes_time.stop(); + &mut unfrozen_gossip_verified_vote_hashes, + &heaviest_subtree_fork_choice, + &mut latest_validator_votes_for_frozen_banks, + );*/ + for _ in gossip_verified_vote_hash_receiver.try_iter() {} + process_unfrozen_gossip_verified_vote_hashes_time.stop(); // Check to remove any duplicated slots from fork choice let mut process_duplicate_slots_time = Measure::start("process_duplicate_slots"); @@ -459,6 +465,7 @@ impl ReplayStage { &cluster_slots, &bank_forks, &mut heaviest_subtree_fork_choice, + &mut latest_validator_votes_for_frozen_banks, ); compute_bank_stats_time.stop(); @@ -557,7 +564,7 @@ impl ReplayStage { &cache_block_time_sender, &bank_notification_sender, &mut gossip_duplicate_confirmed_slots, - &mut gossip_verified_vote_hashes, + &mut unfrozen_gossip_verified_vote_hashes, &mut voted_signatures, &mut has_new_vote_been_rooted, ); @@ -696,7 +703,7 @@ impl ReplayStage { heaviest_fork_failures_time.as_us(), if did_complete_bank {1} else {0}, process_gossip_duplicate_confirmed_slots_time.as_us(), - process_gossip_verified_vote_hashes_time.as_us(), + process_unfrozen_gossip_verified_vote_hashes_time.as_us(), process_duplicate_slots_time.as_us(), ); } @@ -942,18 +949,23 @@ impl ReplayStage { } } + #[cfg(test)] fn process_gossip_verified_vote_hashes( gossip_verified_vote_hash_receiver: &GossipVerifiedVoteHashReceiver, - gossip_verified_vote_hashes: &mut GossipVerifiedVoteHashes, + unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes, + heaviest_subtree_fork_choice: &HeaviestSubtreeForkChoice, + latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks, ) { for (pubkey, slot, hash) in gossip_verified_vote_hash_receiver.try_iter() { + let is_frozen = heaviest_subtree_fork_choice.contains_block(&(slot, hash)); // cluster_info_vote_listener will ensure it doesn't push duplicates - gossip_verified_vote_hashes - .entry(slot) - .or_default() - .entry(hash) - .or_default() - .push(pubkey); + unfrozen_gossip_verified_vote_hashes.add_vote( + pubkey, + slot, + hash, + is_frozen, + latest_validator_votes_for_frozen_banks, + ) } } @@ -1300,7 +1312,7 @@ impl ReplayStage { cache_block_time_sender: &Option, bank_notification_sender: &Option, gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, - gossip_verified_vote_hashes: &mut GossipVerifiedVoteHashes, + unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes, vote_signatures: &mut Vec, has_new_vote_been_rooted: &mut bool, ) { @@ -1355,7 +1367,7 @@ impl ReplayStage { highest_confirmed_root, heaviest_subtree_fork_choice, gossip_duplicate_confirmed_slots, - gossip_verified_vote_hashes, + unfrozen_gossip_verified_vote_hashes, has_new_vote_been_rooted, vote_signatures, ); @@ -1558,6 +1570,8 @@ impl ReplayStage { gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, ancestors: &HashMap>, descendants: &HashMap>, + 
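        // Editor's sketch of the overall flow these new parameters support, as interpreted from
        // the rest of this patch (not additional upstream documentation):
        //   1. `cluster_info_vote_listener` sends verified (pubkey, slot, hash) gossip votes over
        //      `gossip_verified_vote_hash_receiver`.
        //   2. `process_gossip_verified_vote_hashes` asks fork choice whether the (slot, hash)
        //      block is already frozen (`contains_block`); frozen votes go straight into
        //      `latest_validator_votes_for_frozen_banks`, the rest are parked in
        //      `unfrozen_gossip_verified_vote_hashes`.
        //   3. When `replay_active_banks` freezes a bank, it drains any parked votes for that
        //      (slot, bank_hash) with `remove_slot_hash` and feeds them to `check_add_vote`.
        //   4. Replayed on-chain votes reach the same structure via
        //      `Tower::collect_vote_lockouts`, and `compute_bank_stats` finally calls
        //      `take_votes_dirty_set(root)` and adds the new votes to
        //      `HeaviestSubtreeForkChoice`.
        // Note that in this backport the live gossip path is disabled: the run loop only drains
        // the receiver, the call above is commented out, and the helper is gated by
        // `#[cfg(test)]`.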
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes, + latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -1672,6 +1686,18 @@ impl ReplayStage { .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err)); } + let bank_hash = bank.hash(); + if let Some(new_frozen_voters) = + unfrozen_gossip_verified_vote_hashes.remove_slot_hash(bank.slot(), &bank_hash) + { + for pubkey in new_frozen_voters { + latest_validator_votes_for_frozen_banks.check_add_vote( + pubkey, + bank.slot(), + Some(bank_hash), + ); + } + } Self::record_rewards(&bank, &rewards_recorder_sender); } else { trace!( @@ -1697,6 +1723,7 @@ impl ReplayStage { cluster_slots: &ClusterSlots, bank_forks: &RwLock, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, + latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks, ) -> Vec { frozen_banks.sort_by_key(|bank| bank.slot()); let mut new_stats = vec![]; @@ -1716,14 +1743,15 @@ impl ReplayStage { bank_slot, bank.vote_accounts().into_iter(), &ancestors, + |slot| progress.get_hash(slot), + latest_validator_votes_for_frozen_banks, ); // Notify any listeners of the votes found in this newly computed // bank heaviest_subtree_fork_choice.compute_bank_stats( &bank, tower, - progress, - &computed_bank_state, + latest_validator_votes_for_frozen_banks, ); let ComputedBankState { voted_stakes, @@ -2224,7 +2252,7 @@ impl ReplayStage { highest_confirmed_root: Option, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, - gossip_verified_vote_hashes: &mut GossipVerifiedVoteHashes, + unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes, has_new_vote_been_rooted: &mut bool, voted_signatures: &mut Vec, ) { @@ -2252,9 +2280,7 @@ impl ReplayStage { // gossip_confirmed_slots now only contains entries >= `new_root` std::mem::swap(gossip_duplicate_confirmed_slots, &mut slots_ge_root); - let mut slots_ge_root = gossip_verified_vote_hashes.split_off(&new_root); - // gossip_verified_vote_hashes now only contains entries >= `new_root` - std::mem::swap(gossip_verified_vote_hashes, &mut slots_ge_root); + unfrozen_gossip_verified_vote_hashes.set_root(new_root); } fn generate_new_bank_forks( @@ -2449,7 +2475,7 @@ pub(crate) mod tests { #[test] fn test_is_partition_detected() { - let (bank_forks, _) = setup_forks(); + let VoteSimulator { bank_forks, .. 
} = setup_forks(); let ancestors = bank_forks.read().unwrap().ancestors(); // Last vote 1 is an ancestor of the heaviest slot 3, no partition assert!(!ReplayStage::is_partition_detected(&ancestors, 1, 3)); @@ -2668,11 +2694,13 @@ pub(crate) mod tests { .into_iter() .map(|s| (s, Hash::default())) .collect(); - let mut gossip_verified_vote_hashes: GossipVerifiedVoteHashes = - vec![root - 1, root, root + 1] - .into_iter() - .map(|s| (s, HashMap::new())) - .collect(); + let mut unfrozen_gossip_verified_vote_hashes: UnfrozenGossipVerifiedVoteHashes = + UnfrozenGossipVerifiedVoteHashes { + votes_per_slot: vec![root - 1, root, root + 1] + .into_iter() + .map(|s| (s, HashMap::new())) + .collect(), + }; ReplayStage::handle_new_root( root, &bank_forks, @@ -2681,7 +2709,7 @@ pub(crate) mod tests { None, &mut heaviest_subtree_fork_choice, &mut gossip_duplicate_confirmed_slots, - &mut gossip_verified_vote_hashes, + &mut unfrozen_gossip_verified_vote_hashes, &mut true, &mut Vec::new(), ); @@ -2697,7 +2725,8 @@ pub(crate) mod tests { vec![root, root + 1] ); assert_eq!( - gossip_verified_vote_hashes + unfrozen_gossip_verified_vote_hashes + .votes_per_slot .keys() .cloned() .collect::>(), @@ -2748,8 +2777,8 @@ pub(crate) mod tests { &AbsRequestSender::default(), Some(confirmed_root), &mut heaviest_subtree_fork_choice, - &mut BTreeMap::new(), - &mut BTreeMap::new(), + &mut GossipDuplicateConfirmedSlots::default(), + &mut UnfrozenGossipVerifiedVoteHashes::default(), &mut true, &mut Vec::new(), ); @@ -3292,6 +3321,8 @@ pub(crate) mod tests { let (bank_forks, mut progress, mut heaviest_subtree_fork_choice) = initialize_state(&keypairs, 10_000); + let mut latest_validator_votes_for_frozen_banks = + LatestValidatorVotesForFrozenBanks::default(); let bank0 = bank_forks.get(0).unwrap().clone(); let my_keypairs = keypairs.get(&node_pubkey).unwrap(); let vote_tx = vote_transaction::new_vote_transaction( @@ -3329,6 +3360,7 @@ pub(crate) mod tests { &ClusterSlots::default(), &bank_forks, &mut heaviest_subtree_fork_choice, + &mut latest_validator_votes_for_frozen_banks, ); // bank 0 has no votes, should not send any votes on the channel @@ -3379,6 +3411,7 @@ pub(crate) mod tests { &ClusterSlots::default(), &bank_forks, &mut heaviest_subtree_fork_choice, + &mut latest_validator_votes_for_frozen_banks, ); // Bank 1 had one vote @@ -3414,6 +3447,7 @@ pub(crate) mod tests { &ClusterSlots::default(), &bank_forks, &mut heaviest_subtree_fork_choice, + &mut latest_validator_votes_for_frozen_banks, ); // No new stats should have been computed assert!(newly_computed.is_empty()); @@ -3438,7 +3472,8 @@ pub(crate) mod tests { .cloned() .collect(); let mut heaviest_subtree_fork_choice = &mut vote_simulator.heaviest_subtree_fork_choice; - + let mut latest_validator_votes_for_frozen_banks = + LatestValidatorVotesForFrozenBanks::default(); let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); ReplayStage::compute_bank_stats( &node_pubkey, @@ -3450,6 +3485,7 @@ pub(crate) mod tests { &ClusterSlots::default(), &vote_simulator.bank_forks, &mut heaviest_subtree_fork_choice, + &mut latest_validator_votes_for_frozen_banks, ); let bank1 = vote_simulator @@ -3529,6 +3565,7 @@ pub(crate) mod tests { &ClusterSlots::default(), &vote_simulator.bank_forks, &mut vote_simulator.heaviest_subtree_fork_choice, + &mut vote_simulator.latest_validator_votes_for_frozen_banks, ); frozen_banks.sort_by_key(|bank| bank.slot()); @@ -4208,7 +4245,11 @@ pub(crate) mod tests { #[test] fn test_purge_unconfirmed_duplicate_slot() { - let (bank_forks, 
mut progress) = setup_forks(); + let VoteSimulator { + bank_forks, + mut progress, + .. + } = setup_forks(); let mut descendants = bank_forks.read().unwrap().descendants().clone(); let mut ancestors = bank_forks.read().unwrap().ancestors(); @@ -4268,7 +4309,7 @@ pub(crate) mod tests { #[test] fn test_purge_ancestors_descendants() { - let (bank_forks, _) = setup_forks(); + let VoteSimulator { bank_forks, .. } = setup_forks(); // Purge branch rooted at slot 2 let mut descendants = bank_forks.read().unwrap().descendants().clone(); @@ -4374,6 +4415,7 @@ pub(crate) mod tests { &ClusterSlots::default(), &bank_forks, &mut HeaviestSubtreeForkChoice::new_from_bank_forks(&bank_forks.read().unwrap()), + &mut LatestValidatorVotesForFrozenBanks::default(), ); // Check status is true @@ -4416,8 +4458,6 @@ pub(crate) mod tests { Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"), ); let mut tower = Tower::new_for_tests(8, 0.67); - let mut heaviest_subtree_fork_choice = - HeaviestSubtreeForkChoice::new_from_bank_forks(&bank_forks.read().unwrap()); // All forks have same weight so heaviest bank to vote/reset on should be the tip of // the fork with the lower slot @@ -4425,7 +4465,8 @@ pub(crate) mod tests { &bank_forks, &mut progress, &mut tower, - &mut heaviest_subtree_fork_choice, + &mut vote_simulator.heaviest_subtree_fork_choice, + &mut vote_simulator.latest_validator_votes_for_frozen_banks, ); assert_eq!(vote_fork.unwrap(), 4); assert_eq!(reset_fork.unwrap(), 4); @@ -4452,7 +4493,7 @@ pub(crate) mod tests { &ancestors, &descendants, &mut progress, - &mut heaviest_subtree_fork_choice, + &mut vote_simulator.heaviest_subtree_fork_choice, SlotStateUpdate::Duplicate, ); @@ -4460,7 +4501,8 @@ pub(crate) mod tests { &bank_forks, &mut progress, &mut tower, - &mut heaviest_subtree_fork_choice, + &mut vote_simulator.heaviest_subtree_fork_choice, + &mut vote_simulator.latest_validator_votes_for_frozen_banks, ); assert!(vote_fork.is_none()); assert_eq!(reset_fork.unwrap(), 3); @@ -4479,7 +4521,7 @@ pub(crate) mod tests { &ancestors, &descendants, &mut progress, - &mut heaviest_subtree_fork_choice, + &mut vote_simulator.heaviest_subtree_fork_choice, SlotStateUpdate::Duplicate, ); @@ -4487,7 +4529,8 @@ pub(crate) mod tests { &bank_forks, &mut progress, &mut tower, - &mut heaviest_subtree_fork_choice, + &mut vote_simulator.heaviest_subtree_fork_choice, + &mut vote_simulator.latest_validator_votes_for_frozen_banks, ); // Should now pick the next heaviest fork that is not a descendant of 2, which is 6. @@ -4506,14 +4549,15 @@ pub(crate) mod tests { &ancestors, &descendants, &mut progress, - &mut heaviest_subtree_fork_choice, + &mut vote_simulator.heaviest_subtree_fork_choice, SlotStateUpdate::DuplicateConfirmed, ); let (vote_fork, reset_fork) = run_compute_and_select_forks( &bank_forks, &mut progress, &mut tower, - &mut heaviest_subtree_fork_choice, + &mut vote_simulator.heaviest_subtree_fork_choice, + &mut vote_simulator.latest_validator_votes_for_frozen_banks, ); // Should now pick the heaviest fork 4 again, but lockouts apply so fork 4 // is not votable, which avoids voting for 4 again. @@ -4521,11 +4565,123 @@ pub(crate) mod tests { assert_eq!(reset_fork.unwrap(), 4); } + #[test] + fn test_gossip_vote_for_unrooted_slot() { + let VoteSimulator { + bank_forks, + mut heaviest_subtree_fork_choice, + mut latest_validator_votes_for_frozen_banks, + mut progress, + vote_pubkeys, + .. 
+ } = setup_forks(); + + let vote_pubkey = vote_pubkeys[0]; + let mut unfrozen_gossip_verified_vote_hashes = UnfrozenGossipVerifiedVoteHashes::default(); + let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded(); + + // Cast a vote for slot 3 on one fork + let vote_slot = 3; + let vote_bank = bank_forks.read().unwrap().get(vote_slot).unwrap().clone(); + gossip_verified_vote_hash_sender + .send((vote_pubkey, vote_slot, vote_bank.hash())) + .expect("Send should succeed"); + ReplayStage::process_gossip_verified_vote_hashes( + &gossip_verified_vote_hash_receiver, + &mut unfrozen_gossip_verified_vote_hashes, + &heaviest_subtree_fork_choice, + &mut latest_validator_votes_for_frozen_banks, + ); + + // Pick the best fork + heaviest_subtree_fork_choice.compute_bank_stats( + &vote_bank, + &Tower::default(), + &mut latest_validator_votes_for_frozen_banks, + ); + assert_eq!(heaviest_subtree_fork_choice.best_overall_slot().0, 6); + + // Now send another vote for a frozen bank on the other fork, where the new vote + // is bigger than the last vote + let bigger_vote_slot = 4; + let bigger_vote_bank = bank_forks + .read() + .unwrap() + .get(bigger_vote_slot) + .unwrap() + .clone(); + assert!(heaviest_subtree_fork_choice + .contains_block(&(bigger_vote_slot, bigger_vote_bank.hash()))); + gossip_verified_vote_hash_sender + .send((vote_pubkey, bigger_vote_slot, bigger_vote_bank.hash())) + .expect("Send should succeed"); + ReplayStage::process_gossip_verified_vote_hashes( + &gossip_verified_vote_hash_receiver, + &mut unfrozen_gossip_verified_vote_hashes, + &heaviest_subtree_fork_choice, + &mut latest_validator_votes_for_frozen_banks, + ); + + // Now set a root for a slot on the previously voted fork thats smaller than the new vote + let new_root = 3; + ReplayStage::handle_new_root( + new_root, + &bank_forks, + &mut progress, + &AbsRequestSender::default(), + None, + &mut heaviest_subtree_fork_choice, + &mut GossipDuplicateConfirmedSlots::default(), + &mut unfrozen_gossip_verified_vote_hashes, + &mut true, + &mut vec![], + ); + + // Add a new bank, freeze it + let parent_bank = bank_forks.read().unwrap().get(6).unwrap().clone(); + let new_bank = Bank::new_from_parent(&parent_bank, &Pubkey::default(), 7); + bank_forks.write().unwrap().insert(new_bank); + let new_bank = bank_forks.read().unwrap().get(7).unwrap().clone(); + new_bank.freeze(); + heaviest_subtree_fork_choice.add_new_leaf_slot( + (new_bank.slot(), new_bank.hash()), + Some((parent_bank.slot(), parent_bank.hash())), + ); + + // Compute bank stats on new slot + heaviest_subtree_fork_choice.compute_bank_stats( + &new_bank, + &Tower::default(), + &mut latest_validator_votes_for_frozen_banks, + ); + // Even though the `bigger_vote_slot` no longer exists in the fork choice tree, + // this vote should remove the previous vote's weight because we know there + // was a later vote + let old_vote_node = (vote_slot, vote_bank.hash()); + assert_eq!( + heaviest_subtree_fork_choice + .stake_voted_at(&old_vote_node) + .unwrap(), + 0 + ); + assert_eq!( + heaviest_subtree_fork_choice + .stake_voted_subtree(&old_vote_node) + .unwrap(), + 0 + ); + assert_eq!( + heaviest_subtree_fork_choice.best_overall_slot(), + (new_bank.slot(), new_bank.hash()) + ); + } + fn run_compute_and_select_forks( bank_forks: &RwLock, progress: &mut ProgressMap, tower: &mut Tower, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, + latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks, ) -> (Option, Option) { let mut 
frozen_banks: Vec<_> = bank_forks .read() @@ -4546,6 +4702,7 @@ pub(crate) mod tests { &ClusterSlots::default(), &bank_forks, heaviest_subtree_fork_choice, + latest_validator_votes_for_frozen_banks, ); let (heaviest_bank, heaviest_bank_on_same_fork) = heaviest_subtree_fork_choice .select_forks(&frozen_banks, &tower, &progress, &ancestors, bank_forks); @@ -4568,7 +4725,7 @@ pub(crate) mod tests { ) } - fn setup_forks() -> (RwLock, ProgressMap) { + fn setup_forks() -> VoteSimulator { /* Build fork structure: @@ -4588,7 +4745,7 @@ pub(crate) mod tests { let mut vote_simulator = VoteSimulator::new(1); vote_simulator.fill_bank_forks(forks, &HashMap::new()); - (vote_simulator.bank_forks, vote_simulator.progress) + vote_simulator } fn check_map_eq( diff --git a/core/src/unfrozen_gossip_verified_vote_hashes.rs b/core/src/unfrozen_gossip_verified_vote_hashes.rs new file mode 100644 index 0000000000..be2ccbdab9 --- /dev/null +++ b/core/src/unfrozen_gossip_verified_vote_hashes.rs @@ -0,0 +1,132 @@ +use crate::latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks; +use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; +use std::collections::{BTreeMap, HashMap}; + +#[derive(Default)] +pub(crate) struct UnfrozenGossipVerifiedVoteHashes { + pub votes_per_slot: BTreeMap>>, +} + +impl UnfrozenGossipVerifiedVoteHashes { + // Update `latest_validator_votes_for_frozen_banks` if gossip has seen a newer vote + // for a frozen bank. + #[allow(dead_code)] + pub(crate) fn add_vote( + &mut self, + pubkey: Pubkey, + vote_slot: Slot, + hash: Hash, + is_frozen: bool, + latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks, + ) { + // If this is a frozen bank, then we need to update the `latest_validator_votes_for_frozen_banks` + let frozen_hash = if is_frozen { Some(hash) } else { None }; + let (was_added, latest_frozen_vote_slot) = + latest_validator_votes_for_frozen_banks.check_add_vote(pubkey, vote_slot, frozen_hash); + + if !was_added + && latest_frozen_vote_slot + .map(|latest_frozen_vote_slot| vote_slot >= latest_frozen_vote_slot) + // If there's no latest frozen vote slot yet, then we should also insert + .unwrap_or(true) + { + // At this point it must be that: + // 1) `vote_slot` was not yet frozen + // 2) and `vote_slot` >= than the latest frozen vote slot. 
+ + // Thus we want to record this vote for later, in case a slot with this `vote_slot` + hash gets + // frozen later + self.votes_per_slot + .entry(vote_slot) + .or_default() + .entry(hash) + .or_default() + .push(pubkey); + } + } + + // Cleanup `votes_per_slot` based on new roots + pub(crate) fn set_root(&mut self, new_root: Slot) { + let mut slots_ge_root = self.votes_per_slot.split_off(&new_root); + // `self.votes_per_slot` now only contains entries >= `new_root` + std::mem::swap(&mut self.votes_per_slot, &mut slots_ge_root); + } + + pub(crate) fn remove_slot_hash(&mut self, slot: Slot, hash: &Hash) -> Option> { + self.votes_per_slot.get_mut(&slot).and_then(|slot_hashes| { + slot_hashes.remove(hash) + // If `slot_hashes` becomes empty, it'll be removed by `set_root()` later + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_unfrozen_gossip_verified_vote_hashes_add_vote() { + let mut unfrozen_gossip_verified_vote_hashes = UnfrozenGossipVerifiedVoteHashes::default(); + let mut latest_validator_votes_for_frozen_banks = + LatestValidatorVotesForFrozenBanks::default(); + let num_validators = 10; + let validator_keys: Vec = std::iter::repeat_with(Pubkey::new_unique) + .take(num_validators) + .collect(); + + // Case 1: Frozen banks shouldn't be added + let frozen_vote_slot = 1; + let num_repeated_iterations = 10; + for _ in 0..num_repeated_iterations { + let hash = Hash::new_unique(); + let is_frozen = true; + for vote_pubkey in validator_keys.iter() { + unfrozen_gossip_verified_vote_hashes.add_vote( + *vote_pubkey, + frozen_vote_slot, + hash, + is_frozen, + &mut latest_validator_votes_for_frozen_banks, + ); + } + + assert!(unfrozen_gossip_verified_vote_hashes + .votes_per_slot + .is_empty()); + } + + // Case 2: Other >= non-frozen banks should be added in case they're frozen later + for unfrozen_vote_slot in &[frozen_vote_slot - 1, frozen_vote_slot, frozen_vote_slot + 1] { + // If the vote slot is smaller than the latest known frozen `vote_slot` + // for each pubkey (which was added above), then they shouldn't be added + let num_duplicate_hashes = 10; + for _ in 0..num_duplicate_hashes { + let hash = Hash::new_unique(); + let is_frozen = false; + for vote_pubkey in validator_keys.iter() { + unfrozen_gossip_verified_vote_hashes.add_vote( + *vote_pubkey, + *unfrozen_vote_slot, + hash, + is_frozen, + &mut latest_validator_votes_for_frozen_banks, + ); + } + } + if *unfrozen_vote_slot >= frozen_vote_slot { + let vote_hashes_map = unfrozen_gossip_verified_vote_hashes + .votes_per_slot + .get(&unfrozen_vote_slot) + .unwrap(); + assert_eq!(vote_hashes_map.len(), num_duplicate_hashes); + for pubkey_votes in vote_hashes_map.values() { + assert_eq!(*pubkey_votes, validator_keys); + } + } else { + assert!(unfrozen_gossip_verified_vote_hashes + .votes_per_slot + .is_empty()); + } + } + } +} diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index d4d9fce1d7..ab250c1102 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -28,7 +28,7 @@ use solana_ledger::{ leader_schedule::LeaderSchedule, }; use solana_local_cluster::{ - cluster::Cluster, + cluster::{Cluster, ClusterValidatorInfo}, cluster_tests, local_cluster::{ClusterConfig, LocalCluster}, validator_configs::*, @@ -40,7 +40,7 @@ use solana_runtime::{ use solana_sdk::{ account::AccountSharedData, client::{AsyncClient, SyncClient}, - clock::{self, Slot}, + clock::{self, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}, 
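    // Editor's note summarizing the test-harness changes that follow (an interpretation of this
    // patch, not upstream documentation): `run_cluster_partition` is now generic over a
    // caller-supplied context `C` threaded through three callbacks (`on_partition_start`,
    // `on_before_partition_resolved`, `on_partition_resolved`) and accepts optional
    // `partition_duration` and `ticks_per_slot` overrides; `create_custom_leader_schedule` takes
    // an explicit per-validator slot count (`&[usize]`) instead of a single
    // `num_slots_per_validator`; and `run_kill_partition_switch_threshold` describes each
    // partition as a slice of `(stake, num_leader_slots)` tuples, killing the validators listed
    // in `stakes_to_kill` when the partition starts.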
commitment_config::CommitmentConfig, epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, genesis_config::ClusterType, @@ -284,16 +284,17 @@ fn test_leader_failure_4() { /// * `leader_schedule` - An option that specifies whether the cluster should /// run with a fixed, predetermined leader schedule #[allow(clippy::cognitive_complexity)] -fn run_cluster_partition( - partitions: &[&[usize]], +fn run_cluster_partition( + partitions: &[Vec], leader_schedule: Option<(LeaderSchedule, Vec>)>, - on_partition_start: E, - on_partition_resolved: F, + mut context: C, + on_partition_start: impl FnOnce(&mut LocalCluster, &mut C), + on_before_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C), + on_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C), + partition_duration: Option, + ticks_per_slot: Option, additional_accounts: Vec<(Pubkey, AccountSharedData)>, -) where - E: FnOnce(&mut LocalCluster), - F: FnOnce(&mut LocalCluster), -{ +) { solana_logger::setup_with_default(RUST_LOG_FILTER); info!("PARTITION_TEST!"); let num_nodes = partitions.len(); @@ -351,6 +352,7 @@ fn run_cluster_partition( stakers_slot_offset: slots_per_epoch, skip_warmup_slots: true, additional_accounts, + ticks_per_slot: ticks_per_slot.unwrap_or(DEFAULT_TICKS_PER_SLOT), ..ClusterConfig::default() }; @@ -379,11 +381,14 @@ fn run_cluster_partition( } info!("PARTITION_TEST start partition"); + on_partition_start(&mut cluster, &mut context); enable_partition.store(false, Ordering::Relaxed); - on_partition_start(&mut cluster); - sleep(Duration::from_millis(leader_schedule_time)); + sleep(Duration::from_millis( + partition_duration.unwrap_or(leader_schedule_time), + )); + on_before_partition_resolved(&mut cluster, &mut context); info!("PARTITION_TEST remove partition"); enable_partition.store(true, Ordering::Relaxed); @@ -402,7 +407,7 @@ fn run_cluster_partition( ); sleep(Duration::from_millis(propagation_time)); info!("PARTITION_TEST resuming normal operation"); - on_partition_resolved(&mut cluster); + on_partition_resolved(&mut cluster, &mut context); } #[allow(unused_attributes)] @@ -410,57 +415,72 @@ fn run_cluster_partition( #[test] #[serial] fn test_cluster_partition_1_2() { - let empty = |_: &mut LocalCluster| {}; - let on_partition_resolved = |cluster: &mut LocalCluster| { + let empty = |_: &mut LocalCluster, _: &mut ()| {}; + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { cluster.check_for_new_roots(16, &"PARTITION_TEST"); }; - run_cluster_partition(&[&[1], &[1, 1]], None, empty, on_partition_resolved, vec![]) + run_cluster_partition( + &[vec![1], vec![1, 1]], + None, + (), + empty, + empty, + on_partition_resolved, + None, + None, + vec![], + ) } #[test] #[serial] fn test_cluster_partition_1_1() { - let empty = |_: &mut LocalCluster| {}; - let on_partition_resolved = |cluster: &mut LocalCluster| { + let empty = |_: &mut LocalCluster, _: &mut ()| {}; + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { cluster.check_for_new_roots(16, &"PARTITION_TEST"); }; - run_cluster_partition(&[&[1], &[1]], None, empty, on_partition_resolved, vec![]) + run_cluster_partition( + &[vec![1], vec![1]], + None, + (), + empty, + empty, + on_partition_resolved, + None, + None, + vec![], + ) } #[test] #[serial] fn test_cluster_partition_1_1_1() { - let empty = |_: &mut LocalCluster| {}; - let on_partition_resolved = |cluster: &mut LocalCluster| { + let empty = |_: &mut LocalCluster, _: &mut ()| {}; + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { cluster.check_for_new_roots(16, 
&"PARTITION_TEST"); }; run_cluster_partition( - &[&[1], &[1], &[1]], + &[vec![1], vec![1], vec![1]], None, + (), + empty, empty, on_partition_resolved, + None, + None, vec![], ) } fn create_custom_leader_schedule( - num_validators: usize, - num_slots_per_validator: usize, + validator_num_slots: &[usize], ) -> (LeaderSchedule, Vec>) { let mut leader_schedule = vec![]; let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new())) - .take(num_validators) + .take(validator_num_slots.len()) .collect(); - for (i, k) in validator_keys.iter().enumerate() { - let num_slots = { - if i == 0 { - // Set up the leader to have 50% of the slots - num_slots_per_validator * (num_validators - 1) - } else { - num_slots_per_validator - } - }; - for _ in 0..num_slots { + for (k, num_slots) in validator_keys.iter().zip(validator_num_slots.iter()) { + for _ in 0..*num_slots { leader_schedule.push(k.pubkey()) } } @@ -484,13 +504,17 @@ fn test_kill_heaviest_partition() { // eventually choose the major partition // 4) Check for recovery let num_slots_per_validator = 8; - let partitions: [&[usize]; 4] = [&[11], &[10], &[10], &[10]]; - let (leader_schedule, validator_keys) = - create_custom_leader_schedule(partitions.len(), num_slots_per_validator); + let partitions: [Vec; 4] = [vec![11], vec![10], vec![10], vec![10]]; + let (leader_schedule, validator_keys) = create_custom_leader_schedule(&[ + num_slots_per_validator * (partitions.len() - 1), + num_slots_per_validator, + num_slots_per_validator, + num_slots_per_validator, + ]); - let empty = |_: &mut LocalCluster| {}; + let empty = |_: &mut LocalCluster, _: &mut ()| {}; let validator_to_kill = validator_keys[0].pubkey(); - let on_partition_resolved = |cluster: &mut LocalCluster| { + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { info!("Killing validator with id: {}", validator_to_kill); cluster.exit_node(&validator_to_kill); cluster.check_for_new_roots(16, &"PARTITION_TEST"); @@ -498,52 +522,80 @@ fn test_kill_heaviest_partition() { run_cluster_partition( &partitions, Some((leader_schedule, validator_keys)), + (), + empty, empty, on_partition_resolved, + None, + None, vec![], ) } #[allow(clippy::assertions_on_constants)] -fn run_kill_partition_switch_threshold( - failures_stake: u64, - alive_stake_1: u64, - alive_stake_2: u64, - on_partition_resolved: F, -) where - F: Fn(&mut LocalCluster), -{ +fn run_kill_partition_switch_threshold( + stakes_to_kill: &[&[(usize, usize)]], + alive_stakes: &[&[(usize, usize)]], + partition_duration: Option, + ticks_per_slot: Option, + partition_context: C, + on_partition_start: impl Fn(&mut LocalCluster, &[Pubkey], &mut C), + on_before_partition_resolved: impl Fn(&mut LocalCluster, &mut C), + on_partition_resolved: impl Fn(&mut LocalCluster, &mut C), +) { // Needs to be at least 1/3 or there will be no overlap // with the confirmation supermajority 2/3 assert!(SWITCH_FORK_THRESHOLD >= 1f64 / 3f64); info!( - "stakes: {} {} {}", - failures_stake, alive_stake_1, alive_stake_2 + "stakes_to_kill: {:?}, alive_stakes: {:?}", + stakes_to_kill, alive_stakes ); // This test: // 1) Spins up three partitions // 2) Kills the first partition with the stake `failures_stake` // 5) runs `on_partition_resolved` - let num_slots_per_validator = 8; - let partitions: [&[usize]; 3] = [ - &[(failures_stake as usize)], - &[(alive_stake_1 as usize)], - &[(alive_stake_2 as usize)], - ]; - let (leader_schedule, validator_keys) = - create_custom_leader_schedule(partitions.len(), num_slots_per_validator); + let partitions: 
Vec<&[(usize, usize)]> = stakes_to_kill + .iter() + .cloned() + .chain(alive_stakes.iter().cloned()) + .collect(); - let validator_to_kill = validator_keys[0].pubkey(); - let on_partition_start = |cluster: &mut LocalCluster| { - info!("Killing validator with id: {}", validator_to_kill); - cluster.exit_node(&validator_to_kill); + let stake_partitions: Vec<Vec<usize>> = partitions + .iter() + .map(|stakes_and_slots| stakes_and_slots.iter().map(|(stake, _)| *stake).collect()) + .collect(); + let num_slots_per_validator: Vec<usize> = partitions + .iter() + .flat_map(|stakes_and_slots| stakes_and_slots.iter().map(|(_, num_slots)| *num_slots)) + .collect(); + + let (leader_schedule, validator_keys) = create_custom_leader_schedule(&num_slots_per_validator); + + info!( + "Validator ids: {:?}", + validator_keys + .iter() + .map(|k| k.pubkey()) + .collect::<Vec<_>>() + ); + let validator_pubkeys: Vec<Pubkey> = validator_keys.iter().map(|k| k.pubkey()).collect(); + let on_partition_start = |cluster: &mut LocalCluster, partition_context: &mut C| { + for validator_to_kill in &validator_pubkeys[0..stakes_to_kill.len()] { + info!("Killing validator with id: {}", validator_to_kill); + cluster.exit_node(&validator_to_kill); + } + on_partition_start(cluster, &validator_pubkeys, partition_context); }; run_cluster_partition( - &partitions, + &stake_partitions, Some((leader_schedule, validator_keys)), + partition_context, on_partition_start, + on_before_partition_resolved, on_partition_resolved, + partition_duration, + ticks_per_slot, vec![], ) } @@ -563,15 +615,24 @@ fn test_kill_partition_switch_threshold_no_progress() { // Check that no new roots were set 400 slots after partition resolves (gives time // for lockouts built during partition to resolve and gives validators an opportunity // to try and switch forks) - let on_partition_resolved = |cluster: &mut LocalCluster| { + let on_partition_start = |_: &mut LocalCluster, _: &[Pubkey], _: &mut ()| {}; + let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut ()| {}; + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { cluster.check_no_new_roots(400, &"PARTITION_TEST"); }; // This kills `max_failures_stake`, so no progress should be made run_kill_partition_switch_threshold( - failures_stake, - alive_stake_1, - alive_stake_2, + &[&[(failures_stake as usize, 16)]], + &[ + &[(alive_stake_1 as usize, 8)], + &[(alive_stake_2 as usize, 8)], + ], + None, + None, + (), + on_partition_start, + on_before_partition_resolved, on_partition_resolved, ); } @@ -606,13 +667,247 @@ fn test_kill_partition_switch_threshold_progress() { && smaller as f64 / total_stake as f64 <= SWITCH_FORK_THRESHOLD ); - let on_partition_resolved = |cluster: &mut LocalCluster| { + let on_partition_start = |_: &mut LocalCluster, _: &[Pubkey], _: &mut ()| {}; + let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut ()| {}; + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { cluster.check_for_new_roots(16, &"PARTITION_TEST"); }; run_kill_partition_switch_threshold( - failures_stake, - alive_stake_1, - alive_stake_2, + &[&[(failures_stake as usize, 16)]], + &[ + &[(alive_stake_1 as usize, 8)], + &[(alive_stake_2 as usize, 8)], + ], + None, + None, + (), + on_partition_start, + on_before_partition_resolved, + on_partition_resolved, + ); +} + +#[test] +#[serial] +#[ignore] +// Steps in this test: +// We want to create a situation like: +/* + 1 (2%, killed and restarted) --- 200 (37%, lighter fork) + / + 0 + \-------- 4 (38%, heavier fork) +*/ +// where the 2% that voted on
slot 1 don't see their votes land in a block +// and thus without integrating votes from gossip into fork choice, will +// deem slot 4 the heavier fork and try to switch to slot 4, which doesn't pass the +// switch threshold. This stalls the network. + +// We do this by: +// 1) Creating a partition so all three nodes don't see each other +// 2) Kill the validator with 2% +// 3) Wait for longer than blockhash expiration +// 4) Copy in the lighter fork's blocks up, *only* up to the first slot in the lighter fork +// (not all the blocks on the lighter fork!), call this slot `L` +// 5) Restart the validator with 2% so that he votes on `L`, but the vote doesn't land +// due to blockhash expiration +// 6) Resolve the partition so that the 2% repairs the other fork, and tries to switch, +// stalling the network. + +fn test_fork_choice_ingest_votes_from_gossip() { + solana_logger::setup_with_default(RUST_LOG_FILTER); + let max_switch_threshold_failure_pct = 1.0 - 2.0 * SWITCH_FORK_THRESHOLD; + let total_stake = 100; + let max_failures_stake = (max_switch_threshold_failure_pct * total_stake as f64) as u64; + + // 1% less than the failure stake, where the 2% is allocated to a validator that + // has no leader slots and thus won't be able to vote on its own fork. + let failures_stake = max_failures_stake; + let total_alive_stake = total_stake - failures_stake; + let alive_stake_1 = total_alive_stake / 2 - 1; + let alive_stake_2 = total_alive_stake - alive_stake_1 - 1; + + // Heavier fork still doesn't have enough stake to switch. Both branches need + // the vote to land from the validator with `alive_stake_3` to allow the other + // fork to switch. + let alive_stake_3 = 2; + assert!(alive_stake_1 < alive_stake_2); + assert!(alive_stake_1 + alive_stake_3 > alive_stake_2); + + let partitions: &[&[(usize, usize)]] = &[ + &[(alive_stake_1 as usize, 8)], + &[(alive_stake_2 as usize, 8)], + &[(alive_stake_3 as usize, 0)], + ]; + + #[derive(Default)] + struct PartitionContext { + alive_stake3_info: Option<ClusterValidatorInfo>, + smallest_validator_key: Pubkey, + lighter_fork_validator_key: Pubkey, + heaviest_validator_key: Pubkey, + } + let on_partition_start = + |cluster: &mut LocalCluster, validator_keys: &[Pubkey], context: &mut PartitionContext| { + // Kill validator with alive_stake_3, last in `partitions` slice + let smallest_validator_key = &validator_keys[3]; + let info = cluster.exit_node(smallest_validator_key); + context.alive_stake3_info = Some(info); + context.smallest_validator_key = *smallest_validator_key; + context.lighter_fork_validator_key = validator_keys[1]; + // Third in `partitions` slice + context.heaviest_validator_key = validator_keys[2]; + }; + + let ticks_per_slot = 8; + let on_before_partition_resolved = + |cluster: &mut LocalCluster, context: &mut PartitionContext| { + // Equal to ms_per_slot * MAX_RECENT_BLOCKHASHES, rounded up + let sleep_time_ms = + ((ticks_per_slot * DEFAULT_MS_PER_SLOT * MAX_RECENT_BLOCKHASHES as u64) + + DEFAULT_TICKS_PER_SLOT + - 1) + / DEFAULT_TICKS_PER_SLOT; + info!("Wait for blockhashes to expire, {} ms", sleep_time_ms); + + // Wait for blockhashes to expire + sleep(Duration::from_millis(sleep_time_ms)); + + let smallest_ledger_path = context + .alive_stake3_info + .as_ref() + .unwrap() + .info + .ledger_path + .clone(); + let lighter_fork_ledger_path = cluster.ledger_path(&context.lighter_fork_validator_key); + let heaviest_ledger_path = cluster.ledger_path(&context.heaviest_validator_key); + + // Open ledgers + let smallest_blockstore =
open_blockstore(&smallest_ledger_path); + let lighter_fork_blockstore = open_blockstore(&lighter_fork_ledger_path); + let heaviest_blockstore = open_blockstore(&heaviest_ledger_path); + + info!("Opened blockstores"); + + // Find the first slot on the smaller fork + let mut first_slot_in_lighter_partition = 0; + for ((heavier_slot, heavier_slot_meta), (lighter_slot, _lighter_slot_meta)) in + heaviest_blockstore + .slot_meta_iterator(0) + .unwrap() + .zip(lighter_fork_blockstore.slot_meta_iterator(0).unwrap()) + { + if heavier_slot != lighter_slot { + // Find the parent of the fork point + let last_common_ancestor = heavier_slot_meta.parent_slot; + let lighter_fork_parent_meta = lighter_fork_blockstore + .meta(last_common_ancestor) + .unwrap() + .unwrap(); + // Lighter fork should only see one next slot, since only two validators + // could have generated children of `parent`, and the lighter fork *definitely* + // doesn't see the other fork's child, otherwise `heavier_slot != lighter_slot` + // would not have triggered above. + assert_eq!(lighter_fork_parent_meta.next_slots.len(), 1); + let lighter_fork_child = lighter_fork_parent_meta.next_slots[0]; + assert_ne!(first_slot_in_lighter_partition, heavier_slot); + first_slot_in_lighter_partition = lighter_fork_child; + info!( + "First slot in lighter partition is {}", + first_slot_in_lighter_partition + ); + break; + } + } + + assert!(first_slot_in_lighter_partition != 0); + + // Copy all the blocks from the smaller partition up to `first_slot_in_lighter_partition` + // into the smallest validator's blockstore + for lighter_slot in std::iter::once(first_slot_in_lighter_partition).chain( + AncestorIterator::new(first_slot_in_lighter_partition, &lighter_fork_blockstore), + ) { + let lighter_slot_meta = + lighter_fork_blockstore.meta(lighter_slot).unwrap().unwrap(); + assert!(lighter_slot_meta.is_full()); + // Get the shreds from the leader of the smaller fork + let lighter_fork_data_shreds = lighter_fork_blockstore + .get_data_shreds_for_slot(lighter_slot, 0) + .unwrap(); + + // Insert those shreds into the smallest validator's blockstore + smallest_blockstore + .insert_shreds(lighter_fork_data_shreds, None, false) + .unwrap(); + + // Check insert succeeded + let new_meta = smallest_blockstore.meta(lighter_slot).unwrap().unwrap(); + assert!(new_meta.is_full()); + assert_eq!(new_meta.last_index, lighter_slot_meta.last_index); + } + + // Restart the smallest validator that we killed earlier in `on_partition_start()` + drop(smallest_blockstore); + cluster.restart_node( + &context.smallest_validator_key, + context.alive_stake3_info.take().unwrap(), + ); + + loop { + // Wait for node to vote on the first slot on the less heavy fork, so it'll need + // a switch proof to flip to the other fork. + // However, this vote won't land because it's using an expired blockhash.
The + // fork structure will look something like this after the vote: + /* + 1 (2%, killed and restarted) --- 200 (37%, lighter fork) + / + 0 + \-------- 4 (38%, heavier fork) + */ + if let Some(last_vote) = + last_vote_in_tower(&smallest_ledger_path, &context.smallest_validator_key) + { + // Check that the heaviest validator on the other fork doesn't have this slot, + // this must mean we voted on a unique slot on this fork + if last_vote == first_slot_in_lighter_partition { + info!( + "Saw vote on first slot in lighter partition {}", + first_slot_in_lighter_partition + ); + break; + } else { + info!( + "Haven't seen vote on first slot in lighter partition, latest vote is: {}", + last_vote + ); + } + } + + sleep(Duration::from_millis(20)); + } + + // Now resolve partition, allow validator to see the fork with the heavier validator, + // but the fork it's currently on is the heaviest, if only its own vote landed! + }; + + // Check that new roots were set after the partition resolves (gives time + // for lockouts built during partition to resolve and gives validators an opportunity + // to try and switch forks) + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut PartitionContext| { + cluster.check_for_new_roots(16, &"PARTITION_TEST"); + }; + + run_kill_partition_switch_threshold( + &[&[(failures_stake as usize - 1, 16)]], + partitions, + // Partition long enough such that the first vote made by validator with + // `alive_stake_3` won't be ingested due to BlockhashTooOld, + None, + Some(ticks_per_slot), + PartitionContext::default(), + on_partition_start, + on_before_partition_resolved, on_partition_resolved, ); } @@ -1674,7 +1969,7 @@ fn test_validator_saves_tower() { } fn open_blockstore(ledger_path: &Path) -> Blockstore { - Blockstore::open_with_access_type(ledger_path, AccessType::PrimaryOnly, None, true) + Blockstore::open_with_access_type(ledger_path, AccessType::TryPrimaryThenSecondary, None, true) .unwrap_or_else(|e| { panic!("Failed to open ledger at {:?}, err: {}", ledger_path, e); }) @@ -2196,9 +2491,9 @@ fn test_run_test_load_program_accounts_partition_root() { fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) { let num_slots_per_validator = 8; - let partitions: [&[usize]; 2] = [&[(1)], &[(1)]]; + let partitions: [Vec<usize>; 2] = [vec![1], vec![1]]; let (leader_schedule, validator_keys) = - create_custom_leader_schedule(partitions.len(), num_slots_per_validator); + create_custom_leader_schedule(&[num_slots_per_validator, num_slots_per_validator]); let (update_client_sender, update_client_receiver) = unbounded(); let (scan_client_sender, scan_client_receiver) = unbounded(); @@ -2212,7 +2507,7 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) { scan_client_receiver, ); - let on_partition_start = |cluster: &mut LocalCluster| { + let on_partition_start = |cluster: &mut LocalCluster, _: &mut ()| { let update_client = cluster .get_validator_client(&cluster.entry_point_info.id) .unwrap(); @@ -2223,7 +2518,7 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) { scan_client_sender.send(scan_client).unwrap(); }; - let on_partition_resolved = |cluster: &mut LocalCluster| { + let on_partition_before_resolved = |_: &mut LocalCluster, _: &mut ()| {}; + + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { cluster.check_for_new_roots(20, &"run_test_load_program_accounts_partition"); exit.store(true, Ordering::Relaxed); t_update.join().unwrap(); @@ -2233,8 +2530,12 @@ fn
run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) { run_cluster_partition( &partitions, Some((leader_schedule, validator_keys)), + (), on_partition_start, + on_partition_before_resolved, on_partition_resolved, + None, + None, additional_accounts, ); }
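
As context for the new `unfrozen_gossip_verified_vote_hashes.rs` module exercised by the tests in this patch, the bookkeeping pattern can be sketched in isolation. The snippet below is a minimal, self-contained sketch and not the actual crate API: `Slot`, `Hash`, and `Pubkey` are simplified stand-ins for the solana_sdk types, and `drain_votes_for_frozen_bank` is a hypothetical helper standing in for the replay-stage code that calls `remove_slot_hash()` once the matching bank freezes.

use std::collections::{BTreeMap, HashMap};

type Slot = u64;
type Hash = [u8; 32];   // stand-in for solana_sdk::hash::Hash
type Pubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey

#[derive(Default)]
struct UnfrozenGossipVotes {
    // slot -> (bank hash -> voters), mirroring `votes_per_slot` in the patch
    votes_per_slot: BTreeMap<Slot, HashMap<Hash, Vec<Pubkey>>>,
}

impl UnfrozenGossipVotes {
    // Record a gossip vote for a (slot, hash) whose bank is not frozen yet.
    fn add_vote(&mut self, pubkey: Pubkey, slot: Slot, hash: Hash) {
        self.votes_per_slot
            .entry(slot)
            .or_default()
            .entry(hash)
            .or_default()
            .push(pubkey);
    }

    // Drop everything below the new root; `split_off` keeps entries >= `new_root`.
    fn set_root(&mut self, new_root: Slot) {
        self.votes_per_slot = self.votes_per_slot.split_off(&new_root);
    }

    // Once a bank with this (slot, hash) freezes, hand its queued voters to fork choice.
    fn drain_votes_for_frozen_bank(&mut self, slot: Slot, hash: &Hash) -> Vec<Pubkey> {
        self.votes_per_slot
            .get_mut(&slot)
            .and_then(|hashes| hashes.remove(hash))
            .unwrap_or_default()
    }
}

fn main() {
    let mut queued = UnfrozenGossipVotes::default();
    let (slot, hash, voter) = (5, [1u8; 32], [7u8; 32]);

    // A vote arrives from gossip before the bank for slot 5 is frozen.
    queued.add_vote(voter, slot, hash);

    // Later, replay freezes slot 5 with that hash: the queued vote is released
    // and would be fed into `LatestValidatorVotesForFrozenBanks` / fork choice.
    let released = queued.drain_votes_for_frozen_bank(slot, &hash);
    assert_eq!(released, vec![voter]);

    // Rooting slot 6 prunes any remaining state below it.
    queued.set_root(6);
    assert!(queued.votes_per_slot.is_empty());
}

Keying the outer map by slot in a `BTreeMap` makes pruning at a new root a single `split_off`, which is the same design choice the patch uses in `set_root()`.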