From c6c7feb0c248482a4b2cfde6fd0d788f45814f8a Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 3 May 2021 05:13:29 +0000 Subject: [PATCH] Retry latest vote if expired (#16735) (#16927) (cherry picked from commit b5d30846d6e04a1623e4c251856d196e7623c639) Co-authored-by: carllin --- core/src/banking_stage.rs | 2 +- core/src/cluster_info.rs | 147 +++++++- core/src/consensus.rs | 149 +++++--- core/src/progress_map.rs | 7 + core/src/replay_stage.rs | 533 +++++++++++++++++++++++---- local-cluster/tests/local_cluster.rs | 68 ++-- 6 files changed, 734 insertions(+), 172 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 5578d9d6a3..de08c90619 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1364,7 +1364,7 @@ impl BankingStage { pub(crate) fn next_leader_tpu( cluster_info: &ClusterInfo, - poh_recorder: &Arc>, + poh_recorder: &Mutex, ) -> Option { if let Some(leader_pubkey) = poh_recorder .lock() diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 0ac9295bde..1b14da7c4e 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1020,9 +1020,21 @@ impl ClusterInfo { self.push_message(CrdsValue::new_signed(message, &self.keypair)); } + fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) { + assert!((vote_index as usize) < MAX_LOCKOUT_HISTORY); + let self_pubkey = self.id(); + let now = timestamp(); + let vote = Vote::new(self_pubkey, vote, now); + let vote = CrdsData::Vote(vote_index, vote); + let vote = CrdsValue::new_signed(vote, &self.keypair); + self.gossip + .write() + .unwrap() + .process_push_message(&self_pubkey, vec![vote], now); + } + pub fn push_vote(&self, tower: &[Slot], vote: Transaction) { debug_assert!(tower.iter().tuple_windows().all(|(a, b)| a < b)); - let now = timestamp(); // Find a crds vote which is evicted from the tower, and recycle its // vote-index. This can be either an old vote which is popped off the // deque, or recent vote which has expired before getting enough @@ -1065,15 +1077,40 @@ impl ClusterInfo { .map(|(_ /*wallclock*/, ix)| ix) }; let vote_index = vote_index.unwrap_or(num_crds_votes); - assert!((vote_index as usize) < MAX_LOCKOUT_HISTORY); - let vote = Vote::new(self_pubkey, vote, now); - debug_assert_eq!(vote.slot().unwrap(), *tower.last().unwrap()); - let vote = CrdsData::Vote(vote_index, vote); - let vote = CrdsValue::new_signed(vote, &self.keypair); - self.gossip - .write() - .unwrap() - .process_push_message(&self_pubkey, vec![vote], now); + self.push_vote_at_index(vote, vote_index); + } + + pub fn refresh_vote(&self, vote: Transaction, vote_slot: Slot) { + let vote_index = { + let gossip = + self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read); + (0..MAX_LOCKOUT_HISTORY as u8).find(|ix| { + let vote = CrdsValueLabel::Vote(*ix, self.id()); + if let Some(vote) = gossip.crds.lookup(&vote) { + match &vote.data { + CrdsData::Vote(_, prev_vote) => match prev_vote.slot() { + Some(prev_vote_slot) => prev_vote_slot == vote_slot, + None => { + error!("crds vote with no slots!"); + false + } + }, + _ => panic!("this should not happen!"), + } + } else { + false + } + }) + }; + + // If you don't see a vote with the same slot yet, this means you probably + // restarted, and need to wait for your oldest vote to propagate back to you. + // + // We don't write to an arbitrary index, because it may replace one of this validator's + // existing votes on the network. + if let Some(vote_index) = vote_index { + self.push_vote_at_index(vote, vote_index); + } } pub fn send_vote(&self, vote: &Transaction, tpu: Option) -> Result<()> { @@ -3573,6 +3610,96 @@ mod tests { .unwrap(); } + #[test] + fn test_refresh_vote() { + let keys = Keypair::new(); + let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0); + let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); + + // Construct and push a vote for some other slot + let unrefresh_slot = 5; + let unrefresh_tower = vec![1, 3, unrefresh_slot]; + let unrefresh_vote = Vote::new(unrefresh_tower.clone(), Hash::new_unique()); + let unrefresh_ix = vote_instruction::vote( + &Pubkey::new_unique(), // vote_pubkey + &Pubkey::new_unique(), // authorized_voter_pubkey + unrefresh_vote, + ); + let unrefresh_tx = Transaction::new_with_payer( + &[unrefresh_ix], // instructions + None, // payer + ); + cluster_info.push_vote(&unrefresh_tower, unrefresh_tx.clone()); + cluster_info.flush_push_queue(); + let (_, votes, max_ts) = cluster_info.get_votes(0); + assert_eq!(votes, vec![unrefresh_tx.clone()]); + + // Now construct vote for the slot to be refreshed later + let refresh_slot = 7; + let refresh_tower = vec![1, 3, unrefresh_slot, refresh_slot]; + let refresh_vote = Vote::new(refresh_tower.clone(), Hash::new_unique()); + let refresh_ix = vote_instruction::vote( + &Pubkey::new_unique(), // vote_pubkey + &Pubkey::new_unique(), // authorized_voter_pubkey + refresh_vote.clone(), + ); + let refresh_tx = Transaction::new_with_payer( + &[refresh_ix], // instructions + None, // payer + ); + + // Trying to refresh vote when it doesn't yet exist in gossip + // shouldn't add the vote + cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot); + cluster_info.flush_push_queue(); + let (_, votes, max_ts) = cluster_info.get_votes(max_ts); + assert_eq!(votes, vec![]); + let (_, votes, _) = cluster_info.get_votes(0); + assert_eq!(votes.len(), 1); + assert!(votes.contains(&unrefresh_tx)); + + // Push the new vote for `refresh_slot` + cluster_info.push_vote(&refresh_tower, refresh_tx.clone()); + cluster_info.flush_push_queue(); + + // Should be two votes in gossip + let (_, votes, _) = cluster_info.get_votes(0); + assert_eq!(votes.len(), 2); + assert!(votes.contains(&unrefresh_tx)); + assert!(votes.contains(&refresh_tx)); + + // Refresh a few times, we should only have the latest update + let mut latest_refresh_tx = refresh_tx; + for _ in 0..10 { + let latest_refreshed_recent_blockhash = Hash::new_unique(); + let new_signer = Keypair::new(); + let refresh_ix = vote_instruction::vote( + &new_signer.pubkey(), // vote_pubkey + &new_signer.pubkey(), // authorized_voter_pubkey + refresh_vote.clone(), + ); + latest_refresh_tx = Transaction::new_signed_with_payer( + &[refresh_ix], + None, + &[&new_signer], + latest_refreshed_recent_blockhash, + ); + cluster_info.refresh_vote(latest_refresh_tx.clone(), refresh_slot); + } + cluster_info.flush_push_queue(); + + // The diff since `max_ts` should only be the latest refreshed vote + let (_, votes, _) = cluster_info.get_votes(max_ts); + assert_eq!(votes.len(), 1); + assert_eq!(votes[0], latest_refresh_tx); + + // Should still be two votes in gossip + let (_, votes, _) = cluster_info.get_votes(0); + assert_eq!(votes.len(), 2); + assert!(votes.contains(&unrefresh_tx)); + assert!(votes.contains(&latest_refresh_tx)); + } + #[test] fn test_push_vote() { let mut rng = rand::thread_rng(); diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 20ede67845..8921afd1ef 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -98,6 +98,7 @@ pub(crate) struct ComputedBankState { // Tree of intervals of lockouts of the form [slot, slot + slot.lockout], // keyed by end of the range pub lockout_intervals: LockoutIntervals, + pub my_latest_landed_vote: Option, } #[frozen_abi(digest = "Eay84NBbJqiMBfE7HHH2o6e51wcvoU79g8zCi5sw6uj3")] @@ -108,6 +109,12 @@ pub struct Tower { threshold_size: f64, lockouts: VoteState, last_vote: Vote, + #[serde(skip)] + // The blockhash used in the last vote transaction, may or may not equal the + // blockhash of the voted block itself, depending if the vote slot was refreshed. + // For instance, a vote for slot 5, may be refreshed/resubmitted for inclusion in + // block 10, in which case `last_vote_tx_blockhash` equals the blockhash of 10, not 5. + last_vote_tx_blockhash: Hash, last_timestamp: BlockTimestamp, #[serde(skip)] path: PathBuf, @@ -134,6 +141,7 @@ impl Default for Tower { lockouts: VoteState::default(), last_vote: Vote::default(), last_timestamp: BlockTimestamp::default(), + last_vote_tx_blockhash: Hash::default(), path: PathBuf::default(), tmp_path: PathBuf::default(), stray_restored_slot: Option::default(), @@ -217,7 +225,7 @@ impl Tower { } pub(crate) fn collect_vote_lockouts( - node_pubkey: &Pubkey, + vote_account_pubkey: &Pubkey, bank_slot: Slot, vote_accounts: F, ancestors: &HashMap>, @@ -234,11 +242,12 @@ impl Tower { // Tree of intervals of lockouts of the form [slot, slot + slot.lockout], // keyed by end of the range let mut lockout_intervals = LockoutIntervals::new(); + let mut my_latest_landed_vote = None; for (key, (voted_stake, account)) in vote_accounts { if voted_stake == 0 { continue; } - trace!("{} {} with stake {}", node_pubkey, key, voted_stake); + trace!("{} {} with stake {}", vote_account_pubkey, key, voted_stake); let mut vote_state = match account.vote_state().as_ref() { Err(_) => { datapoint_warn!( @@ -260,7 +269,8 @@ impl Tower { .push((vote.slot, key)); } - if key == *node_pubkey || vote_state.node_pubkey == *node_pubkey { + if key == *vote_account_pubkey { + my_latest_landed_vote = vote_state.nth_recent_vote(0).map(|v| v.slot); debug!("vote state {:?}", vote_state); debug!( "observed slot {}", @@ -349,6 +359,7 @@ impl Tower { total_stake, bank_weight, lockout_intervals, + my_latest_landed_vote, } } @@ -364,13 +375,24 @@ impl Tower { .unwrap_or(false) } - fn new_vote( - local_vote_state: &VoteState, + pub fn tower_slots(&self) -> Vec { + self.lockouts.tower() + } + + pub fn last_vote_tx_blockhash(&self) -> Hash { + self.last_vote_tx_blockhash + } + + pub fn refresh_last_vote_tx_blockhash(&mut self, new_vote_tx_blockhash: Hash) { + self.last_vote_tx_blockhash = new_vote_tx_blockhash; + } + + fn apply_vote_and_generate_vote_diff( + local_vote_state: &mut VoteState, slot: Slot, hash: Hash, last_voted_slot_in_bank: Option, - ) -> (Vote, Vec /*VoteState.tower*/) { - let mut local_vote_state = local_vote_state.clone(); + ) -> Vote { let vote = Vote::new(vec![slot], hash); local_vote_state.process_vote_unchecked(&vote); let slots = if let Some(last_voted_slot_in_bank) = last_voted_slot_in_bank { @@ -389,44 +411,48 @@ impl Tower { slots, local_vote_state.votes ); - (Vote::new(slots, hash), local_vote_state.tower()) + Vote::new(slots, hash) } - fn last_voted_slot_in_bank(bank: &Bank, vote_account_pubkey: &Pubkey) -> Option { + pub fn last_voted_slot_in_bank(bank: &Bank, vote_account_pubkey: &Pubkey) -> Option { let (_stake, vote_account) = bank.get_vote_account(vote_account_pubkey)?; let slot = vote_account.vote_state().as_ref().ok()?.last_voted_slot(); slot } - pub fn record_bank_vote( + pub fn record_bank_vote(&mut self, bank: &Bank, vote_account_pubkey: &Pubkey) -> Option { + let last_voted_slot_in_bank = Self::last_voted_slot_in_bank(bank, vote_account_pubkey); + + // Returns the new root if one is made after applying a vote for the given bank to + // `self.lockouts` + self.record_bank_vote_and_update_lockouts(bank.slot(), bank.hash(), last_voted_slot_in_bank) + } + + fn record_bank_vote_and_update_lockouts( &mut self, - bank: &Bank, - vote_account_pubkey: &Pubkey, - ) -> (Option, Vec /*VoteState.tower*/) { - let (vote, tower_slots) = self.new_vote_from_bank(bank, vote_account_pubkey); - - let new_root = self.record_bank_vote_update_lockouts(vote); - (new_root, tower_slots) - } - - pub fn new_vote_from_bank( - &self, - bank: &Bank, - vote_account_pubkey: &Pubkey, - ) -> (Vote, Vec /*VoteState.tower*/) { - let voted_slot = Self::last_voted_slot_in_bank(bank, vote_account_pubkey); - Self::new_vote(&self.lockouts, bank.slot(), bank.hash(), voted_slot) - } - - pub fn record_bank_vote_update_lockouts(&mut self, vote: Vote) -> Option { - let slot = vote.last_voted_slot().unwrap_or(0); - trace!("{} record_vote for {}", self.node_pubkey, slot); + vote_slot: Slot, + vote_hash: Hash, + last_voted_slot_in_bank: Option, + ) -> Option { + trace!("{} record_vote for {}", self.node_pubkey, vote_slot); let old_root = self.root(); - self.lockouts.process_vote_unchecked(&vote); - self.last_vote = vote; + let mut new_vote = Self::apply_vote_and_generate_vote_diff( + &mut self.lockouts, + vote_slot, + vote_hash, + last_voted_slot_in_bank, + ); + + new_vote.timestamp = self.maybe_timestamp(self.last_vote.last_voted_slot().unwrap_or(0)); + self.last_vote = new_vote; + let new_root = self.root(); - datapoint_info!("tower-vote", ("latest", slot, i64), ("root", new_root, i64)); + datapoint_info!( + "tower-vote", + ("latest", vote_slot, i64), + ("root", new_root, i64) + ); if old_root != new_root { Some(new_root) } else { @@ -436,8 +462,7 @@ impl Tower { #[cfg(test)] pub fn record_vote(&mut self, slot: Slot, hash: Hash) -> Option { - let vote = Vote::new(vec![slot], hash); - self.record_bank_vote_update_lockouts(vote) + self.record_bank_vote_and_update_lockouts(slot, hash, self.last_voted_slot()) } pub fn last_voted_slot(&self) -> Option { @@ -452,10 +477,8 @@ impl Tower { self.stray_restored_slot } - pub fn last_vote_and_timestamp(&mut self) -> Vote { - let mut last_vote = self.last_vote.clone(); - last_vote.timestamp = self.maybe_timestamp(last_vote.last_voted_slot().unwrap_or(0)); - last_vote + pub fn last_vote(&mut self) -> Vote { + self.last_vote.clone() } fn maybe_timestamp(&mut self, current_slot: Slot) -> Option { @@ -1458,7 +1481,7 @@ pub mod test { return heaviest_fork_failures; } - let (new_root, _) = tower.record_bank_vote(&vote_bank, &my_vote_pubkey); + let new_root = tower.record_bank_vote(&vote_bank, &my_vote_pubkey); if let Some(new_root) = new_root { self.set_root(new_root); } @@ -2443,23 +2466,26 @@ pub mod test { } #[test] - fn test_new_vote() { - let local = VoteState::default(); - let (vote, tower_slots) = Tower::new_vote(&local, 0, Hash::default(), None); - assert_eq!(local.votes.len(), 0); + fn test_apply_vote_and_generate_vote_diff() { + let mut local = VoteState::default(); + let vote = Tower::apply_vote_and_generate_vote_diff(&mut local, 0, Hash::default(), None); + assert_eq!(local.votes.len(), 1); assert_eq!(vote.slots, vec![0]); - assert_eq!(tower_slots, vec![0]); + assert_eq!(local.tower(), vec![0]); } #[test] - fn test_new_vote_dup_vote() { - let local = VoteState::default(); - let vote = Tower::new_vote(&local, 0, Hash::default(), Some(0)); - assert!(vote.0.slots.is_empty()); + fn test_apply_vote_and_generate_vote_diff_dup_vote() { + let mut local = VoteState::default(); + // If `latest_voted_slot_in_bank == Some(0)`, then we already have a vote for 0. Adding + // another vote for slot 0 should return an empty vote as the diff. + let vote = + Tower::apply_vote_and_generate_vote_diff(&mut local, 0, Hash::default(), Some(0)); + assert!(vote.slots.is_empty()); } #[test] - fn test_new_vote_next_vote() { + fn test_apply_vote_and_generate_vote_diff_next_vote() { let mut local = VoteState::default(); let vote = Vote { slots: vec![0], @@ -2468,13 +2494,14 @@ pub mod test { }; local.process_vote_unchecked(&vote); assert_eq!(local.votes.len(), 1); - let (vote, tower_slots) = Tower::new_vote(&local, 1, Hash::default(), Some(0)); + let vote = + Tower::apply_vote_and_generate_vote_diff(&mut local, 1, Hash::default(), Some(0)); assert_eq!(vote.slots, vec![1]); - assert_eq!(tower_slots, vec![0, 1]); + assert_eq!(local.tower(), vec![0, 1]); } #[test] - fn test_new_vote_next_after_expired_vote() { + fn test_apply_vote_and_generate_vote_diff_next_after_expired_vote() { let mut local = VoteState::default(); let vote = Vote { slots: vec![0], @@ -2483,10 +2510,14 @@ pub mod test { }; local.process_vote_unchecked(&vote); assert_eq!(local.votes.len(), 1); - let (vote, tower_slots) = Tower::new_vote(&local, 3, Hash::default(), Some(0)); + + // First vote expired, so should be evicted from tower. Thus even with + // `latest_voted_slot_in_bank == Some(0)`, the first vote slot won't be + // observable in any of the results. + let vote = + Tower::apply_vote_and_generate_vote_diff(&mut local, 3, Hash::default(), Some(0)); assert_eq!(vote.slots, vec![3]); - // First vote expired, so should be evicted from tower. - assert_eq!(tower_slots, vec![3]); + assert_eq!(local.tower(), vec![3]); } #[test] @@ -2562,10 +2593,12 @@ pub mod test { } else { vec![] }; - let expected = Vote::new(slots, Hash::default()); + let mut expected = Vote::new(slots, Hash::default()); for i in 0..num_votes { tower.record_vote(i as u64, Hash::default()); } + + expected.timestamp = tower.last_vote.timestamp; assert_eq!(expected, tower.last_vote) } diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs index 868a9bbcd9..af29f1fb5f 100644 --- a/core/src/progress_map.rs +++ b/core/src/progress_map.rs @@ -259,6 +259,7 @@ pub(crate) struct ForkStats { pub(crate) computed: bool, pub(crate) lockout_intervals: LockoutIntervals, pub(crate) bank_hash: Option, + pub(crate) my_latest_landed_vote: Option, } #[derive(Clone, Default)] @@ -532,6 +533,12 @@ impl ProgressMap { } } + pub fn my_latest_landed_vote(&self, slot: Slot) -> Option { + self.progress_map + .get(&slot) + .and_then(|s| s.fork_stats.my_latest_landed_vote) + } + pub fn set_supermajority_confirmed_slot(&mut self, slot: Slot) { let slot_progress = self.get_mut(&slot).unwrap(); slot_progress.fork_stats.is_supermajority_confirmed = true; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 3323d4e54b..2af5fc2dfa 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -41,7 +41,7 @@ use solana_runtime::{ commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender, }; use solana_sdk::{ - clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, + clock::{Slot, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS}, genesis_config::ClusterType, hash::Hash, pubkey::Pubkey, @@ -50,7 +50,7 @@ use solana_sdk::{ timing::timestamp, transaction::Transaction, }; -use solana_vote_program::{vote_instruction, vote_state::Vote}; +use solana_vote_program::vote_state::Vote; use std::{ collections::{BTreeMap, HashMap, HashSet}, result, @@ -60,15 +60,16 @@ use std::{ Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, - time::Duration, + time::{Duration, Instant}, }; pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64; pub const MAX_UNCONFIRMED_SLOTS: usize = 5; -const MAX_VOTE_SIGNATURES: usize = 200; pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1; pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD; +const MAX_VOTE_SIGNATURES: usize = 200; +const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000; #[derive(PartialEq, Debug)] pub(crate) enum HeaviestForkFailures { @@ -97,6 +98,11 @@ impl Drop for Finalizer { } } +struct LastVoteRefreshTime { + last_refresh_time: Instant, + last_print_time: Instant, +} + #[derive(Default)] struct SkippedSlotsInfo { last_retransmit_slot: u64, @@ -334,6 +340,10 @@ impl ReplayStage { let mut latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks = LatestValidatorVotesForFrozenBanks::default(); let mut voted_signatures = Vec::new(); let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader; + let mut last_vote_refresh_time = LastVoteRefreshTime { + last_refresh_time: Instant::now(), + last_print_time: Instant::now(), + }; loop { let allocated = thread_mem_usage::Allocatedp::default(); @@ -456,7 +466,7 @@ impl ReplayStage { let mut compute_bank_stats_time = Measure::start("compute_bank_stats"); let newly_computed_slot_stats = Self::compute_bank_stats( - &my_pubkey, + &vote_account, &ancestors, &mut frozen_banks, &tower, @@ -488,9 +498,14 @@ impl ReplayStage { let (heaviest_bank, heaviest_bank_on_same_voted_fork) = heaviest_subtree_fork_choice .select_forks(&frozen_banks, &tower, &progress, &ancestors, &bank_forks); select_forks_time.stop(); - Self::report_memory(&allocated, "select_fork", start); + if let Some(heaviest_bank_on_same_voted_fork) = heaviest_bank_on_same_voted_fork.as_ref() { + if let Some(my_latest_landed_vote) = progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot()) { + Self::refresh_last_vote(&mut tower, &cluster_info, heaviest_bank_on_same_voted_fork, &poh_recorder, my_latest_landed_vote, &vote_account, &authorized_voter_keypairs.read().unwrap(), &mut voted_signatures, has_new_vote_been_rooted, &mut last_vote_refresh_time); + } + } + let mut select_vote_and_reset_forks_time = Measure::start("select_vote_and_reset_forks"); let SelectVoteAndResetForkResult { @@ -1319,8 +1334,7 @@ impl ReplayStage { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); } trace!("handle votable bank {}", bank.slot()); - let (new_root, tower_slots) = tower.record_bank_vote(bank, vote_account_pubkey); - let last_vote = tower.last_vote_and_timestamp(); + let new_root = tower.record_bank_vote(bank, vote_account_pubkey); if let Err(err) = tower.save(&cluster_info.keypair) { error!("Unable to save tower: {:?}", err); @@ -1396,29 +1410,25 @@ impl ReplayStage { poh_recorder, vote_account_pubkey, authorized_voter_keypairs, - last_vote, - &tower_slots, + tower, switch_fork_decision, vote_signatures, *has_new_vote_been_rooted, ); } - #[allow(clippy::too_many_arguments)] - fn push_vote( - cluster_info: &ClusterInfo, - bank: &Arc, - poh_recorder: &Arc>, + fn generate_vote_tx( + node_keypair: &Arc, + bank: &Bank, vote_account_pubkey: &Pubkey, authorized_voter_keypairs: &[Arc], vote: Vote, - tower: &[Slot], switch_fork_decision: &SwitchForkDecision, vote_signatures: &mut Vec, has_new_vote_been_rooted: bool, - ) { + ) -> Option { if authorized_voter_keypairs.is_empty() { - return; + return None; } let vote_account = match bank.get_vote_account(vote_account_pubkey) { None => { @@ -1426,7 +1436,7 @@ impl ReplayStage { "Vote account {} does not exist. Unable to vote", vote_account_pubkey, ); - return; + return None; } Some((_stake, vote_account)) => vote_account, }; @@ -1437,7 +1447,7 @@ impl ReplayStage { "Vote account {} is unreadable. Unable to vote", vote_account_pubkey, ); - return; + return None; } Ok(vote_state) => vote_state, }; @@ -1450,7 +1460,7 @@ impl ReplayStage { vote_account_pubkey, bank.epoch() ); - return; + return None; }; let authorized_voter_keypair = match authorized_voter_keypairs @@ -1460,28 +1470,19 @@ impl ReplayStage { None => { warn!("The authorized keypair {} for vote account {} is not available. Unable to vote", authorized_voter_pubkey, vote_account_pubkey); - return; + return None; } Some(authorized_voter_keypair) => authorized_voter_keypair, }; - let node_keypair = &cluster_info.keypair; // Send our last few votes along with the new one - let vote_ix = if bank.slot() > Self::get_unlock_switch_vote_slot(bank.cluster_type()) { - switch_fork_decision - .to_vote_instruction( - vote, - &vote_account_pubkey, - &authorized_voter_keypair.pubkey(), - ) - .expect("Switch threshold failure should not lead to voting") - } else { - vote_instruction::vote( + let vote_ix = switch_fork_decision + .to_vote_instruction( + vote, &vote_account_pubkey, &authorized_voter_keypair.pubkey(), - vote, ) - }; + .expect("Switch threshold failure should not lead to voting"); let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey())); @@ -1498,11 +1499,110 @@ impl ReplayStage { vote_signatures.clear(); } - let _ = cluster_info.send_vote( - &vote_tx, - crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), + Some(vote_tx) + } + + #[allow(clippy::too_many_arguments)] + fn refresh_last_vote( + tower: &mut Tower, + cluster_info: &ClusterInfo, + heaviest_bank_on_same_fork: &Bank, + poh_recorder: &Mutex, + my_latest_landed_vote: Slot, + vote_account_pubkey: &Pubkey, + authorized_voter_keypairs: &[Arc], + vote_signatures: &mut Vec, + has_new_vote_been_rooted: bool, + last_vote_refresh_time: &mut LastVoteRefreshTime, + ) { + let last_voted_slot = tower.last_voted_slot(); + if last_voted_slot.is_none() { + return; + } + + // Refresh the vote if our latest vote hasn't landed, and the recent blockhash of the + // last attempt at a vote transaction has expired + let last_voted_slot = last_voted_slot.unwrap(); + if my_latest_landed_vote > last_voted_slot + && last_vote_refresh_time.last_print_time.elapsed().as_secs() >= 1 + { + last_vote_refresh_time.last_print_time = Instant::now(); + info!("Last landed vote for slot {} in bank {} is greater than the current last vote for slot: {} tracked by Tower", my_latest_landed_vote, heaviest_bank_on_same_fork.slot(), last_voted_slot); + } + if my_latest_landed_vote >= last_voted_slot + || heaviest_bank_on_same_fork + .check_hash_age(&tower.last_vote_tx_blockhash(), MAX_PROCESSING_AGE) + .unwrap_or(false) + // In order to avoid voting on multiple forks all past MAX_PROCESSING_AGE that don't + // include the last voted blockhash + || last_vote_refresh_time.last_refresh_time.elapsed().as_millis() < MAX_VOTE_REFRESH_INTERVAL_MILLIS as u128 + { + return; + } + + // TODO: check the timestamp in this vote is correct, i.e. it shouldn't + // have changed from the original timestamp of the vote. + let vote_tx = Self::generate_vote_tx( + &cluster_info.keypair, + heaviest_bank_on_same_fork, + vote_account_pubkey, + authorized_voter_keypairs, + tower.last_vote(), + &SwitchForkDecision::SameFork, + vote_signatures, + has_new_vote_been_rooted, ); - cluster_info.push_vote(tower, vote_tx); + + if let Some(vote_tx) = vote_tx { + let recent_blockhash = vote_tx.message.recent_blockhash; + tower.refresh_last_vote_tx_blockhash(recent_blockhash); + + // Send the votes to the TPU and gossip for network propagation + let hash_string = format!("{}", recent_blockhash); + datapoint_info!( + "refresh_vote", + ("last_voted_slot", last_voted_slot, i64), + ("target_bank_slot", heaviest_bank_on_same_fork.slot(), i64), + ("target_bank_hash", hash_string, String), + ); + let _ = cluster_info.send_vote( + &vote_tx, + crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), + ); + cluster_info.refresh_vote(vote_tx, last_voted_slot); + last_vote_refresh_time.last_refresh_time = Instant::now(); + } + } + + fn push_vote( + cluster_info: &ClusterInfo, + bank: &Bank, + poh_recorder: &Mutex, + vote_account_pubkey: &Pubkey, + authorized_voter_keypairs: &[Arc], + tower: &mut Tower, + switch_fork_decision: &SwitchForkDecision, + vote_signatures: &mut Vec, + has_new_vote_been_rooted: bool, + ) { + let vote_tx = Self::generate_vote_tx( + &cluster_info.keypair, + bank, + vote_account_pubkey, + authorized_voter_keypairs, + tower.last_vote(), + switch_fork_decision, + vote_signatures, + has_new_vote_been_rooted, + ); + if let Some(vote_tx) = vote_tx { + tower.refresh_last_vote_tx_blockhash(vote_tx.message.recent_blockhash); + let _ = cluster_info.send_vote( + &vote_tx, + crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder), + ); + cluster_info.push_vote(&tower.tower_slots(), vote_tx); + } } fn update_commitment_cache( @@ -1714,7 +1814,7 @@ impl ReplayStage { #[allow(clippy::too_many_arguments)] pub(crate) fn compute_bank_stats( - my_pubkey: &Pubkey, + my_vote_pubkey: &Pubkey, ancestors: &HashMap>, frozen_banks: &mut Vec>, tower: &Tower, @@ -1739,7 +1839,7 @@ impl ReplayStage { .computed; if !is_computed { let computed_bank_state = Tower::collect_vote_lockouts( - my_pubkey, + my_vote_pubkey, bank_slot, bank.vote_accounts().into_iter(), &ancestors, @@ -1757,6 +1857,7 @@ impl ReplayStage { voted_stakes, total_stake, lockout_intervals, + my_latest_landed_vote, .. } = computed_bank_state; let stats = progress @@ -1767,6 +1868,7 @@ impl ReplayStage { stats.lockout_intervals = lockout_intervals; stats.block_height = bank.block_height(); stats.bank_hash = Some(bank.hash()); + stats.my_latest_landed_vote = my_latest_landed_vote; stats.computed = true; new_stats.push(bank_slot); datapoint_info!( @@ -1777,7 +1879,7 @@ impl ReplayStage { ); info!( "{} slot_weight: {} {} {} {}", - my_pubkey, + my_vote_pubkey, bank_slot, stats.weight, stats.fork_weight, @@ -2426,6 +2528,7 @@ impl ReplayStage { pub(crate) mod tests { use super::*; use crate::{ + cluster_info::Node, consensus::test::{initialize_state, VoteSimulator}, consensus::Tower, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, @@ -2457,6 +2560,7 @@ pub(crate) mod tests { hash::{hash, Hash}, instruction::InstructionError, packet::PACKET_DATA_SIZE, + poh_config::PohConfig, signature::{Keypair, Signature, Signer}, system_transaction, transaction::TransactionError, @@ -2491,10 +2595,15 @@ pub(crate) mod tests { struct ReplayBlockstoreComponents { blockstore: Arc, - validator_voting_keys: HashMap, + validator_node_to_vote_keys: HashMap, + validator_authorized_voter_keypairs: HashMap, + my_vote_pubkey: Pubkey, progress: ProgressMap, - bank_forks: Arc>, + cluster_info: ClusterInfo, leader_schedule_cache: Arc, + poh_recorder: Mutex, + bank_forks: Arc>, + tower: Tower, rpc_subscriptions: Arc, } @@ -2507,10 +2616,11 @@ pub(crate) mod tests { let validator_authorized_voter_keypairs: Vec<_> = (0..20).map(|_| ValidatorVoteKeypairs::new_rand()).collect(); - let validator_voting_keys: HashMap<_, _> = validator_authorized_voter_keypairs - .iter() - .map(|v| (v.node_keypair.pubkey(), v.vote_keypair.pubkey())) - .collect(); + let validator_node_to_vote_keys: HashMap = + validator_authorized_voter_keypairs + .iter() + .map(|v| (v.node_keypair.pubkey(), v.vote_keypair.pubkey())) + .collect(); let GenesisConfigInfo { genesis_config, .. } = genesis_utils::create_genesis_config_with_vote_accounts( 10_000, @@ -2535,12 +2645,47 @@ pub(crate) mod tests { ), ); + // ClusterInfo + let my_keypairs = &validator_authorized_voter_keypairs[0]; + let my_pubkey = my_keypairs.node_keypair.pubkey(); + let cluster_info = ClusterInfo::new( + Node::new_localhost_with_pubkey(&my_pubkey).info, + Arc::new(Keypair::from_bytes(&my_keypairs.node_keypair.to_bytes()).unwrap()), + ); + assert_eq!(my_pubkey, cluster_info.id()); + // Leader schedule cache let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); + // PohRecorder + let poh_recorder = Mutex::new( + PohRecorder::new( + bank0.tick_height(), + bank0.last_blockhash(), + bank0.slot(), + None, + bank0.ticks_per_slot(), + &Pubkey::default(), + &blockstore, + &leader_schedule_cache, + &Arc::new(PohConfig::default()), + Arc::new(AtomicBool::new(false)), + ) + .0, + ); + // BankForks let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); + // Tower + let my_vote_pubkey = my_keypairs.vote_keypair.pubkey(); + let tower = Tower::new_from_bankforks( + &bank_forks.read().unwrap(), + &ledger_path, + &cluster_info.id(), + &my_vote_pubkey, + ); + // RpcSubscriptions let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); @@ -2552,12 +2697,23 @@ pub(crate) mod tests { optimistically_confirmed_bank, )); + let validator_authorized_voter_keypairs: HashMap = + validator_authorized_voter_keypairs + .into_iter() + .map(|keys| (keys.vote_keypair.pubkey(), keys)) + .collect(); + ReplayBlockstoreComponents { blockstore, - validator_voting_keys, + validator_node_to_vote_keys, + validator_authorized_voter_keypairs, + my_vote_pubkey, progress, - bank_forks, + cluster_info, leader_schedule_cache, + poh_recorder, + bank_forks, + tower, rpc_subscriptions, } } @@ -2566,11 +2722,12 @@ pub(crate) mod tests { fn test_child_slots_of_same_parent() { let ReplayBlockstoreComponents { blockstore, - validator_voting_keys, + validator_node_to_vote_keys, mut progress, bank_forks, leader_schedule_cache, rpc_subscriptions, + .. } = replay_blockstore_components(); // Insert a non-root bank so that the propagation logic will update this @@ -2585,7 +2742,9 @@ pub(crate) mod tests { ForkProgress::new_from_bank( &bank1, bank1.collector_id(), - validator_voting_keys.get(&bank1.collector_id()).unwrap(), + validator_node_to_vote_keys + .get(&bank1.collector_id()) + .unwrap(), Some(0), DuplicateStats::default(), 0, @@ -2655,7 +2814,7 @@ pub(crate) mod tests { ]; for slot in expected_leader_slots { let leader = leader_schedule_cache.slot_leader_at(slot, None).unwrap(); - let vote_key = validator_voting_keys.get(&leader).unwrap(); + let vote_key = validator_node_to_vote_keys.get(&leader).unwrap(); assert!(progress .get_propagated_stats(1) .unwrap() @@ -3318,15 +3477,16 @@ pub(crate) mod tests { #[test] fn test_compute_bank_stats_confirmed() { let vote_keypairs = ValidatorVoteKeypairs::new_rand(); - let node_pubkey = vote_keypairs.node_keypair.pubkey(); - let keypairs: HashMap<_, _> = vec![(node_pubkey, vote_keypairs)].into_iter().collect(); + let my_node_pubkey = vote_keypairs.node_keypair.pubkey(); + let my_vote_pubkey = vote_keypairs.vote_keypair.pubkey(); + let keypairs: HashMap<_, _> = vec![(my_node_pubkey, vote_keypairs)].into_iter().collect(); let (bank_forks, mut progress, mut heaviest_subtree_fork_choice) = initialize_state(&keypairs, 10_000); let mut latest_validator_votes_for_frozen_banks = LatestValidatorVotesForFrozenBanks::default(); let bank0 = bank_forks.get(0).unwrap().clone(); - let my_keypairs = keypairs.get(&node_pubkey).unwrap(); + let my_keypairs = keypairs.get(&my_node_pubkey).unwrap(); let vote_tx = vote_transaction::new_vote_transaction( vec![0], bank0.hash(), @@ -3338,7 +3498,7 @@ pub(crate) mod tests { ); let bank_forks = RwLock::new(bank_forks); - let bank1 = Bank::new_from_parent(&bank0, &node_pubkey, 1); + let bank1 = Bank::new_from_parent(&bank0, &my_node_pubkey, 1); bank1.process_transaction(&vote_tx).unwrap(); bank1.freeze(); @@ -3353,7 +3513,7 @@ pub(crate) mod tests { .collect(); let tower = Tower::new_for_tests(0, 0.67); let newly_computed = ReplayStage::compute_bank_stats( - &node_pubkey, + &my_vote_pubkey, &ancestors, &mut frozen_banks, &tower, @@ -3404,7 +3564,7 @@ pub(crate) mod tests { .cloned() .collect(); let newly_computed = ReplayStage::compute_bank_stats( - &node_pubkey, + &my_vote_pubkey, &ancestors, &mut frozen_banks, &tower, @@ -3440,7 +3600,7 @@ pub(crate) mod tests { .cloned() .collect(); let newly_computed = ReplayStage::compute_bank_stats( - &node_pubkey, + &my_vote_pubkey, &ancestors, &mut frozen_banks, &tower, @@ -3459,8 +3619,8 @@ pub(crate) mod tests { fn test_same_weight_select_lower_slot() { // Init state let mut vote_simulator = VoteSimulator::new(1); - let node_pubkey = vote_simulator.node_pubkeys[0]; - let tower = Tower::new_with_key(&node_pubkey); + let my_node_pubkey = vote_simulator.node_pubkeys[0]; + let tower = Tower::new_with_key(&my_node_pubkey); // Create the tree of banks in a BankForks object let forks = tr(0) / (tr(1)) / (tr(2)); @@ -3477,8 +3637,10 @@ pub(crate) mod tests { let mut latest_validator_votes_for_frozen_banks = LatestValidatorVotesForFrozenBanks::default(); let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); + + let my_vote_pubkey = vote_simulator.vote_pubkeys[0]; ReplayStage::compute_bank_stats( - &node_pubkey, + &my_vote_pubkey, &ancestors, &mut frozen_banks, &tower, @@ -3529,8 +3691,8 @@ pub(crate) mod tests { fn test_child_bank_heavier() { // Init state let mut vote_simulator = VoteSimulator::new(1); - let node_pubkey = vote_simulator.node_pubkeys[0]; - let mut tower = Tower::new_with_key(&node_pubkey); + let my_node_pubkey = vote_simulator.node_pubkeys[0]; + let mut tower = Tower::new_with_key(&my_node_pubkey); // Create the tree of banks in a BankForks object let forks = tr(0) / (tr(1) / (tr(2) / (tr(3)))); @@ -3538,13 +3700,13 @@ pub(crate) mod tests { // Set the voting behavior let mut cluster_votes = HashMap::new(); let votes = vec![0, 2]; - cluster_votes.insert(node_pubkey, votes.clone()); + cluster_votes.insert(my_node_pubkey, votes.clone()); vote_simulator.fill_bank_forks(forks, &cluster_votes); // Fill banks with votes for vote in votes { assert!(vote_simulator - .simulate_vote(vote, &node_pubkey, &mut tower,) + .simulate_vote(vote, &my_node_pubkey, &mut tower,) .is_empty()); } @@ -3557,8 +3719,9 @@ pub(crate) mod tests { .cloned() .collect(); + let my_vote_pubkey = vote_simulator.vote_pubkeys[0]; ReplayStage::compute_bank_stats( - &node_pubkey, + &my_vote_pubkey, &vote_simulator.bank_forks.read().unwrap().ancestors(), &mut frozen_banks, &tower, @@ -4364,7 +4527,7 @@ pub(crate) mod tests { #[test] fn test_leader_snapshot_restart_propagation() { let ReplayBlockstoreComponents { - validator_voting_keys, + validator_node_to_vote_keys, mut progress, bank_forks, leader_schedule_cache, @@ -4399,7 +4562,7 @@ pub(crate) mod tests { let vote_tracker = VoteTracker::default(); // Add votes - for vote_key in validator_voting_keys.values() { + for vote_key in validator_node_to_vote_keys.values() { vote_tracker.insert_vote(root_bank.slot(), *vote_key); } @@ -4408,7 +4571,7 @@ pub(crate) mod tests { // Update propagation status let tower = Tower::new_for_tests(0, 0.67); ReplayStage::compute_bank_stats( - &my_pubkey, + &validator_node_to_vote_keys[&my_pubkey], &ancestors, &mut frozen_banks, &tower, @@ -4608,6 +4771,234 @@ pub(crate) mod tests { assert_eq!(heaviest_subtree_fork_choice.best_overall_slot().0, 4); } + #[test] + fn test_replay_stage_refresh_last_vote() { + let ReplayBlockstoreComponents { + mut validator_authorized_voter_keypairs, + cluster_info, + poh_recorder, + bank_forks, + mut tower, + my_vote_pubkey, + .. + } = replay_blockstore_components(); + + let mut last_vote_refresh_time = LastVoteRefreshTime { + last_refresh_time: Instant::now(), + last_print_time: Instant::now(), + }; + let has_new_vote_been_rooted = false; + let mut voted_signatures = vec![]; + + let my_vote_keypair = vec![Arc::new( + validator_authorized_voter_keypairs + .remove(&my_vote_pubkey) + .unwrap() + .vote_keypair, + )]; + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + + fn fill_bank_with_ticks(bank: &Bank) { + let parent_distance = bank.slot() - bank.parent_slot(); + for _ in 0..parent_distance { + let last_blockhash = bank.last_blockhash(); + while bank.last_blockhash() == last_blockhash { + bank.register_tick(&Hash::new_unique()) + } + } + } + + // Simulate landing a vote for slot 0 landing in slot 1 + let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); + fill_bank_with_ticks(&bank1); + tower.record_bank_vote(&bank0, &my_vote_pubkey); + ReplayStage::push_vote( + &cluster_info, + &bank0, + &poh_recorder, + &my_vote_pubkey, + &my_vote_keypair, + &mut tower, + &SwitchForkDecision::SameFork, + &mut voted_signatures, + has_new_vote_been_rooted, + ); + let (_, votes, max_ts) = cluster_info.get_votes(0); + assert_eq!(votes.len(), 1); + let vote_tx = &votes[0]; + assert_eq!(vote_tx.message.recent_blockhash, bank0.last_blockhash()); + assert_eq!(tower.last_vote_tx_blockhash(), bank0.last_blockhash()); + assert_eq!(tower.last_voted_slot().unwrap(), 0); + bank1.process_transaction(vote_tx).unwrap(); + bank1.freeze(); + + // Trying to refresh the vote for bank 0 in bank 1 or bank 2 won't succeed because + // the last vote has landed already + let bank2 = Arc::new(Bank::new_from_parent(&bank1, &Pubkey::default(), 2)); + fill_bank_with_ticks(&bank2); + bank2.freeze(); + for refresh_bank in &[&bank1, &bank2] { + ReplayStage::refresh_last_vote( + &mut tower, + &cluster_info, + refresh_bank, + &poh_recorder, + Tower::last_voted_slot_in_bank(&refresh_bank, &my_vote_pubkey).unwrap(), + &my_vote_pubkey, + &my_vote_keypair, + &mut voted_signatures, + has_new_vote_been_rooted, + &mut last_vote_refresh_time, + ); + + // No new votes have been submitted to gossip + let (_, votes, _max_ts) = cluster_info.get_votes(max_ts); + assert!(votes.is_empty()); + // Tower's latest vote tx blockhash hasn't changed either + assert_eq!(tower.last_vote_tx_blockhash(), bank0.last_blockhash()); + assert_eq!(tower.last_voted_slot().unwrap(), 0); + } + + // Simulate submitting a new vote for bank 1 to the network, but the vote + // not landing + tower.record_bank_vote(&bank1, &my_vote_pubkey); + ReplayStage::push_vote( + &cluster_info, + &bank1, + &poh_recorder, + &my_vote_pubkey, + &my_vote_keypair, + &mut tower, + &SwitchForkDecision::SameFork, + &mut voted_signatures, + has_new_vote_been_rooted, + ); + let (_, votes, max_ts) = cluster_info.get_votes(max_ts); + assert_eq!(votes.len(), 1); + let vote_tx = &votes[0]; + assert_eq!(vote_tx.message.recent_blockhash, bank1.last_blockhash()); + assert_eq!(tower.last_vote_tx_blockhash(), bank1.last_blockhash()); + assert_eq!(tower.last_voted_slot().unwrap(), 1); + + // Trying to refresh the vote for bank 1 in bank 2 won't succeed because + // the last vote has not expired yet + ReplayStage::refresh_last_vote( + &mut tower, + &cluster_info, + &bank2, + &poh_recorder, + Tower::last_voted_slot_in_bank(&bank2, &my_vote_pubkey).unwrap(), + &my_vote_pubkey, + &my_vote_keypair, + &mut voted_signatures, + has_new_vote_been_rooted, + &mut last_vote_refresh_time, + ); + // No new votes have been submitted to gossip + let (_, votes, max_ts) = cluster_info.get_votes(max_ts); + assert!(votes.is_empty()); + assert_eq!(tower.last_vote_tx_blockhash(), bank1.last_blockhash()); + assert_eq!(tower.last_voted_slot().unwrap(), 1); + + // Create a bank where the last vote transaction will have expired + let expired_bank = Arc::new(Bank::new_from_parent( + &bank2, + &Pubkey::default(), + bank2.slot() + MAX_PROCESSING_AGE as Slot, + )); + fill_bank_with_ticks(&expired_bank); + expired_bank.freeze(); + + // Now trying to refresh the vote for slot 1 will succeed because the recent blockhash + // of the last vote transaction has expired + last_vote_refresh_time.last_refresh_time = last_vote_refresh_time + .last_refresh_time + .checked_sub(Duration::from_millis( + MAX_VOTE_REFRESH_INTERVAL_MILLIS as u64 + 1, + )) + .unwrap(); + let clone_refresh_time = last_vote_refresh_time.last_refresh_time; + ReplayStage::refresh_last_vote( + &mut tower, + &cluster_info, + &expired_bank, + &poh_recorder, + Tower::last_voted_slot_in_bank(&expired_bank, &my_vote_pubkey).unwrap(), + &my_vote_pubkey, + &my_vote_keypair, + &mut voted_signatures, + has_new_vote_been_rooted, + &mut last_vote_refresh_time, + ); + assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time); + let (_, votes, max_ts) = cluster_info.get_votes(max_ts); + assert_eq!(votes.len(), 1); + let vote_tx = &votes[0]; + assert_eq!( + vote_tx.message.recent_blockhash, + expired_bank.last_blockhash() + ); + assert_eq!( + tower.last_vote_tx_blockhash(), + expired_bank.last_blockhash() + ); + assert_eq!(tower.last_voted_slot().unwrap(), 1); + + // Processing the vote transaction should be valid + let expired_bank_child = Arc::new(Bank::new_from_parent( + &expired_bank, + &Pubkey::default(), + expired_bank.slot() + 1, + )); + expired_bank_child.process_transaction(vote_tx).unwrap(); + let (_stake, vote_account) = expired_bank_child + .get_vote_account(&my_vote_pubkey) + .unwrap(); + assert_eq!( + vote_account.vote_state().as_ref().unwrap().tower(), + vec![0, 1] + ); + fill_bank_with_ticks(&expired_bank_child); + expired_bank_child.freeze(); + + // Trying to refresh the vote on a sibling bank where: + // 1) The vote for slot 1 hasn't landed + // 2) The latest refresh vote transaction's recent blockhash (the sibling's hash) doesn't exist + // This will still not refresh because `MAX_VOTE_REFRESH_INTERVAL_MILLIS` has not expired yet + let expired_bank_sibling = Arc::new(Bank::new_from_parent( + &bank2, + &Pubkey::default(), + expired_bank_child.slot() + 1, + )); + fill_bank_with_ticks(&expired_bank_sibling); + expired_bank_sibling.freeze(); + // Set the last refresh to now, shouldn't refresh because the last refresh just happened. + last_vote_refresh_time.last_refresh_time = Instant::now(); + ReplayStage::refresh_last_vote( + &mut tower, + &cluster_info, + &expired_bank_sibling, + &poh_recorder, + Tower::last_voted_slot_in_bank(&expired_bank_sibling, &my_vote_pubkey).unwrap(), + &my_vote_pubkey, + &my_vote_keypair, + &mut voted_signatures, + has_new_vote_been_rooted, + &mut last_vote_refresh_time, + ); + let (_, votes, _max_ts) = cluster_info.get_votes(max_ts); + assert!(votes.is_empty()); + assert_eq!( + vote_tx.message.recent_blockhash, + expired_bank.last_blockhash() + ); + assert_eq!( + tower.last_vote_tx_blockhash(), + expired_bank.last_blockhash() + ); + assert_eq!(tower.last_voted_slot().unwrap(), 1); + } + fn run_compute_and_select_forks( bank_forks: &RwLock, progress: &mut ProgressMap, diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index ab250c1102..94f00b1a8d 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -52,7 +52,7 @@ use solana_sdk::{ }; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use std::{ - collections::{HashMap, HashSet}, + collections::{BTreeSet, HashMap, HashSet}, fs, io::Read, iter, @@ -689,7 +689,6 @@ fn test_kill_partition_switch_threshold_progress() { #[test] #[serial] -#[ignore] // Steps in this test: // We want to create a situation like: /* @@ -714,7 +713,7 @@ fn test_kill_partition_switch_threshold_progress() { // 6) Resolve the partition so that the 2% repairs the other fork, and tries to switch, // stalling the network. -fn test_fork_choice_ingest_votes_from_gossip() { +fn test_fork_choice_refresh_old_votes() { solana_logger::setup_with_default(RUST_LOG_FILTER); let max_switch_threshold_failure_pct = 1.0 - 2.0 * SWITCH_FORK_THRESHOLD; let total_stake = 100; @@ -790,36 +789,41 @@ fn test_fork_choice_ingest_votes_from_gossip() { info!("Opened blockstores"); + // Get latest votes + let lighter_fork_latest_vote = last_vote_in_tower( + &lighter_fork_ledger_path, + &context.lighter_fork_validator_key, + ) + .unwrap(); + let heaviest_fork_latest_vote = + last_vote_in_tower(&heaviest_ledger_path, &context.heaviest_validator_key).unwrap(); + // Find the first slot on the smaller fork - let mut first_slot_in_lighter_partition = 0; - for ((heavier_slot, heavier_slot_meta), (lighter_slot, _lighter_slot_meta)) in - heaviest_blockstore - .slot_meta_iterator(0) - .unwrap() - .zip(lighter_fork_blockstore.slot_meta_iterator(0).unwrap()) - { - if heavier_slot != lighter_slot { - // Find the parent of the fork point - let last_common_ancestor = heavier_slot_meta.parent_slot; - let lighter_fork_parent_meta = lighter_fork_blockstore - .meta(last_common_ancestor) - .unwrap() - .unwrap(); - // Lighter fork should only see one next slots, since only two validators - // could have generated childrenof `parent`, and the lighter fork *definitely* - // doesn't see the other fork's child, otherwise `heavier_slot != lighter_slot` - // would not have triggere above. - assert_eq!(lighter_fork_parent_meta.next_slots.len(), 1); - let lighter_fork_child = lighter_fork_parent_meta.next_slots[0]; - assert_ne!(first_slot_in_lighter_partition, heavier_slot); - first_slot_in_lighter_partition = lighter_fork_child; - info!( - "First slot in lighter partition is {}", - first_slot_in_lighter_partition - ); - break; - } - } + let lighter_ancestors: BTreeSet = std::iter::once(lighter_fork_latest_vote) + .chain(AncestorIterator::new( + lighter_fork_latest_vote, + &lighter_fork_blockstore, + )) + .collect(); + let heavier_ancestors: BTreeSet = std::iter::once(heaviest_fork_latest_vote) + .chain(AncestorIterator::new( + heaviest_fork_latest_vote, + &heaviest_blockstore, + )) + .collect(); + let first_slot_in_lighter_partition = *lighter_ancestors + .iter() + .zip(heavier_ancestors.iter()) + .find(|(x, y)| x != y) + .unwrap() + .0; + + // Must have been updated in the above loop + assert!(first_slot_in_lighter_partition != 0); + info!( + "First slot in lighter partition is {}", + first_slot_in_lighter_partition + ); assert!(first_slot_in_lighter_partition != 0);