From 02f8651a9c109c3dca3bfb2263234927b5f84aae Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Thu, 17 Feb 2022 20:31:27 +0000
Subject: [PATCH] Fix the flaky test test_restart_tower_rollback (backport
 #23129) (#23155)

* Fix the flaky test test_restart_tower_rollback (#23129)

* Add flag to disable voting until a slot to avoid duplicate voting

* Fix the tower rollback test and remove it from flaky.

(cherry picked from commit ab92578b0294fab9b37ed69afc6a80b5dc78a739)

* Resolve conflicts

Co-authored-by: Ashwin Sekar
---
 core/src/replay_stage.rs               | 24 +++++++++++++
 core/src/tvu.rs                        |  3 ++
 core/src/validator.rs                  |  3 ++
 local-cluster/src/validator_configs.rs |  1 +
 local-cluster/tests/local_cluster.rs   | 49 ++++++++++++++++----------
 5 files changed, 61 insertions(+), 19 deletions(-)

diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 3aa8133d56..7a4e2771da 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -138,6 +138,9 @@ pub struct ReplayStageConfig {
     pub wait_for_vote_to_start_leader: bool,
     pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
     pub tower_storage: Arc<dyn TowerStorage>,
+    // Stops voting until this slot has been reached. Should be used to avoid
+    // duplicate voting which can lead to slashing.
+    pub wait_to_vote_slot: Option<Slot>,
 }
 
 #[derive(Default)]
@@ -374,6 +377,7 @@ impl ReplayStage {
             wait_for_vote_to_start_leader,
             ancestor_hashes_replay_update_sender,
             tower_storage,
+            wait_to_vote_slot,
         } = config;
 
         trace!("replay stage");
@@ -595,6 +599,7 @@ impl ReplayStage {
                         has_new_vote_been_rooted,
                         &mut last_vote_refresh_time,
                         &voting_sender,
+                        wait_to_vote_slot,
                     );
                 }
             }
@@ -678,6 +683,7 @@ impl ReplayStage {
                     &voting_sender,
                     &mut epoch_slots_frozen_slots,
                     &drop_bank_sender,
+                    wait_to_vote_slot,
                 );
             };
             voting_time.stop();
@@ -1654,6 +1660,7 @@ impl ReplayStage {
         voting_sender: &Sender<VoteOp>,
         epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
         bank_drop_sender: &Sender<Vec<Arc<Bank>>>,
+        wait_to_vote_slot: Option<Slot>,
     ) {
         if bank.is_empty() {
             inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
@@ -1742,6 +1749,7 @@ impl ReplayStage {
             *has_new_vote_been_rooted,
             replay_timing,
             voting_sender,
+            wait_to_vote_slot,
         );
     }
 
@@ -1754,10 +1762,16 @@ impl ReplayStage {
         switch_fork_decision: &SwitchForkDecision,
         vote_signatures: &mut Vec<Signature>,
         has_new_vote_been_rooted: bool,
+        wait_to_vote_slot: Option<Slot>,
     ) -> Option<Transaction> {
         if authorized_voter_keypairs.is_empty() {
             return None;
         }
+        if let Some(slot) = wait_to_vote_slot {
+            if bank.slot() < slot {
+                return None;
+            }
+        }
         let vote_account = match bank.get_vote_account(vote_account_pubkey) {
             None => {
                 warn!(
@@ -1852,6 +1866,7 @@ impl ReplayStage {
         has_new_vote_been_rooted: bool,
         last_vote_refresh_time: &mut LastVoteRefreshTime,
         voting_sender: &Sender<VoteOp>,
+        wait_to_vote_slot: Option<Slot>,
     ) {
         let last_voted_slot = tower.last_voted_slot();
         if last_voted_slot.is_none() {
@@ -1894,6 +1909,7 @@ impl ReplayStage {
             &SwitchForkDecision::SameFork,
             vote_signatures,
             has_new_vote_been_rooted,
+            wait_to_vote_slot,
         );
 
         if let Some(vote_tx) = vote_tx {
@@ -1931,6 +1947,7 @@ impl ReplayStage {
         has_new_vote_been_rooted: bool,
         replay_timing: &mut ReplayTiming,
         voting_sender: &Sender<VoteOp>,
+        wait_to_vote_slot: Option<Slot>,
     ) {
         let mut generate_time = Measure::start("generate_vote");
         let vote_tx = Self::generate_vote_tx(
@@ -1942,6 +1959,7 @@ impl ReplayStage {
             switch_fork_decision,
             vote_signatures,
             has_new_vote_been_rooted,
+            wait_to_vote_slot,
         );
         generate_time.stop();
         replay_timing.generate_vote_us += generate_time.as_us();
@@ -5730,6 +5748,7 @@ pub mod tests {
             has_new_vote_been_rooted,
             &mut ReplayTiming::default(),
             &voting_sender,
+            None,
         );
         let vote_info = voting_receiver
             .recv_timeout(Duration::from_secs(1))
@@ -5769,6 +5788,7 @@ pub mod tests {
             has_new_vote_been_rooted,
             &mut last_vote_refresh_time,
             &voting_sender,
+            None,
         );
 
         // No new votes have been submitted to gossip
@@ -5794,6 +5814,7 @@ pub mod tests {
             has_new_vote_been_rooted,
             &mut ReplayTiming::default(),
             &voting_sender,
+            None,
         );
         let vote_info = voting_receiver
             .recv_timeout(Duration::from_secs(1))
@@ -5825,6 +5846,7 @@ pub mod tests {
             has_new_vote_been_rooted,
             &mut last_vote_refresh_time,
             &voting_sender,
+            None,
         );
 
         // No new votes have been submitted to gossip
@@ -5862,6 +5884,7 @@ pub mod tests {
             has_new_vote_been_rooted,
             &mut last_vote_refresh_time,
             &voting_sender,
+            None,
         );
         let vote_info = voting_receiver
             .recv_timeout(Duration::from_secs(1))
@@ -5929,6 +5952,7 @@ pub mod tests {
             has_new_vote_been_rooted,
             &mut last_vote_refresh_time,
             &voting_sender,
+            None,
         );
 
         let votes = cluster_info.get_votes(&mut cursor);
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 4203496ed5..fd7ecc8462 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -149,6 +149,7 @@ impl Tvu {
         accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver),
         last_full_snapshot_slot: Option<Slot>,
         block_metadata_notifier: Option<BlockMetadataNotifierLock>,
+        wait_to_vote_slot: Option<Slot>,
     ) -> Self {
         let Sockets {
             repair: repair_socket,
@@ -297,6 +298,7 @@ impl Tvu {
             wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
             ancestor_hashes_replay_update_sender,
             tower_storage: tower_storage.clone(),
+            wait_to_vote_slot,
         };
 
         let (voting_sender, voting_receiver) = channel();
@@ -517,6 +519,7 @@ pub mod tests {
             accounts_package_channel,
             None,
             None,
+            None,
         );
         exit.store(true, Ordering::Relaxed);
         tvu.join().unwrap();
diff --git a/core/src/validator.rs b/core/src/validator.rs
index c17cccc243..a49b3fd7f8 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -164,6 +164,7 @@ pub struct ValidatorConfig {
     pub validator_exit: Arc<RwLock<Exit>>,
     pub no_wait_for_vote_to_start_leader: bool,
     pub accounts_shrink_ratio: AccountShrinkThreshold,
+    pub wait_to_vote_slot: Option<Slot>,
 }
 
 impl Default for ValidatorConfig {
@@ -223,6 +224,7 @@ impl Default for ValidatorConfig {
             no_wait_for_vote_to_start_leader: true,
             accounts_shrink_ratio: AccountShrinkThreshold::default(),
             accounts_db_config: None,
+            wait_to_vote_slot: None,
         }
     }
 }
@@ -893,6 +895,7 @@ impl Validator {
             accounts_package_channel,
             last_full_snapshot_slot,
             block_metadata_notifier,
+            config.wait_to_vote_slot,
         );
 
         let tpu = Tpu::new(
diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs
index dff84c2ec2..4cca70621f 100644
--- a/local-cluster/src/validator_configs.rs
+++ b/local-cluster/src/validator_configs.rs
@@ -61,6 +61,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
         no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader,
         accounts_shrink_ratio: config.accounts_shrink_ratio,
         accounts_db_config: config.accounts_db_config.clone(),
+        wait_to_vote_slot: config.wait_to_vote_slot,
     }
 }
 
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 48dcc4958b..0741b80770 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -2178,9 +2178,11 @@ fn test_run_test_load_program_accounts_root() {
 #[serial]
 fn test_restart_tower_rollback() {
     // Test node crashing and failing to save its tower before restart
+    // Cluster continues to make progress, this node is able to rejoin with
+    // outdated tower post restart.
     solana_logger::setup_with_default(RUST_LOG_FILTER);
 
-    // First set up the cluster with 4 nodes
+    // First set up the cluster with 2 nodes
     let slots_per_epoch = 2048;
     let node_stakes = vec![10000, 1];
 
@@ -2189,14 +2191,14 @@ fn test_restart_tower_rollback() {
         "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
     ];
 
-    let validator_b_keypair = Arc::new(Keypair::from_base58_string(validator_strings[1]));
-    let validator_b_pubkey = validator_b_keypair.pubkey();
-
     let validator_keys = validator_strings
         .iter()
         .map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
         .take(node_stakes.len())
         .collect::<Vec<_>>();
+
+    let b_pubkey = validator_keys[1].0.pubkey();
+
     let mut config = ClusterConfig {
         cluster_lamports: 100_000,
         node_stakes: node_stakes.clone(),
@@ -2212,41 +2214,50 @@ fn test_restart_tower_rollback() {
     };
     let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
 
-    let val_b_ledger_path = cluster.ledger_path(&validator_b_pubkey);
+    let val_b_ledger_path = cluster.ledger_path(&b_pubkey);
 
     let mut earlier_tower: Tower;
     loop {
         sleep(Duration::from_millis(1000));
 
         // Grab the current saved tower
-        earlier_tower = restore_tower(&val_b_ledger_path, &validator_b_pubkey).unwrap();
+        earlier_tower = restore_tower(&val_b_ledger_path, &b_pubkey).unwrap();
         if earlier_tower.last_voted_slot().unwrap_or(0) > 1 {
             break;
         }
     }
 
-    let exited_validator_info: ClusterValidatorInfo;
+    let mut exited_validator_info: ClusterValidatorInfo;
+    let last_voted_slot: Slot;
    loop {
         sleep(Duration::from_millis(1000));
 
         // Wait for second, lesser staked validator to make a root past the earlier_tower's
         // latest vote slot, then exit that validator
-        if let Some(root) = root_in_tower(&val_b_ledger_path, &validator_b_pubkey) {
-            if root
-                > earlier_tower
-                    .last_voted_slot()
-                    .expect("Earlier tower must have at least one vote")
-            {
-                exited_validator_info = cluster.exit_node(&validator_b_pubkey);
-                break;
-            }
+        let tower = restore_tower(&val_b_ledger_path, &b_pubkey).unwrap();
+        if tower.root()
+            > earlier_tower
+                .last_voted_slot()
+                .expect("Earlier tower must have at least one vote")
+        {
+            exited_validator_info = cluster.exit_node(&b_pubkey);
+            last_voted_slot = tower.last_voted_slot().unwrap();
+            break;
         }
     }
 
-    // Now rewrite the tower with the *earlier_tower*
-    save_tower(&val_b_ledger_path, &earlier_tower, &validator_b_keypair);
+    // Now rewrite the tower with the *earlier_tower*. We disable voting until we reach
+    // a slot we did not previously vote for in order to avoid duplicate vote slashing
+    // issues.
+    save_tower(
+        &val_b_ledger_path,
+        &earlier_tower,
+        &exited_validator_info.info.keypair,
+    );
+    exited_validator_info.config.wait_to_vote_slot = Some(last_voted_slot + 10);
+
     cluster.restart_node(
-        &validator_b_pubkey,
+        &b_pubkey,
         exited_validator_info,
         SocketAddrSpace::Unspecified,
     );
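
Reviewer note, not part of the patch: a minimal, self-contained sketch of the voting
gate introduced above. The real check is the bank.slot() guard added to
ReplayStage::generate_vote_tx; the helper below (should_vote and the local Slot alias
are illustrative names, not Solana APIs) only restates the rule: with wait_to_vote_slot
set, no vote is produced until the bank slot reaches the threshold, and the test picks
last_voted_slot + 10 so the restarted validator with the rolled-back tower cannot
re-sign a slot it already voted on.

    // Illustrative sketch only; not part of the diff above.
    type Slot = u64;

    fn should_vote(bank_slot: Slot, wait_to_vote_slot: Option<Slot>) -> bool {
        match wait_to_vote_slot {
            // Suppress voting until the threshold is reached, mirroring the
            // "if bank.slot() < slot { return None; }" guard in generate_vote_tx.
            Some(threshold) => bank_slot >= threshold,
            // Flag unset: vote as usual.
            None => true,
        }
    }

    fn main() {
        let last_voted_slot: Slot = 50;
        // The test sets the threshold just past the old tower's last vote.
        let wait_to_vote_slot = Some(last_voted_slot + 10);

        assert!(!should_vote(55, wait_to_vote_slot)); // still inside previously voted range
        assert!(should_vote(60, wait_to_vote_slot)); // threshold reached, voting resumes
        assert!(should_vote(55, None)); // no threshold configured
        println!("wait_to_vote_slot gating behaves as expected");
    }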