diff --git a/ci/buildkite-pipeline.sh b/ci/buildkite-pipeline.sh index 667a30302a..ec9cff2190 100755 --- a/ci/buildkite-pipeline.sh +++ b/ci/buildkite-pipeline.sh @@ -256,7 +256,15 @@ EOF command_step "local-cluster" \ ". ci/rust-version.sh; ci/docker-run.sh \$\$rust_stable_docker_image ci/test-local-cluster.sh" \ - 50 + 40 + + command_step "local-cluster-flakey" \ + ". ci/rust-version.sh; ci/docker-run.sh \$\$rust_stable_docker_image ci/test-local-cluster-flakey.sh" \ + 10 + + command_step "local-cluster-slow" \ + ". ci/rust-version.sh; ci/docker-run.sh \$\$rust_stable_docker_image ci/test-local-cluster-slow.sh" \ + 25 } pull_or_push_steps() { diff --git a/ci/run-local.sh b/ci/run-local.sh index 092f35f562..a033b46eba 100755 --- a/ci/run-local.sh +++ b/ci/run-local.sh @@ -27,6 +27,8 @@ steps+=(test-stable-perf) steps+=(test-downstream-builds) steps+=(test-bench) steps+=(test-local-cluster) +steps+=(test-local-cluster-flakey) +steps+=(test-local-cluster-slow) step_index=0 if [[ -n "$1" ]]; then diff --git a/ci/test-local-cluster-flakey.sh b/ci/test-local-cluster-flakey.sh new file mode 120000 index 0000000000..0c92a5c7bd --- /dev/null +++ b/ci/test-local-cluster-flakey.sh @@ -0,0 +1 @@ +test-stable.sh \ No newline at end of file diff --git a/ci/test-local-cluster-slow.sh b/ci/test-local-cluster-slow.sh new file mode 120000 index 0000000000..0c92a5c7bd --- /dev/null +++ b/ci/test-local-cluster-slow.sh @@ -0,0 +1 @@ +test-stable.sh \ No newline at end of file diff --git a/ci/test-stable.sh b/ci/test-stable.sh index 177175e874..200da2d8fb 100755 --- a/ci/test-stable.sh +++ b/ci/test-stable.sh @@ -100,7 +100,17 @@ test-stable-perf) ;; test-local-cluster) _ "$cargo" stable build --release --bins ${V:+--verbose} - _ "$cargo" stable test --release --package solana-local-cluster ${V:+--verbose} -- --nocapture --test-threads=1 + _ "$cargo" stable test --release --package solana-local-cluster --test local_cluster ${V:+--verbose} -- --nocapture --test-threads=1 + exit 0 + ;; +test-local-cluster-flakey) + _ "$cargo" stable build --release --bins ${V:+--verbose} + _ "$cargo" stable test --release --package solana-local-cluster --test local_cluster_flakey ${V:+--verbose} -- --nocapture --test-threads=1 + exit 0 + ;; +test-local-cluster-slow) + _ "$cargo" stable build --release --bins ${V:+--verbose} + _ "$cargo" stable test --release --package solana-local-cluster --test local_cluster_slow ${V:+--verbose} -- --nocapture --test-threads=1 exit 0 ;; test-wasm) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 18ad89790f..748a03a71f 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -792,37 +792,3 @@ impl Drop for LocalCluster { self.close(); } } - -#[cfg(test)] -mod test { - use {super::*, solana_sdk::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH}; - - #[test] - fn test_local_cluster_start_and_exit() { - solana_logger::setup(); - let num_nodes = 1; - let cluster = - LocalCluster::new_with_equal_stakes(num_nodes, 100, 3, SocketAddrSpace::Unspecified); - assert_eq!(cluster.validators.len(), num_nodes); - } - - #[test] - fn test_local_cluster_start_and_exit_with_config() { - solana_logger::setup(); - const NUM_NODES: usize = 1; - let mut config = ClusterConfig { - validator_configs: make_identical_validator_configs( - &ValidatorConfig::default(), - NUM_NODES, - ), - node_stakes: vec![3; NUM_NODES], - cluster_lamports: 100, - ticks_per_slot: 8, - slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH as u64, - stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH as u64, - ..ClusterConfig::default() - }; - let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); - assert_eq!(cluster.validators.len(), NUM_NODES); - } -} diff --git a/local-cluster/tests/common.rs b/local-cluster/tests/common.rs new file mode 100644 index 0000000000..f19a813ad8 --- /dev/null +++ b/local-cluster/tests/common.rs @@ -0,0 +1,401 @@ +#![allow(clippy::integer_arithmetic, dead_code)] +use { + log::*, + solana_client::rpc_client::RpcClient, + solana_core::{ + broadcast_stage::BroadcastStageType, + consensus::{Tower, SWITCH_FORK_THRESHOLD}, + tower_storage::FileTowerStorage, + validator::ValidatorConfig, + }, + solana_gossip::gossip_service::discover_cluster, + solana_ledger::{ + ancestor_iterator::AncestorIterator, + blockstore::{Blockstore, PurgeType}, + blockstore_db::AccessType, + leader_schedule::{FixedSchedule, LeaderSchedule}, + }, + solana_local_cluster::{ + cluster::{Cluster, ClusterValidatorInfo}, + cluster_tests, + local_cluster::{ClusterConfig, LocalCluster}, + validator_configs::*, + }, + solana_sdk::{ + account::AccountSharedData, + clock::{self, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT}, + hash::Hash, + pubkey::Pubkey, + signature::{Keypair, Signer}, + }, + solana_streamer::socket::SocketAddrSpace, + std::{ + collections::HashSet, + fs, iter, + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::sleep, + time::Duration, + }, +}; + +pub const RUST_LOG_FILTER: &str = + "error,solana_core::replay_stage=warn,solana_local_cluster=info,local_cluster=info"; + +pub fn last_vote_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<(Slot, Hash)> { + restore_tower(tower_path, node_pubkey).map(|tower| tower.last_voted_slot_hash().unwrap()) +} + +pub fn restore_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { + let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); + + let tower = Tower::restore(&file_tower_storage, node_pubkey); + if let Err(tower_err) = tower { + if tower_err.is_file_missing() { + return None; + } else { + panic!("tower restore failed...: {:?}", tower_err); + } + } + // actually saved tower must have at least one vote. + Tower::restore(&file_tower_storage, node_pubkey).ok() +} + +pub fn remove_tower(tower_path: &Path, node_pubkey: &Pubkey) { + let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); + fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap(); +} + +pub fn open_blockstore(ledger_path: &Path) -> Blockstore { + Blockstore::open_with_access_type(ledger_path, AccessType::TryPrimaryThenSecondary, None, true) + .unwrap_or_else(|e| { + panic!("Failed to open ledger at {:?}, err: {}", ledger_path, e); + }) +} + +pub fn purge_slots(blockstore: &Blockstore, start_slot: Slot, slot_count: Slot) { + blockstore.purge_from_next_slots(start_slot, start_slot + slot_count); + blockstore.purge_slots(start_slot, start_slot + slot_count, PurgeType::Exact); +} + +// Fetches the last vote in the tower, blocking until it has also appeared in blockstore. +// Fails if tower is empty +pub fn wait_for_last_vote_in_tower_to_land_in_ledger( + ledger_path: &Path, + node_pubkey: &Pubkey, +) -> Slot { + let (last_vote, _) = last_vote_in_tower(ledger_path, node_pubkey).unwrap(); + loop { + // We reopen in a loop to make sure we get updates + let blockstore = open_blockstore(ledger_path); + if blockstore.is_full(last_vote) { + break; + } + sleep(Duration::from_millis(100)); + } + last_vote +} + +pub fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) { + for slot in std::iter::once(end_slot).chain(AncestorIterator::new(end_slot, source)) { + let source_meta = source.meta(slot).unwrap().unwrap(); + assert!(source_meta.is_full()); + + let shreds = source.get_data_shreds_for_slot(slot, 0).unwrap(); + dest.insert_shreds(shreds, None, false).unwrap(); + + let dest_meta = dest.meta(slot).unwrap().unwrap(); + assert!(dest_meta.is_full()); + assert_eq!(dest_meta.last_index, source_meta.last_index); + } +} + +/// Computes the numbr of milliseconds `num_blocks` blocks will take given +/// each slot contains `ticks_per_slot` +pub fn ms_for_n_slots(num_blocks: u64, ticks_per_slot: u64) -> u64 { + ((ticks_per_slot * DEFAULT_MS_PER_SLOT * num_blocks) + DEFAULT_TICKS_PER_SLOT - 1) + / DEFAULT_TICKS_PER_SLOT +} + +#[allow(clippy::assertions_on_constants)] +pub fn run_kill_partition_switch_threshold( + stakes_to_kill: &[&[(usize, usize)]], + alive_stakes: &[&[(usize, usize)]], + partition_duration: Option, + ticks_per_slot: Option, + partition_context: C, + on_partition_start: impl Fn(&mut LocalCluster, &[Pubkey], Vec, &mut C), + on_before_partition_resolved: impl Fn(&mut LocalCluster, &mut C), + on_partition_resolved: impl Fn(&mut LocalCluster, &mut C), +) { + // Needs to be at least 1/3 or there will be no overlap + // with the confirmation supermajority 2/3 + assert!(SWITCH_FORK_THRESHOLD >= 1f64 / 3f64); + info!( + "stakes_to_kill: {:?}, alive_stakes: {:?}", + stakes_to_kill, alive_stakes + ); + + // This test: + // 1) Spins up three partitions + // 2) Kills the first partition with the stake `failures_stake` + // 5) runs `on_partition_resolved` + let partitions: Vec<&[(usize, usize)]> = stakes_to_kill + .iter() + .cloned() + .chain(alive_stakes.iter().cloned()) + .collect(); + + let stake_partitions: Vec> = partitions + .iter() + .map(|stakes_and_slots| stakes_and_slots.iter().map(|(stake, _)| *stake).collect()) + .collect(); + let num_slots_per_validator: Vec = partitions + .iter() + .flat_map(|stakes_and_slots| stakes_and_slots.iter().map(|(_, num_slots)| *num_slots)) + .collect(); + + let (leader_schedule, validator_keys) = create_custom_leader_schedule(&num_slots_per_validator); + + info!( + "Validator ids: {:?}", + validator_keys + .iter() + .map(|k| k.pubkey()) + .collect::>() + ); + let validator_pubkeys: Vec = validator_keys.iter().map(|k| k.pubkey()).collect(); + let on_partition_start = |cluster: &mut LocalCluster, partition_context: &mut C| { + let dead_validator_infos: Vec = validator_pubkeys + [0..stakes_to_kill.len()] + .iter() + .map(|validator_to_kill| { + info!("Killing validator with id: {}", validator_to_kill); + cluster.exit_node(validator_to_kill) + }) + .collect(); + on_partition_start( + cluster, + &validator_pubkeys, + dead_validator_infos, + partition_context, + ); + }; + run_cluster_partition( + &stake_partitions, + Some((leader_schedule, validator_keys)), + partition_context, + on_partition_start, + on_before_partition_resolved, + on_partition_resolved, + partition_duration, + ticks_per_slot, + vec![], + ) +} + +pub fn create_custom_leader_schedule( + validator_num_slots: &[usize], +) -> (LeaderSchedule, Vec>) { + let mut leader_schedule = vec![]; + let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new())) + .take(validator_num_slots.len()) + .collect(); + for (k, num_slots) in validator_keys.iter().zip(validator_num_slots.iter()) { + for _ in 0..*num_slots { + leader_schedule.push(k.pubkey()) + } + } + + info!("leader_schedule: {}", leader_schedule.len()); + ( + LeaderSchedule::new_from_schedule(leader_schedule), + validator_keys, + ) +} + +/// This function runs a network, initiates a partition based on a +/// configuration, resolve the partition, then checks that the network +/// continues to achieve consensus +/// # Arguments +/// * `partitions` - A slice of partition configurations, where each partition +/// configuration is a slice of (usize, bool), representing a node's stake and +/// whether or not it should be killed during the partition +/// * `leader_schedule` - An option that specifies whether the cluster should +/// run with a fixed, predetermined leader schedule +#[allow(clippy::cognitive_complexity)] +pub fn run_cluster_partition( + partitions: &[Vec], + leader_schedule: Option<(LeaderSchedule, Vec>)>, + mut context: C, + on_partition_start: impl FnOnce(&mut LocalCluster, &mut C), + on_before_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C), + on_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C), + partition_duration: Option, + ticks_per_slot: Option, + additional_accounts: Vec<(Pubkey, AccountSharedData)>, +) { + solana_logger::setup_with_default(RUST_LOG_FILTER); + info!("PARTITION_TEST!"); + let num_nodes = partitions.len(); + let node_stakes: Vec<_> = partitions + .iter() + .flat_map(|p| p.iter().map(|stake_weight| 100 * *stake_weight as u64)) + .collect(); + assert_eq!(node_stakes.len(), num_nodes); + let cluster_lamports = node_stakes.iter().sum::() * 2; + let enable_partition = Arc::new(AtomicBool::new(true)); + let mut validator_config = ValidatorConfig { + enable_partition: Some(enable_partition.clone()), + ..ValidatorConfig::default() + }; + + // Returns: + // 1) The keys for the validators + // 2) The amount of time it would take to iterate through one full iteration of the given + // leader schedule + let (validator_keys, leader_schedule_time): (Vec<_>, u64) = { + if let Some((leader_schedule, validator_keys)) = leader_schedule { + assert_eq!(validator_keys.len(), num_nodes); + let num_slots_per_rotation = leader_schedule.num_slots() as u64; + let fixed_schedule = FixedSchedule { + start_epoch: 0, + leader_schedule: Arc::new(leader_schedule), + }; + validator_config.fixed_leader_schedule = Some(fixed_schedule); + ( + validator_keys, + num_slots_per_rotation * clock::DEFAULT_MS_PER_SLOT, + ) + } else { + ( + iter::repeat_with(|| Arc::new(Keypair::new())) + .take(partitions.len()) + .collect(), + 10_000, + ) + } + }; + + let slots_per_epoch = 2048; + let mut config = ClusterConfig { + cluster_lamports, + node_stakes, + validator_configs: make_identical_validator_configs(&validator_config, num_nodes), + validator_keys: Some( + validator_keys + .into_iter() + .zip(iter::repeat_with(|| true)) + .collect(), + ), + slots_per_epoch, + stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, + additional_accounts, + ticks_per_slot: ticks_per_slot.unwrap_or(DEFAULT_TICKS_PER_SLOT), + ..ClusterConfig::default() + }; + + info!( + "PARTITION_TEST starting cluster with {:?} partitions slots_per_epoch: {}", + partitions, config.slots_per_epoch, + ); + let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + + info!("PARTITION_TEST spend_and_verify_all_nodes(), ensure all nodes are caught up"); + cluster_tests::spend_and_verify_all_nodes( + &cluster.entry_point_info, + &cluster.funding_keypair, + num_nodes, + HashSet::new(), + SocketAddrSpace::Unspecified, + ); + + let cluster_nodes = discover_cluster( + &cluster.entry_point_info.gossip, + num_nodes, + SocketAddrSpace::Unspecified, + ) + .unwrap(); + + // Check epochs have correct number of slots + info!("PARTITION_TEST sleeping until partition starting condition",); + for node in &cluster_nodes { + let node_client = RpcClient::new_socket(node.rpc); + let epoch_info = node_client.get_epoch_info().unwrap(); + assert_eq!(epoch_info.slots_in_epoch, slots_per_epoch); + } + + info!("PARTITION_TEST start partition"); + on_partition_start(&mut cluster, &mut context); + enable_partition.store(false, Ordering::Relaxed); + + sleep(Duration::from_millis( + partition_duration.unwrap_or(leader_schedule_time), + )); + + on_before_partition_resolved(&mut cluster, &mut context); + info!("PARTITION_TEST remove partition"); + enable_partition.store(true, Ordering::Relaxed); + + // Give partitions time to propagate their blocks from during the partition + // after the partition resolves + let timeout = 10_000; + let propagation_time = leader_schedule_time; + info!( + "PARTITION_TEST resolving partition. sleeping {} ms", + timeout + ); + sleep(Duration::from_millis(timeout)); + info!( + "PARTITION_TEST waiting for blocks to propagate after partition {}ms", + propagation_time + ); + sleep(Duration::from_millis(propagation_time)); + info!("PARTITION_TEST resuming normal operation"); + on_partition_resolved(&mut cluster, &mut context); +} + +pub fn test_faulty_node( + faulty_node_type: BroadcastStageType, + node_stakes: Vec, +) -> (LocalCluster, Vec>) { + solana_logger::setup_with_default("solana_local_cluster=info"); + let num_nodes = node_stakes.len(); + + let error_validator_config = ValidatorConfig { + broadcast_stage_type: faulty_node_type, + ..ValidatorConfig::default() + }; + let mut validator_configs = Vec::with_capacity(num_nodes); + + // First validator is the bootstrap leader with the malicious broadcast logic. + validator_configs.push(error_validator_config); + validator_configs.resize_with(num_nodes, ValidatorConfig::default); + + let mut validator_keys = Vec::with_capacity(num_nodes); + validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true)); + + assert_eq!(node_stakes.len(), num_nodes); + assert_eq!(validator_keys.len(), num_nodes); + + let mut cluster_config = ClusterConfig { + cluster_lamports: 10_000, + node_stakes, + validator_configs, + validator_keys: Some(validator_keys.clone()), + skip_warmup_slots: true, + ..ClusterConfig::default() + }; + + let cluster = LocalCluster::new(&mut cluster_config, SocketAddrSpace::Unspecified); + let validator_keys: Vec> = validator_keys + .into_iter() + .map(|(keypair, _)| keypair) + .collect(); + + (cluster, validator_keys) +} diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 8c261be7c2..4152d6ff6a 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -1,6 +1,11 @@ #![allow(clippy::integer_arithmetic)] use { assert_matches::assert_matches, + common::{ + create_custom_leader_schedule, last_vote_in_tower, ms_for_n_slots, open_blockstore, + purge_slots, remove_tower, restore_tower, run_cluster_partition, + run_kill_partition_switch_threshold, test_faulty_node, RUST_LOG_FILTER, + }, crossbeam_channel::{unbounded, Receiver}, gag::BufferRedirect, log::*, @@ -13,9 +18,7 @@ use { thin_client::{create_client, ThinClient}, }, solana_core::{ - broadcast_stage::{ - broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType, - }, + broadcast_stage::BroadcastStageType, consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, replay_stage::DUPLICATE_THRESHOLD, @@ -23,17 +26,8 @@ use { validator::ValidatorConfig, }, solana_download_utils::download_snapshot_archive, - solana_gossip::{ - cluster_info::VALIDATOR_PORT_RANGE, - crds::Cursor, - gossip_service::{self, discover_cluster}, - }, - solana_ledger::{ - ancestor_iterator::AncestorIterator, - blockstore::{Blockstore, PurgeType}, - blockstore_db::AccessType, - leader_schedule::{FixedSchedule, LeaderSchedule}, - }, + solana_gossip::{cluster_info::VALIDATOR_PORT_RANGE, gossip_service::discover_cluster}, + solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore}, solana_local_cluster::{ cluster::{Cluster, ClusterValidatorInfo}, cluster_tests, @@ -49,20 +43,19 @@ use { solana_sdk::{ account::AccountSharedData, client::{AsyncClient, SyncClient}, - clock::{self, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE}, + clock::{self, Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE}, commitment_config::CommitmentConfig, epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, genesis_config::ClusterType, - hash::Hash, poh_config::PohConfig, pubkey::Pubkey, signature::{Keypair, Signer}, system_program, system_transaction, }, solana_streamer::socket::SocketAddrSpace, - solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction}, + solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY, std::{ - collections::{BTreeSet, HashMap, HashSet}, + collections::{HashMap, HashSet}, fs, io::Read, iter, @@ -77,8 +70,33 @@ use { tempfile::TempDir, }; -const RUST_LOG_FILTER: &str = - "error,solana_core::replay_stage=warn,solana_local_cluster=info,local_cluster=info"; +mod common; + +#[test] +fn test_local_cluster_start_and_exit() { + solana_logger::setup(); + let num_nodes = 1; + let cluster = + LocalCluster::new_with_equal_stakes(num_nodes, 100, 3, SocketAddrSpace::Unspecified); + assert_eq!(cluster.validators.len(), num_nodes); +} + +#[test] +fn test_local_cluster_start_and_exit_with_config() { + solana_logger::setup(); + const NUM_NODES: usize = 1; + let mut config = ClusterConfig { + validator_configs: make_identical_validator_configs(&ValidatorConfig::default(), NUM_NODES), + node_stakes: vec![3; NUM_NODES], + cluster_lamports: 100, + ticks_per_slot: 8, + slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH as u64, + stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH as u64, + ..ClusterConfig::default() + }; + let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + assert_eq!(cluster.validators.len(), NUM_NODES); +} #[test] #[serial] @@ -298,148 +316,6 @@ fn test_leader_failure_4() { ); } -/// This function runs a network, initiates a partition based on a -/// configuration, resolve the partition, then checks that the network -/// continues to achieve consensus -/// # Arguments -/// * `partitions` - A slice of partition configurations, where each partition -/// configuration is a slice of (usize, bool), representing a node's stake and -/// whether or not it should be killed during the partition -/// * `leader_schedule` - An option that specifies whether the cluster should -/// run with a fixed, predetermined leader schedule -#[allow(clippy::cognitive_complexity)] -fn run_cluster_partition( - partitions: &[Vec], - leader_schedule: Option<(LeaderSchedule, Vec>)>, - mut context: C, - on_partition_start: impl FnOnce(&mut LocalCluster, &mut C), - on_before_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C), - on_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C), - partition_duration: Option, - ticks_per_slot: Option, - additional_accounts: Vec<(Pubkey, AccountSharedData)>, -) { - solana_logger::setup_with_default(RUST_LOG_FILTER); - info!("PARTITION_TEST!"); - let num_nodes = partitions.len(); - let node_stakes: Vec<_> = partitions - .iter() - .flat_map(|p| p.iter().map(|stake_weight| 100 * *stake_weight as u64)) - .collect(); - assert_eq!(node_stakes.len(), num_nodes); - let cluster_lamports = node_stakes.iter().sum::() * 2; - let enable_partition = Arc::new(AtomicBool::new(true)); - let mut validator_config = ValidatorConfig { - enable_partition: Some(enable_partition.clone()), - ..ValidatorConfig::default() - }; - - // Returns: - // 1) The keys for the validators - // 2) The amount of time it would take to iterate through one full iteration of the given - // leader schedule - let (validator_keys, leader_schedule_time): (Vec<_>, u64) = { - if let Some((leader_schedule, validator_keys)) = leader_schedule { - assert_eq!(validator_keys.len(), num_nodes); - let num_slots_per_rotation = leader_schedule.num_slots() as u64; - let fixed_schedule = FixedSchedule { - start_epoch: 0, - leader_schedule: Arc::new(leader_schedule), - }; - validator_config.fixed_leader_schedule = Some(fixed_schedule); - ( - validator_keys, - num_slots_per_rotation * clock::DEFAULT_MS_PER_SLOT, - ) - } else { - ( - iter::repeat_with(|| Arc::new(Keypair::new())) - .take(partitions.len()) - .collect(), - 10_000, - ) - } - }; - - let slots_per_epoch = 2048; - let mut config = ClusterConfig { - cluster_lamports, - node_stakes, - validator_configs: make_identical_validator_configs(&validator_config, num_nodes), - validator_keys: Some( - validator_keys - .into_iter() - .zip(iter::repeat_with(|| true)) - .collect(), - ), - slots_per_epoch, - stakers_slot_offset: slots_per_epoch, - skip_warmup_slots: true, - additional_accounts, - ticks_per_slot: ticks_per_slot.unwrap_or(DEFAULT_TICKS_PER_SLOT), - ..ClusterConfig::default() - }; - - info!( - "PARTITION_TEST starting cluster with {:?} partitions slots_per_epoch: {}", - partitions, config.slots_per_epoch, - ); - let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); - - info!("PARTITION_TEST spend_and_verify_all_nodes(), ensure all nodes are caught up"); - cluster_tests::spend_and_verify_all_nodes( - &cluster.entry_point_info, - &cluster.funding_keypair, - num_nodes, - HashSet::new(), - SocketAddrSpace::Unspecified, - ); - - let cluster_nodes = discover_cluster( - &cluster.entry_point_info.gossip, - num_nodes, - SocketAddrSpace::Unspecified, - ) - .unwrap(); - - // Check epochs have correct number of slots - info!("PARTITION_TEST sleeping until partition starting condition",); - for node in &cluster_nodes { - let node_client = RpcClient::new_socket(node.rpc); - let epoch_info = node_client.get_epoch_info().unwrap(); - assert_eq!(epoch_info.slots_in_epoch, slots_per_epoch); - } - - info!("PARTITION_TEST start partition"); - on_partition_start(&mut cluster, &mut context); - enable_partition.store(false, Ordering::Relaxed); - - sleep(Duration::from_millis( - partition_duration.unwrap_or(leader_schedule_time), - )); - - on_before_partition_resolved(&mut cluster, &mut context); - info!("PARTITION_TEST remove partition"); - enable_partition.store(true, Ordering::Relaxed); - - // Give partitions time to propagate their blocks from during the partition - // after the partition resolves - let timeout = 10_000; - let propagation_time = leader_schedule_time; - info!( - "PARTITION_TEST resolving partition. sleeping {} ms", - timeout - ); - sleep(Duration::from_millis(timeout)); - info!( - "PARTITION_TEST waiting for blocks to propagate after partition {}ms", - propagation_time - ); - sleep(Duration::from_millis(propagation_time)); - info!("PARTITION_TEST resuming normal operation"); - on_partition_resolved(&mut cluster, &mut context); -} - #[allow(unused_attributes)] #[ignore] #[test] @@ -502,752 +378,6 @@ fn test_cluster_partition_1_1_1() { ) } -fn create_custom_leader_schedule( - validator_num_slots: &[usize], -) -> (LeaderSchedule, Vec>) { - let mut leader_schedule = vec![]; - let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new())) - .take(validator_num_slots.len()) - .collect(); - for (k, num_slots) in validator_keys.iter().zip(validator_num_slots.iter()) { - for _ in 0..*num_slots { - leader_schedule.push(k.pubkey()) - } - } - - info!("leader_schedule: {}", leader_schedule.len()); - ( - LeaderSchedule::new_from_schedule(leader_schedule), - validator_keys, - ) -} - -#[test] -#[serial] -fn test_kill_heaviest_partition() { - // This test: - // 1) Spins up four partitions, the heaviest being the first with more stake - // 2) Schedules the other validators for sufficient slots in the schedule - // so that they will still be locked out of voting for the major partition - // when the partition resolves - // 3) Kills the most staked partition. Validators are locked out, but should all - // eventually choose the major partition - // 4) Check for recovery - let num_slots_per_validator = 8; - let partitions: [Vec; 4] = [vec![11], vec![10], vec![10], vec![10]]; - let (leader_schedule, validator_keys) = create_custom_leader_schedule(&[ - num_slots_per_validator * (partitions.len() - 1), - num_slots_per_validator, - num_slots_per_validator, - num_slots_per_validator, - ]); - - let empty = |_: &mut LocalCluster, _: &mut ()| {}; - let validator_to_kill = validator_keys[0].pubkey(); - let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { - info!("Killing validator with id: {}", validator_to_kill); - cluster.exit_node(&validator_to_kill); - cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified); - }; - run_cluster_partition( - &partitions, - Some((leader_schedule, validator_keys)), - (), - empty, - empty, - on_partition_resolved, - None, - None, - vec![], - ) -} - -#[allow(clippy::assertions_on_constants)] -fn run_kill_partition_switch_threshold( - stakes_to_kill: &[&[(usize, usize)]], - alive_stakes: &[&[(usize, usize)]], - partition_duration: Option, - ticks_per_slot: Option, - partition_context: C, - on_partition_start: impl Fn(&mut LocalCluster, &[Pubkey], Vec, &mut C), - on_before_partition_resolved: impl Fn(&mut LocalCluster, &mut C), - on_partition_resolved: impl Fn(&mut LocalCluster, &mut C), -) { - // Needs to be at least 1/3 or there will be no overlap - // with the confirmation supermajority 2/3 - assert!(SWITCH_FORK_THRESHOLD >= 1f64 / 3f64); - info!( - "stakes_to_kill: {:?}, alive_stakes: {:?}", - stakes_to_kill, alive_stakes - ); - - // This test: - // 1) Spins up three partitions - // 2) Kills the first partition with the stake `failures_stake` - // 5) runs `on_partition_resolved` - let partitions: Vec<&[(usize, usize)]> = stakes_to_kill - .iter() - .cloned() - .chain(alive_stakes.iter().cloned()) - .collect(); - - let stake_partitions: Vec> = partitions - .iter() - .map(|stakes_and_slots| stakes_and_slots.iter().map(|(stake, _)| *stake).collect()) - .collect(); - let num_slots_per_validator: Vec = partitions - .iter() - .flat_map(|stakes_and_slots| stakes_and_slots.iter().map(|(_, num_slots)| *num_slots)) - .collect(); - - let (leader_schedule, validator_keys) = create_custom_leader_schedule(&num_slots_per_validator); - - info!( - "Validator ids: {:?}", - validator_keys - .iter() - .map(|k| k.pubkey()) - .collect::>() - ); - let validator_pubkeys: Vec = validator_keys.iter().map(|k| k.pubkey()).collect(); - let on_partition_start = |cluster: &mut LocalCluster, partition_context: &mut C| { - let dead_validator_infos: Vec = validator_pubkeys - [0..stakes_to_kill.len()] - .iter() - .map(|validator_to_kill| { - info!("Killing validator with id: {}", validator_to_kill); - cluster.exit_node(validator_to_kill) - }) - .collect(); - on_partition_start( - cluster, - &validator_pubkeys, - dead_validator_infos, - partition_context, - ); - }; - run_cluster_partition( - &stake_partitions, - Some((leader_schedule, validator_keys)), - partition_context, - on_partition_start, - on_before_partition_resolved, - on_partition_resolved, - partition_duration, - ticks_per_slot, - vec![], - ) -} - -fn find_latest_replayed_slot_from_ledger( - ledger_path: &Path, - mut latest_slot: Slot, -) -> (Slot, HashSet) { - loop { - let mut blockstore = open_blockstore(ledger_path); - // This is kind of a hack because we can't query for new frozen blocks over RPC - // since the validator is not voting. - let new_latest_slots: Vec = blockstore - .slot_meta_iterator(latest_slot) - .unwrap() - .filter_map(|(s, _)| if s > latest_slot { Some(s) } else { None }) - .collect(); - - for new_latest_slot in new_latest_slots { - latest_slot = new_latest_slot; - info!("Checking latest_slot {}", latest_slot); - // Wait for the slot to be fully received by the validator - let entries; - loop { - info!("Waiting for slot {} to be full", latest_slot); - if blockstore.is_full(latest_slot) { - entries = blockstore.get_slot_entries(latest_slot, 0).unwrap(); - assert!(!entries.is_empty()); - break; - } else { - sleep(Duration::from_millis(50)); - blockstore = open_blockstore(ledger_path); - } - } - // Check the slot has been replayed - let non_tick_entry = entries.into_iter().find(|e| !e.transactions.is_empty()); - if let Some(non_tick_entry) = non_tick_entry { - // Wait for the slot to be replayed - loop { - info!("Waiting for slot {} to be replayed", latest_slot); - if !blockstore - .map_transactions_to_statuses( - latest_slot, - non_tick_entry.transactions.clone().into_iter(), - ) - .unwrap() - .is_empty() - { - return ( - latest_slot, - AncestorIterator::new(latest_slot, &blockstore).collect(), - ); - } else { - sleep(Duration::from_millis(50)); - blockstore = open_blockstore(ledger_path); - } - } - } else { - info!( - "No transactions in slot {}, can't tell if it was replayed", - latest_slot - ); - } - } - sleep(Duration::from_millis(50)); - } -} - -#[test] -#[serial] -fn test_switch_threshold_uses_gossip_votes() { - solana_logger::setup_with_default(RUST_LOG_FILTER); - let total_stake = 100; - - // Minimum stake needed to generate a switching proof - let minimum_switch_stake = (SWITCH_FORK_THRESHOLD as f64 * total_stake as f64) as u64; - - // Make the heavier stake insufficient for switching so tha the lighter validator - // cannot switch without seeing a vote from the dead/failure_stake validator. - let heavier_stake = minimum_switch_stake; - let lighter_stake = heavier_stake - 1; - let failures_stake = total_stake - heavier_stake - lighter_stake; - - let partitions: &[&[(usize, usize)]] = &[ - &[(heavier_stake as usize, 8)], - &[(lighter_stake as usize, 8)], - ]; - - #[derive(Default)] - struct PartitionContext { - heaviest_validator_key: Pubkey, - lighter_validator_key: Pubkey, - dead_validator_info: Option, - } - - let on_partition_start = |_cluster: &mut LocalCluster, - validator_keys: &[Pubkey], - mut dead_validator_infos: Vec, - context: &mut PartitionContext| { - assert_eq!(dead_validator_infos.len(), 1); - context.dead_validator_info = Some(dead_validator_infos.pop().unwrap()); - // validator_keys[0] is the validator that will be killed, i.e. the validator with - // stake == `failures_stake` - context.heaviest_validator_key = validator_keys[1]; - context.lighter_validator_key = validator_keys[2]; - }; - - let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut PartitionContext| {}; - - // Check that new roots were set after the partition resolves (gives time - // for lockouts built during partition to resolve and gives validators an opportunity - // to try and switch forks) - let on_partition_resolved = |cluster: &mut LocalCluster, context: &mut PartitionContext| { - let lighter_validator_ledger_path = cluster.ledger_path(&context.lighter_validator_key); - let heavier_validator_ledger_path = cluster.ledger_path(&context.heaviest_validator_key); - - let (lighter_validator_latest_vote, _) = last_vote_in_tower( - &lighter_validator_ledger_path, - &context.lighter_validator_key, - ) - .unwrap(); - - info!( - "Lighter validator's latest vote is for slot {}", - lighter_validator_latest_vote - ); - - // Lighter partition should stop voting after detecting the heavier partition and try - // to switch. Loop until we see a greater vote by the heavier validator than the last - // vote made by the lighter validator on the lighter fork. - let mut heavier_validator_latest_vote; - let mut heavier_validator_latest_vote_hash; - let heavier_blockstore = open_blockstore(&heavier_validator_ledger_path); - loop { - let (sanity_check_lighter_validator_latest_vote, _) = last_vote_in_tower( - &lighter_validator_ledger_path, - &context.lighter_validator_key, - ) - .unwrap(); - - // Lighter validator should stop voting, because `on_partition_resolved` is only - // called after a propagation time where blocks from the other fork should have - // finished propagating - assert_eq!( - sanity_check_lighter_validator_latest_vote, - lighter_validator_latest_vote - ); - - let (new_heavier_validator_latest_vote, new_heavier_validator_latest_vote_hash) = - last_vote_in_tower( - &heavier_validator_ledger_path, - &context.heaviest_validator_key, - ) - .unwrap(); - - heavier_validator_latest_vote = new_heavier_validator_latest_vote; - heavier_validator_latest_vote_hash = new_heavier_validator_latest_vote_hash; - - // Latest vote for each validator should be on different forks - assert_ne!(lighter_validator_latest_vote, heavier_validator_latest_vote); - if heavier_validator_latest_vote > lighter_validator_latest_vote { - let heavier_ancestors: HashSet = - AncestorIterator::new(heavier_validator_latest_vote, &heavier_blockstore) - .collect(); - assert!(!heavier_ancestors.contains(&lighter_validator_latest_vote)); - break; - } - } - - info!("Checking to make sure lighter validator doesn't switch"); - let mut latest_slot = lighter_validator_latest_vote; - - // Number of chances the validator had to switch votes but didn't - let mut total_voting_opportunities = 0; - while total_voting_opportunities <= 5 { - let (new_latest_slot, latest_slot_ancestors) = - find_latest_replayed_slot_from_ledger(&lighter_validator_ledger_path, latest_slot); - latest_slot = new_latest_slot; - // Ensure `latest_slot` is on the other fork - if latest_slot_ancestors.contains(&heavier_validator_latest_vote) { - let tower = restore_tower( - &lighter_validator_ledger_path, - &context.lighter_validator_key, - ) - .unwrap(); - // Check that there was an opportunity to vote - if !tower.is_locked_out(latest_slot, &latest_slot_ancestors) { - // Ensure the lighter blockstore has not voted again - let new_lighter_validator_latest_vote = tower.last_voted_slot().unwrap(); - assert_eq!( - new_lighter_validator_latest_vote, - lighter_validator_latest_vote - ); - info!( - "Incrementing voting opportunities: {}", - total_voting_opportunities - ); - total_voting_opportunities += 1; - } else { - info!( - "Tower still locked out, can't vote for slot: {}", - latest_slot - ); - } - } else if latest_slot > heavier_validator_latest_vote { - warn!( - "validator is still generating blocks on its own fork, last processed slot: {}", - latest_slot - ); - } - sleep(Duration::from_millis(50)); - } - - // Make a vote from the killed validator for slot `heavier_validator_latest_vote` in gossip - info!( - "Simulate vote for slot: {} from dead validator", - heavier_validator_latest_vote - ); - let vote_keypair = &context - .dead_validator_info - .as_ref() - .unwrap() - .info - .voting_keypair - .clone(); - let node_keypair = &context - .dead_validator_info - .as_ref() - .unwrap() - .info - .keypair - .clone(); - - cluster_tests::submit_vote_to_cluster_gossip( - node_keypair, - vote_keypair, - heavier_validator_latest_vote, - heavier_validator_latest_vote_hash, - // Make the vote transaction with a random blockhash. Thus, the vote only lives in gossip but - // never makes it into a block - Hash::new_unique(), - cluster - .get_contact_info(&context.heaviest_validator_key) - .unwrap() - .gossip, - &SocketAddrSpace::Unspecified, - ) - .unwrap(); - - loop { - // Wait for the lighter validator to switch to the heavier fork - let (new_lighter_validator_latest_vote, _) = last_vote_in_tower( - &lighter_validator_ledger_path, - &context.lighter_validator_key, - ) - .unwrap(); - - if new_lighter_validator_latest_vote != lighter_validator_latest_vote { - info!( - "Lighter validator switched forks at slot: {}", - new_lighter_validator_latest_vote - ); - let (heavier_validator_latest_vote, _) = last_vote_in_tower( - &heavier_validator_ledger_path, - &context.heaviest_validator_key, - ) - .unwrap(); - let (smaller, larger) = - if new_lighter_validator_latest_vote > heavier_validator_latest_vote { - ( - heavier_validator_latest_vote, - new_lighter_validator_latest_vote, - ) - } else { - ( - new_lighter_validator_latest_vote, - heavier_validator_latest_vote, - ) - }; - - // Check the new vote is on the same fork as the heaviest fork - let heavier_blockstore = open_blockstore(&heavier_validator_ledger_path); - let larger_slot_ancestors: HashSet = - AncestorIterator::new(larger, &heavier_blockstore) - .chain(std::iter::once(larger)) - .collect(); - assert!(larger_slot_ancestors.contains(&smaller)); - break; - } else { - sleep(Duration::from_millis(50)); - } - } - }; - - let ticks_per_slot = 8; - run_kill_partition_switch_threshold( - &[&[(failures_stake as usize, 0)]], - partitions, - // Partition long enough such that the first vote made by validator with - // `alive_stake_3` won't be ingested due to BlockhashTooOld, - None, - Some(ticks_per_slot), - PartitionContext::default(), - on_partition_start, - on_before_partition_resolved, - on_partition_resolved, - ); -} - -#[test] -#[serial] -fn test_kill_partition_switch_threshold_no_progress() { - let max_switch_threshold_failure_pct = 1.0 - 2.0 * SWITCH_FORK_THRESHOLD; - let total_stake = 10_000; - let max_failures_stake = (max_switch_threshold_failure_pct * total_stake as f64) as u64; - - let failures_stake = max_failures_stake; - let total_alive_stake = total_stake - failures_stake; - let alive_stake_1 = total_alive_stake / 2; - let alive_stake_2 = total_alive_stake - alive_stake_1; - - // Check that no new roots were set 400 slots after partition resolves (gives time - // for lockouts built during partition to resolve and gives validators an opportunity - // to try and switch forks) - let on_partition_start = - |_: &mut LocalCluster, _: &[Pubkey], _: Vec, _: &mut ()| {}; - let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut ()| {}; - let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { - cluster.check_no_new_roots(400, "PARTITION_TEST", SocketAddrSpace::Unspecified); - }; - - // This kills `max_failures_stake`, so no progress should be made - run_kill_partition_switch_threshold( - &[&[(failures_stake as usize, 16)]], - &[ - &[(alive_stake_1 as usize, 8)], - &[(alive_stake_2 as usize, 8)], - ], - None, - None, - (), - on_partition_start, - on_before_partition_resolved, - on_partition_resolved, - ); -} - -#[test] -#[serial] -fn test_kill_partition_switch_threshold_progress() { - let max_switch_threshold_failure_pct = 1.0 - 2.0 * SWITCH_FORK_THRESHOLD; - let total_stake = 10_000; - - // Kill `< max_failures_stake` of the validators - let max_failures_stake = (max_switch_threshold_failure_pct * total_stake as f64) as u64; - let failures_stake = max_failures_stake - 1; - let total_alive_stake = total_stake - failures_stake; - - // Partition the remaining alive validators, should still make progress - // once the partition resolves - let alive_stake_1 = total_alive_stake / 2; - let alive_stake_2 = total_alive_stake - alive_stake_1; - let bigger = std::cmp::max(alive_stake_1, alive_stake_2); - let smaller = std::cmp::min(alive_stake_1, alive_stake_2); - - // At least one of the forks must have > SWITCH_FORK_THRESHOLD in order - // to guarantee switching proofs can be created. Make sure the other fork - // is <= SWITCH_FORK_THRESHOLD to make sure progress can be made. Caches - // bugs such as liveness issues bank-weighted fork choice, which may stall - // because the fork with less stake could have more weight, but other fork would: - // 1) Not be able to generate a switching proof - // 2) Other more staked fork stops voting, so doesn't catch up in bank weight. - assert!( - bigger as f64 / total_stake as f64 > SWITCH_FORK_THRESHOLD - && smaller as f64 / total_stake as f64 <= SWITCH_FORK_THRESHOLD - ); - - let on_partition_start = - |_: &mut LocalCluster, _: &[Pubkey], _: Vec, _: &mut ()| {}; - let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut ()| {}; - let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { - cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified); - }; - run_kill_partition_switch_threshold( - &[&[(failures_stake as usize, 16)]], - &[ - &[(alive_stake_1 as usize, 8)], - &[(alive_stake_2 as usize, 8)], - ], - None, - None, - (), - on_partition_start, - on_before_partition_resolved, - on_partition_resolved, - ); -} - -#[test] -#[serial] -// Steps in this test: -// We want to create a situation like: -/* - 1 (2%, killed and restarted) --- 200 (37%, lighter fork) - / - 0 - \-------- 4 (38%, heavier fork) -*/ -// where the 2% that voted on slot 1 don't see their votes land in a block -// due to blockhash expiration, and thus without resigning their votes with -// a newer blockhash, will deem slot 4 the heavier fork and try to switch to -// slot 4, which doesn't pass the switch threshold. This stalls the network. - -// We do this by: -// 1) Creating a partition so all three nodes don't see each other -// 2) Kill the validator with 2% -// 3) Wait for longer than blockhash expiration -// 4) Copy in the lighter fork's blocks up, *only* up to the first slot in the lighter fork -// (not all the blocks on the lighter fork!), call this slot `L` -// 5) Restart the validator with 2% so that he votes on `L`, but the vote doesn't land -// due to blockhash expiration -// 6) Resolve the partition so that the 2% repairs the other fork, and tries to switch, -// stalling the network. - -fn test_fork_choice_refresh_old_votes() { - solana_logger::setup_with_default(RUST_LOG_FILTER); - let max_switch_threshold_failure_pct = 1.0 - 2.0 * SWITCH_FORK_THRESHOLD; - let total_stake = 100; - let max_failures_stake = (max_switch_threshold_failure_pct * total_stake as f64) as u64; - - // 1% less than the failure stake, where the 2% is allocated to a validator that - // has no leader slots and thus won't be able to vote on its own fork. - let failures_stake = max_failures_stake; - let total_alive_stake = total_stake - failures_stake; - let alive_stake_1 = total_alive_stake / 2 - 1; - let alive_stake_2 = total_alive_stake - alive_stake_1 - 1; - - // Heavier fork still doesn't have enough stake to switch. Both branches need - // the vote to land from the validator with `alive_stake_3` to allow the other - // fork to switch. - let alive_stake_3 = 2; - assert!(alive_stake_1 < alive_stake_2); - assert!(alive_stake_1 + alive_stake_3 > alive_stake_2); - - let partitions: &[&[(usize, usize)]] = &[ - &[(alive_stake_1 as usize, 8)], - &[(alive_stake_2 as usize, 8)], - &[(alive_stake_3 as usize, 0)], - ]; - - #[derive(Default)] - struct PartitionContext { - alive_stake3_info: Option, - smallest_validator_key: Pubkey, - lighter_fork_validator_key: Pubkey, - heaviest_validator_key: Pubkey, - } - let on_partition_start = |cluster: &mut LocalCluster, - validator_keys: &[Pubkey], - _: Vec, - context: &mut PartitionContext| { - // Kill validator with alive_stake_3, second in `partitions` slice - let smallest_validator_key = &validator_keys[3]; - let info = cluster.exit_node(smallest_validator_key); - context.alive_stake3_info = Some(info); - context.smallest_validator_key = *smallest_validator_key; - // validator_keys[0] is the validator that will be killed, i.e. the validator with - // stake == `failures_stake` - context.lighter_fork_validator_key = validator_keys[1]; - // Third in `partitions` slice - context.heaviest_validator_key = validator_keys[2]; - }; - - let ticks_per_slot = 8; - let on_before_partition_resolved = - |cluster: &mut LocalCluster, context: &mut PartitionContext| { - // Equal to ms_per_slot * MAX_PROCESSING_AGE, rounded up - let sleep_time_ms = ms_for_n_slots(MAX_PROCESSING_AGE as u64, ticks_per_slot); - info!("Wait for blockhashes to expire, {} ms", sleep_time_ms); - - // Wait for blockhashes to expire - sleep(Duration::from_millis(sleep_time_ms)); - - let smallest_ledger_path = context - .alive_stake3_info - .as_ref() - .unwrap() - .info - .ledger_path - .clone(); - let lighter_fork_ledger_path = cluster.ledger_path(&context.lighter_fork_validator_key); - let heaviest_ledger_path = cluster.ledger_path(&context.heaviest_validator_key); - - // Get latest votes. We make sure to wait until the vote has landed in - // blockstore. This is important because if we were the leader for the block there - // is a possibility of voting before broadcast has inserted in blockstore. - let lighter_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger( - &lighter_fork_ledger_path, - &context.lighter_fork_validator_key, - ); - let heaviest_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger( - &heaviest_ledger_path, - &context.heaviest_validator_key, - ); - - // Open ledgers - let smallest_blockstore = open_blockstore(&smallest_ledger_path); - let lighter_fork_blockstore = open_blockstore(&lighter_fork_ledger_path); - let heaviest_blockstore = open_blockstore(&heaviest_ledger_path); - - info!("Opened blockstores"); - - // Find the first slot on the smaller fork - let lighter_ancestors: BTreeSet = std::iter::once(lighter_fork_latest_vote) - .chain(AncestorIterator::new( - lighter_fork_latest_vote, - &lighter_fork_blockstore, - )) - .collect(); - let heavier_ancestors: BTreeSet = std::iter::once(heaviest_fork_latest_vote) - .chain(AncestorIterator::new( - heaviest_fork_latest_vote, - &heaviest_blockstore, - )) - .collect(); - let first_slot_in_lighter_partition = *lighter_ancestors - .iter() - .zip(heavier_ancestors.iter()) - .find(|(x, y)| x != y) - .unwrap() - .0; - - // Must have been updated in the above loop - assert!(first_slot_in_lighter_partition != 0); - info!( - "First slot in lighter partition is {}", - first_slot_in_lighter_partition - ); - - // Copy all the blocks from the smaller partition up to `first_slot_in_lighter_partition` - // into the smallest validator's blockstore - copy_blocks( - first_slot_in_lighter_partition, - &lighter_fork_blockstore, - &smallest_blockstore, - ); - - // Restart the smallest validator that we killed earlier in `on_partition_start()` - drop(smallest_blockstore); - cluster.restart_node( - &context.smallest_validator_key, - context.alive_stake3_info.take().unwrap(), - SocketAddrSpace::Unspecified, - ); - - loop { - // Wait for node to vote on the first slot on the less heavy fork, so it'll need - // a switch proof to flip to the other fork. - // However, this vote won't land because it's using an expired blockhash. The - // fork structure will look something like this after the vote: - /* - 1 (2%, killed and restarted) --- 200 (37%, lighter fork) - / - 0 - \-------- 4 (38%, heavier fork) - */ - if let Some((last_vote_slot, _last_vote_hash)) = - last_vote_in_tower(&smallest_ledger_path, &context.smallest_validator_key) - { - // Check that the heaviest validator on the other fork doesn't have this slot, - // this must mean we voted on a unique slot on this fork - if last_vote_slot == first_slot_in_lighter_partition { - info!( - "Saw vote on first slot in lighter partition {}", - first_slot_in_lighter_partition - ); - break; - } else { - info!( - "Haven't seen vote on first slot in lighter partition, latest vote is: {}", - last_vote_slot - ); - } - } - - sleep(Duration::from_millis(20)); - } - - // Now resolve partition, allow validator to see the fork with the heavier validator, - // but the fork it's currently on is the heaviest, if only its own vote landed! - }; - - // Check that new roots were set after the partition resolves (gives time - // for lockouts built during partition to resolve and gives validators an opportunity - // to try and switch forks) - let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut PartitionContext| { - cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified); - }; - - run_kill_partition_switch_threshold( - &[&[(failures_stake as usize - 1, 16)]], - partitions, - // Partition long enough such that the first vote made by validator with - // `alive_stake_3` won't be ingested due to BlockhashTooOld, - None, - Some(ticks_per_slot), - PartitionContext::default(), - on_partition_start, - on_before_partition_resolved, - on_partition_resolved, - ); -} - #[test] #[serial] fn test_two_unbalanced_stakes() { @@ -1358,26 +488,6 @@ fn test_restart_node() { ); } -#[test] -#[serial] -fn test_listener_startup() { - let mut config = ClusterConfig { - node_stakes: vec![100; 1], - cluster_lamports: 1_000, - num_listeners: 3, - validator_configs: make_identical_validator_configs(&ValidatorConfig::default(), 1), - ..ClusterConfig::default() - }; - let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); - let cluster_nodes = discover_cluster( - &cluster.entry_point_info.gossip, - 4, - SocketAddrSpace::Unspecified, - ) - .unwrap(); - assert_eq!(cluster_nodes.len(), 4); -} - #[test] #[serial] fn test_mainnet_beta_cluster_type() { @@ -2463,246 +1573,6 @@ fn test_fake_shreds_broadcast_leader() { ); } -#[test] -#[serial] -#[allow(unused_attributes)] -fn test_duplicate_shreds_broadcast_leader() { - // Create 4 nodes: - // 1) Bad leader sending different versions of shreds to both of the other nodes - // 2) 1 node who's voting behavior in gossip - // 3) 1 validator gets the same version as the leader, will see duplicate confirmation - // 4) 1 validator will not get the same version as the leader. For each of these - // duplicate slots `S` either: - // a) The leader's version of `S` gets > DUPLICATE_THRESHOLD of votes in gossip and so this - // node will repair that correct version - // b) A descendant `D` of some version of `S` gets > DUPLICATE_THRESHOLD votes in gossip, - // but no version of `S` does. Then the node will not know to repair the right version - // by just looking at gossip, but will instead have to use EpochSlots repair after - // detecting that a descendant does not chain to its version of `S`, and marks that descendant - // dead. - // Scenarios a) or b) are triggered by our node in 2) who's voting behavior we control. - - // Critical that bad_leader_stake + good_node_stake < DUPLICATE_THRESHOLD and that - // bad_leader_stake + good_node_stake + our_node_stake > DUPLICATE_THRESHOLD so that - // our vote is the determining factor - let bad_leader_stake = 10000000000; - // Ensure that the good_node_stake is always on the critical path, and the partition node - // should never be on the critical path. This way, none of the bad shreds sent to the partition - // node corrupt the good node. - let good_node_stake = 500000; - let our_node_stake = 10000000000; - let partition_node_stake = 1; - - let node_stakes = vec![ - bad_leader_stake, - partition_node_stake, - good_node_stake, - // Needs to be last in the vector, so that we can - // find the id of this node. See call to `test_faulty_node` - // below for more details. - our_node_stake, - ]; - assert_eq!(*node_stakes.last().unwrap(), our_node_stake); - let total_stake: u64 = node_stakes.iter().sum(); - - assert!( - ((bad_leader_stake + good_node_stake) as f64 / total_stake as f64) < DUPLICATE_THRESHOLD - ); - assert!( - (bad_leader_stake + good_node_stake + our_node_stake) as f64 / total_stake as f64 - > DUPLICATE_THRESHOLD - ); - - // Important that the partition node stake is the smallest so that it gets selected - // for the partition. - assert!(partition_node_stake < our_node_stake && partition_node_stake < good_node_stake); - - // 1) Set up the cluster - let (mut cluster, validator_keys) = test_faulty_node( - BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig { - stake_partition: partition_node_stake, - }), - node_stakes, - ); - - // This is why it's important our node was last in `node_stakes` - let our_id = validator_keys.last().unwrap().pubkey(); - - // 2) Kill our node and start up a thread to simulate votes to control our voting behavior - let our_info = cluster.exit_node(&our_id); - let node_keypair = our_info.info.keypair; - let vote_keypair = our_info.info.voting_keypair; - let bad_leader_id = cluster.entry_point_info.id; - let bad_leader_ledger_path = cluster.validators[&bad_leader_id].info.ledger_path.clone(); - info!("our node id: {}", node_keypair.pubkey()); - - // 3) Start up a spy to listen for votes - let exit = Arc::new(AtomicBool::new(false)); - let (gossip_service, _tcp_listener, cluster_info) = gossip_service::make_gossip_node( - // Need to use our validator's keypair to gossip EpochSlots and votes for our - // node later. - Keypair::from_bytes(&node_keypair.to_bytes()).unwrap(), - Some(&cluster.entry_point_info.gossip), - &exit, - None, - 0, - false, - SocketAddrSpace::Unspecified, - ); - - let t_voter = { - let exit = exit.clone(); - std::thread::spawn(move || { - let mut cursor = Cursor::default(); - let mut max_vote_slot = 0; - let mut gossip_vote_index = 0; - loop { - if exit.load(Ordering::Relaxed) { - return; - } - - let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor); - let mut parsed_vote_iter: Vec<_> = labels - .into_iter() - .zip(votes.into_iter()) - .filter_map(|(label, leader_vote_tx)| { - // Filter out votes not from the bad leader - if label.pubkey() == bad_leader_id { - let vote = vote_transaction::parse_vote_transaction(&leader_vote_tx) - .map(|(_, vote, _)| vote) - .unwrap(); - // Filter out empty votes - if !vote.slots.is_empty() { - Some((vote, leader_vote_tx)) - } else { - None - } - } else { - None - } - }) - .collect(); - - parsed_vote_iter.sort_by(|(vote, _), (vote2, _)| { - vote.slots.last().unwrap().cmp(vote2.slots.last().unwrap()) - }); - - for (parsed_vote, leader_vote_tx) in &parsed_vote_iter { - if let Some(latest_vote_slot) = parsed_vote.slots.last() { - info!("received vote for {}", latest_vote_slot); - // Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot. - if *latest_vote_slot > max_vote_slot { - let new_epoch_slots: Vec = - (max_vote_slot + 1..latest_vote_slot + 1).collect(); - info!( - "Simulating epoch slots from our node: {:?}", - new_epoch_slots - ); - cluster_info.push_epoch_slots(&new_epoch_slots); - max_vote_slot = *latest_vote_slot; - } - - // Only vote on even slots. Note this may violate lockouts if the - // validator started voting on a different fork before we could exit - // it above. - let vote_hash = parsed_vote.hash; - if latest_vote_slot % 2 == 0 { - info!( - "Simulating vote from our node on slot {}, hash {}", - latest_vote_slot, vote_hash - ); - - // Add all recent vote slots on this fork to allow cluster to pass - // vote threshold checks in replay. Note this will instantly force a - // root by this validator, but we're not concerned with lockout violations - // by this validator so it's fine. - let leader_blockstore = open_blockstore(&bad_leader_ledger_path); - let mut vote_slots: Vec = AncestorIterator::new_inclusive( - *latest_vote_slot, - &leader_blockstore, - ) - .take(MAX_LOCKOUT_HISTORY) - .collect(); - vote_slots.reverse(); - let vote_tx = vote_transaction::new_vote_transaction( - vote_slots, - vote_hash, - leader_vote_tx.message.recent_blockhash, - &node_keypair, - &vote_keypair, - &vote_keypair, - None, - ); - gossip_vote_index += 1; - gossip_vote_index %= MAX_LOCKOUT_HISTORY; - cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8) - } - } - // Give vote some time to propagate - sleep(Duration::from_millis(100)); - } - - if parsed_vote_iter.is_empty() { - sleep(Duration::from_millis(100)); - } - } - }) - }; - - // 4) Check that the cluster is making progress - cluster.check_for_new_roots( - 16, - "test_duplicate_shreds_broadcast_leader", - SocketAddrSpace::Unspecified, - ); - - // Clean up threads - exit.store(true, Ordering::Relaxed); - t_voter.join().unwrap(); - gossip_service.join().unwrap(); -} - -fn test_faulty_node( - faulty_node_type: BroadcastStageType, - node_stakes: Vec, -) -> (LocalCluster, Vec>) { - solana_logger::setup_with_default("solana_local_cluster=info"); - let num_nodes = node_stakes.len(); - - let error_validator_config = ValidatorConfig { - broadcast_stage_type: faulty_node_type, - ..ValidatorConfig::default() - }; - let mut validator_configs = Vec::with_capacity(num_nodes); - - // First validator is the bootstrap leader with the malicious broadcast logic. - validator_configs.push(error_validator_config); - validator_configs.resize_with(num_nodes, ValidatorConfig::default); - - let mut validator_keys = Vec::with_capacity(num_nodes); - validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true)); - - assert_eq!(node_stakes.len(), num_nodes); - assert_eq!(validator_keys.len(), num_nodes); - - let mut cluster_config = ClusterConfig { - cluster_lamports: 10_000, - node_stakes, - validator_configs, - validator_keys: Some(validator_keys.clone()), - skip_warmup_slots: true, - ..ClusterConfig::default() - }; - - let cluster = LocalCluster::new(&mut cluster_config, SocketAddrSpace::Unspecified); - let validator_keys: Vec> = validator_keys - .into_iter() - .map(|(keypair, _)| keypair) - .collect(); - - (cluster, validator_keys) -} - #[test] fn test_wait_for_max_stake() { solana_logger::setup_with_default(RUST_LOG_FILTER); @@ -3040,396 +1910,16 @@ fn test_validator_saves_tower() { assert_eq!(tower4.root(), tower3.root() + 1); } -fn open_blockstore(ledger_path: &Path) -> Blockstore { - Blockstore::open_with_access_type(ledger_path, AccessType::TryPrimaryThenSecondary, None, true) - .unwrap_or_else(|e| { - panic!("Failed to open ledger at {:?}, err: {}", ledger_path, e); - }) -} - -fn purge_slots(blockstore: &Blockstore, start_slot: Slot, slot_count: Slot) { - blockstore.purge_from_next_slots(start_slot, start_slot + slot_count); - blockstore.purge_slots(start_slot, start_slot + slot_count, PurgeType::Exact); -} - -fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) { - for slot in std::iter::once(end_slot).chain(AncestorIterator::new(end_slot, source)) { - let source_meta = source.meta(slot).unwrap().unwrap(); - assert!(source_meta.is_full()); - - let shreds = source.get_data_shreds_for_slot(slot, 0).unwrap(); - dest.insert_shreds(shreds, None, false).unwrap(); - - let dest_meta = dest.meta(slot).unwrap().unwrap(); - assert!(dest_meta.is_full()); - assert_eq!(dest_meta.last_index, source_meta.last_index); - } -} - fn save_tower(tower_path: &Path, tower: &Tower, node_keypair: &Keypair) { let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); let saved_tower = SavedTower::new(tower, node_keypair).unwrap(); file_tower_storage.store(&saved_tower).unwrap(); } -fn restore_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { - let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); - - let tower = Tower::restore(&file_tower_storage, node_pubkey); - if let Err(tower_err) = tower { - if tower_err.is_file_missing() { - return None; - } else { - panic!("tower restore failed...: {:?}", tower_err); - } - } - // actually saved tower must have at least one vote. - Tower::restore(&file_tower_storage, node_pubkey).ok() -} - -fn last_vote_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<(Slot, Hash)> { - restore_tower(tower_path, node_pubkey).map(|tower| tower.last_voted_slot_hash().unwrap()) -} - -// Fetches the last vote in the tower, blocking until it has also appeared in blockstore. -// Fails if tower is empty -fn wait_for_last_vote_in_tower_to_land_in_ledger(ledger_path: &Path, node_pubkey: &Pubkey) -> Slot { - let (last_vote, _) = last_vote_in_tower(ledger_path, node_pubkey).unwrap(); - loop { - // We reopen in a loop to make sure we get updates - let blockstore = open_blockstore(ledger_path); - if blockstore.is_full(last_vote) { - break; - } - sleep(Duration::from_millis(100)); - } - last_vote -} - fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { restore_tower(tower_path, node_pubkey).map(|tower| tower.root()) } -fn remove_tower(tower_path: &Path, node_pubkey: &Pubkey) { - let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); - fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap(); -} - -// A bit convoluted test case; but this roughly follows this test theoretical scenario: -// -// Step 1: You have validator A + B with 31% and 36% of the stake. Run only validator B: -// -// S0 -> S1 -> S2 -> S3 (B vote) -// -// Step 2: Turn off B, and truncate the ledger after slot `S3` (simulate votes not -// landing in next slot). -// Copy ledger fully to validator A and validator C -// -// Step 3: Turn on A, and have it vote up to S3. Truncate anything past slot `S3`. -// -// S0 -> S1 -> S2 -> S3 (A & B vote, optimistically confirmed) -// -// Step 4: -// Start validator C with 33% of the stake with same ledger, but only up to slot S2. -// Have `C` generate some blocks like: -// -// S0 -> S1 -> S2 -> S4 -// -// Step 3: Then restart `A` which had 31% of the stake. With the tower, from `A`'s -// perspective it sees: -// -// S0 -> S1 -> S2 -> S3 (voted) -// | -// -> S4 -> S5 (C's vote for S4) -// -// The fork choice rule weights look like: -// -// S0 -> S1 -> S2 (ABC) -> S3 -// | -// -> S4 (C) -> S5 -// -// Step 5: -// Without the persisted tower: -// `A` would choose to vote on the fork with `S4 -> S5`. This is true even if `A` -// generates a new fork starting at slot `S3` because `C` has more stake than `A` -// so `A` will eventually pick the fork `C` is on. -// -// Furthermore `B`'s vote on `S3` is not observable because there are no -// descendants of slot `S3`, so that fork will not be chosen over `C`'s fork -// -// With the persisted tower: -// `A` should not be able to generate a switching proof. -// -fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: bool) { - solana_logger::setup_with_default(RUST_LOG_FILTER); - - // First set up the cluster with 4 nodes - let slots_per_epoch = 2048; - let node_stakes = vec![31, 36, 33, 0]; - - // Each pubkeys are prefixed with A, B, C and D. - // D is needed to: - // 1) Propagate A's votes for S2 to validator C after A shuts down so that - // C can avoid NoPropagatedConfirmation errors and continue to generate blocks - // 2) Provide gossip discovery for `A` when it restarts because `A` will restart - // at a different gossip port than the entrypoint saved in C's gossip table - let validator_keys = vec![ - "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", - "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", - "4mx9yoFBeYasDKBGDWCTWGJdWuJCKbgqmuP8bN9umybCh5Jzngw7KQxe99Rf5uzfyzgba1i65rJW4Wqk7Ab5S8ye", - "3zsEPEDsjfEay7te9XqNjRTCE7vwuT6u4DHzBJC19yp7GS8BuNRMRjnpVrKCBzb3d44kxc4KPGSHkCmk6tEfswCg", - ] - .iter() - .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) - .take(node_stakes.len()) - .collect::>(); - let validators = validator_keys - .iter() - .map(|(kp, _)| kp.pubkey()) - .collect::>(); - let (validator_a_pubkey, validator_b_pubkey, validator_c_pubkey) = - (validators[0], validators[1], validators[2]); - - // Disable voting on all validators other than validator B to ensure neither of the below two - // scenarios occur: - // 1. If the cluster immediately forks on restart while we're killing validators A and C, - // with Validator B on one side, and `A` and `C` on a heavier fork, it's possible that the lockouts - // on `A` and `C`'s latest votes do not extend past validator B's latest vote. Then validator B - // will be stuck unable to vote, but also unable generate a switching proof to the heavier fork. - // - // 2. Validator A doesn't vote past `next_slot_on_a` before we can kill it. This is essential - // because if validator A votes past `next_slot_on_a`, and then we copy over validator B's ledger - // below only for slots <= `next_slot_on_a`, validator A will not know how it's last vote chains - // to the otehr forks, and may violate switching proofs on restart. - let mut validator_configs = - make_identical_validator_configs(&ValidatorConfig::default(), node_stakes.len()); - - validator_configs[0].voting_disabled = true; - validator_configs[2].voting_disabled = true; - - let mut config = ClusterConfig { - cluster_lamports: 100_000, - node_stakes, - validator_configs, - validator_keys: Some(validator_keys), - slots_per_epoch, - stakers_slot_offset: slots_per_epoch, - skip_warmup_slots: true, - ..ClusterConfig::default() - }; - let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); - - let base_slot = 26; // S2 - let next_slot_on_a = 27; // S3 - let truncated_slots = 100; // just enough to purge all following slots after the S2 and S3 - - let val_a_ledger_path = cluster.ledger_path(&validator_a_pubkey); - let val_b_ledger_path = cluster.ledger_path(&validator_b_pubkey); - let val_c_ledger_path = cluster.ledger_path(&validator_c_pubkey); - - info!( - "val_a {} ledger path {:?}", - validator_a_pubkey, val_a_ledger_path - ); - info!( - "val_b {} ledger path {:?}", - validator_b_pubkey, val_b_ledger_path - ); - info!( - "val_c {} ledger path {:?}", - validator_c_pubkey, val_c_ledger_path - ); - - // Immediately kill validator A, and C - info!("Exiting validators A and C"); - let mut validator_a_info = cluster.exit_node(&validator_a_pubkey); - let mut validator_c_info = cluster.exit_node(&validator_c_pubkey); - - // Step 1: - // Let validator B, (D) run for a while. - let now = Instant::now(); - loop { - let elapsed = now.elapsed(); - assert!( - elapsed <= Duration::from_secs(30), - "Validator B failed to vote on any slot >= {} in {} secs", - next_slot_on_a, - elapsed.as_secs() - ); - sleep(Duration::from_millis(100)); - - if let Some((last_vote, _)) = last_vote_in_tower(&val_b_ledger_path, &validator_b_pubkey) { - if last_vote >= next_slot_on_a { - break; - } - } - } - // kill B - let _validator_b_info = cluster.exit_node(&validator_b_pubkey); - - // Step 2: - // Stop validator and truncate ledger, copy over B's ledger to A - info!("truncate validator C's ledger"); - { - // first copy from validator B's ledger - std::fs::remove_dir_all(&validator_c_info.info.ledger_path).unwrap(); - let mut opt = fs_extra::dir::CopyOptions::new(); - opt.copy_inside = true; - fs_extra::dir::copy(&val_b_ledger_path, &val_c_ledger_path, &opt).unwrap(); - // Remove B's tower in the C's new copied ledger - remove_tower(&val_c_ledger_path, &validator_b_pubkey); - - let blockstore = open_blockstore(&val_c_ledger_path); - purge_slots(&blockstore, base_slot + 1, truncated_slots); - } - info!("Create validator A's ledger"); - { - // Find latest vote in B, and wait for it to reach blockstore - let b_last_vote = - wait_for_last_vote_in_tower_to_land_in_ledger(&val_b_ledger_path, &validator_b_pubkey); - - // Now we copy these blocks to A - let b_blockstore = open_blockstore(&val_b_ledger_path); - let a_blockstore = open_blockstore(&val_a_ledger_path); - copy_blocks(b_last_vote, &b_blockstore, &a_blockstore); - - // Purge uneccessary slots - purge_slots(&a_blockstore, next_slot_on_a + 1, truncated_slots); - } - - // Step 3: - // Restart A with voting enabled so that it can vote on B's fork - // up to `next_slot_on_a`, thereby optimistcally confirming `next_slot_on_a` - info!("Restarting A"); - validator_a_info.config.voting_disabled = false; - cluster.restart_node( - &validator_a_pubkey, - validator_a_info, - SocketAddrSpace::Unspecified, - ); - - info!("Waiting for A to vote on slot descended from slot `next_slot_on_a`"); - let now = Instant::now(); - loop { - if let Some((last_vote_slot, _)) = - last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) - { - if last_vote_slot >= next_slot_on_a { - info!( - "Validator A has caught up and voted on slot: {}", - last_vote_slot - ); - break; - } - } - - if now.elapsed().as_secs() >= 30 { - panic!( - "Validator A has not seen optimistic confirmation slot > {} in 30 seconds", - next_slot_on_a - ); - } - - sleep(Duration::from_millis(20)); - } - - info!("Killing A"); - let validator_a_info = cluster.exit_node(&validator_a_pubkey); - { - let blockstore = open_blockstore(&val_a_ledger_path); - purge_slots(&blockstore, next_slot_on_a + 1, truncated_slots); - if !with_tower { - info!("Removing tower!"); - remove_tower(&val_a_ledger_path, &validator_a_pubkey); - - // Remove next_slot_on_a from ledger to force validator A to select - // votes_on_c_fork. Otherwise the validator A will immediately vote - // for 27 on restart, because it hasn't gotten the heavier fork from - // validator C yet. - // Then it will be stuck on 27 unable to switch because C doesn't - // have enough stake to generate a switching proof - purge_slots(&blockstore, next_slot_on_a, truncated_slots); - } else { - info!("Not removing tower!"); - } - } - - // Step 4: - // Run validator C only to make it produce and vote on its own fork. - info!("Restart validator C again!!!"); - validator_c_info.config.voting_disabled = false; - cluster.restart_node( - &validator_c_pubkey, - validator_c_info, - SocketAddrSpace::Unspecified, - ); - - let mut votes_on_c_fork = std::collections::BTreeSet::new(); // S4 and S5 - for _ in 0..100 { - sleep(Duration::from_millis(100)); - - if let Some((last_vote, _)) = last_vote_in_tower(&val_c_ledger_path, &validator_c_pubkey) { - if last_vote != base_slot { - votes_on_c_fork.insert(last_vote); - // Collect 4 votes - if votes_on_c_fork.len() >= 4 { - break; - } - } - } - } - assert!(!votes_on_c_fork.is_empty()); - info!("collected validator C's votes: {:?}", votes_on_c_fork); - - // Step 5: - // verify whether there was violation or not - info!("Restart validator A again!!!"); - cluster.restart_node( - &validator_a_pubkey, - validator_a_info, - SocketAddrSpace::Unspecified, - ); - - // monitor for actual votes from validator A - let mut bad_vote_detected = false; - let mut a_votes = vec![]; - for _ in 0..100 { - sleep(Duration::from_millis(100)); - - if let Some((last_vote, _)) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) { - a_votes.push(last_vote); - let blockstore = Blockstore::open_with_access_type( - &val_a_ledger_path, - AccessType::TryPrimaryThenSecondary, - None, - true, - ) - .unwrap(); - let mut ancestors = AncestorIterator::new(last_vote, &blockstore); - if ancestors.any(|a| votes_on_c_fork.contains(&a)) { - bad_vote_detected = true; - break; - } - } - } - - info!("Observed A's votes on: {:?}", a_votes); - - // an elaborate way of assert!(with_tower && !bad_vote_detected || ...) - let expects_optimistic_confirmation_violation = !with_tower; - if bad_vote_detected != expects_optimistic_confirmation_violation { - if bad_vote_detected { - panic!("No violation expected because of persisted tower!"); - } else { - panic!("Violation expected because of removed persisted tower!"); - } - } else if bad_vote_detected { - info!("THIS TEST expected violations. And indeed, there was some, because of removed persisted tower."); - } else { - info!("THIS TEST expected no violation. And indeed, there was none, thanks to persisted tower."); - } -} - enum ClusterMode { MasterOnly, MasterSlave, @@ -3676,18 +2166,6 @@ fn test_hard_fork_invalidates_tower() { .check_for_new_roots(16, "hard fork", SocketAddrSpace::Unspecified); } -#[test] -#[serial] -fn test_no_optimistic_confirmation_violation_with_tower() { - do_test_optimistic_confirmation_violation_with_or_without_tower(true); -} - -#[test] -#[serial] -fn test_optimistic_confirmation_violation_without_tower() { - do_test_optimistic_confirmation_violation_with_or_without_tower(false); -} - #[test] #[serial] fn test_run_test_load_program_accounts_root() { @@ -4213,10 +2691,3 @@ fn setup_snapshot_validator_config( num_account_paths, ) } - -/// Computes the numbr of milliseconds `num_blocks` blocks will take given -/// each slot contains `ticks_per_slot` -fn ms_for_n_slots(num_blocks: u64, ticks_per_slot: u64) -> u64 { - ((ticks_per_slot * DEFAULT_MS_PER_SLOT * num_blocks) + DEFAULT_TICKS_PER_SLOT - 1) - / DEFAULT_TICKS_PER_SLOT -} diff --git a/local-cluster/tests/local_cluster_flakey.rs b/local-cluster/tests/local_cluster_flakey.rs new file mode 100644 index 0000000000..ee75a7f3b1 --- /dev/null +++ b/local-cluster/tests/local_cluster_flakey.rs @@ -0,0 +1,356 @@ +//! Move flakey tests here so that when they fail, there's less to retry in CI +//! because these tests are run separately from the rest of local cluster tests. +#![allow(clippy::integer_arithmetic)] +use { + common::{ + copy_blocks, last_vote_in_tower, open_blockstore, purge_slots, remove_tower, + wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER, + }, + log::*, + serial_test::serial, + solana_core::validator::ValidatorConfig, + solana_ledger::{ + ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db::AccessType, + }, + solana_local_cluster::{ + cluster::Cluster, + local_cluster::{ClusterConfig, LocalCluster}, + validator_configs::*, + }, + solana_sdk::signature::{Keypair, Signer}, + solana_streamer::socket::SocketAddrSpace, + std::{ + sync::Arc, + thread::sleep, + time::{Duration, Instant}, + }, +}; + +mod common; + +#[test] +#[serial] +fn test_no_optimistic_confirmation_violation_with_tower() { + do_test_optimistic_confirmation_violation_with_or_without_tower(true); +} + +#[test] +#[serial] +fn test_optimistic_confirmation_violation_without_tower() { + do_test_optimistic_confirmation_violation_with_or_without_tower(false); +} + +// A bit convoluted test case; but this roughly follows this test theoretical scenario: +// +// Step 1: You have validator A + B with 31% and 36% of the stake. Run only validator B: +// +// S0 -> S1 -> S2 -> S3 (B vote) +// +// Step 2: Turn off B, and truncate the ledger after slot `S3` (simulate votes not +// landing in next slot). +// Copy ledger fully to validator A and validator C +// +// Step 3: Turn on A, and have it vote up to S3. Truncate anything past slot `S3`. +// +// S0 -> S1 -> S2 -> S3 (A & B vote, optimistically confirmed) +// +// Step 4: +// Start validator C with 33% of the stake with same ledger, but only up to slot S2. +// Have `C` generate some blocks like: +// +// S0 -> S1 -> S2 -> S4 +// +// Step 3: Then restart `A` which had 31% of the stake. With the tower, from `A`'s +// perspective it sees: +// +// S0 -> S1 -> S2 -> S3 (voted) +// | +// -> S4 -> S5 (C's vote for S4) +// +// The fork choice rule weights look like: +// +// S0 -> S1 -> S2 (ABC) -> S3 +// | +// -> S4 (C) -> S5 +// +// Step 5: +// Without the persisted tower: +// `A` would choose to vote on the fork with `S4 -> S5`. This is true even if `A` +// generates a new fork starting at slot `S3` because `C` has more stake than `A` +// so `A` will eventually pick the fork `C` is on. +// +// Furthermore `B`'s vote on `S3` is not observable because there are no +// descendants of slot `S3`, so that fork will not be chosen over `C`'s fork +// +// With the persisted tower: +// `A` should not be able to generate a switching proof. +// +fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: bool) { + solana_logger::setup_with_default(RUST_LOG_FILTER); + + // First set up the cluster with 4 nodes + let slots_per_epoch = 2048; + let node_stakes = vec![31, 36, 33, 0]; + + // Each pubkeys are prefixed with A, B, C and D. + // D is needed to: + // 1) Propagate A's votes for S2 to validator C after A shuts down so that + // C can avoid NoPropagatedConfirmation errors and continue to generate blocks + // 2) Provide gossip discovery for `A` when it restarts because `A` will restart + // at a different gossip port than the entrypoint saved in C's gossip table + let validator_keys = vec![ + "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", + "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", + "4mx9yoFBeYasDKBGDWCTWGJdWuJCKbgqmuP8bN9umybCh5Jzngw7KQxe99Rf5uzfyzgba1i65rJW4Wqk7Ab5S8ye", + "3zsEPEDsjfEay7te9XqNjRTCE7vwuT6u4DHzBJC19yp7GS8BuNRMRjnpVrKCBzb3d44kxc4KPGSHkCmk6tEfswCg", + ] + .iter() + .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) + .take(node_stakes.len()) + .collect::>(); + let validators = validator_keys + .iter() + .map(|(kp, _)| kp.pubkey()) + .collect::>(); + let (validator_a_pubkey, validator_b_pubkey, validator_c_pubkey) = + (validators[0], validators[1], validators[2]); + + // Disable voting on all validators other than validator B to ensure neither of the below two + // scenarios occur: + // 1. If the cluster immediately forks on restart while we're killing validators A and C, + // with Validator B on one side, and `A` and `C` on a heavier fork, it's possible that the lockouts + // on `A` and `C`'s latest votes do not extend past validator B's latest vote. Then validator B + // will be stuck unable to vote, but also unable generate a switching proof to the heavier fork. + // + // 2. Validator A doesn't vote past `next_slot_on_a` before we can kill it. This is essential + // because if validator A votes past `next_slot_on_a`, and then we copy over validator B's ledger + // below only for slots <= `next_slot_on_a`, validator A will not know how it's last vote chains + // to the otehr forks, and may violate switching proofs on restart. + let mut validator_configs = + make_identical_validator_configs(&ValidatorConfig::default(), node_stakes.len()); + + validator_configs[0].voting_disabled = true; + validator_configs[2].voting_disabled = true; + + let mut config = ClusterConfig { + cluster_lamports: 100_000, + node_stakes, + validator_configs, + validator_keys: Some(validator_keys), + slots_per_epoch, + stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, + ..ClusterConfig::default() + }; + let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + + let base_slot = 26; // S2 + let next_slot_on_a = 27; // S3 + let truncated_slots = 100; // just enough to purge all following slots after the S2 and S3 + + let val_a_ledger_path = cluster.ledger_path(&validator_a_pubkey); + let val_b_ledger_path = cluster.ledger_path(&validator_b_pubkey); + let val_c_ledger_path = cluster.ledger_path(&validator_c_pubkey); + + info!( + "val_a {} ledger path {:?}", + validator_a_pubkey, val_a_ledger_path + ); + info!( + "val_b {} ledger path {:?}", + validator_b_pubkey, val_b_ledger_path + ); + info!( + "val_c {} ledger path {:?}", + validator_c_pubkey, val_c_ledger_path + ); + + // Immediately kill validator A, and C + info!("Exiting validators A and C"); + let mut validator_a_info = cluster.exit_node(&validator_a_pubkey); + let mut validator_c_info = cluster.exit_node(&validator_c_pubkey); + + // Step 1: + // Let validator B, (D) run for a while. + let now = Instant::now(); + loop { + let elapsed = now.elapsed(); + assert!( + elapsed <= Duration::from_secs(30), + "Validator B failed to vote on any slot >= {} in {} secs", + next_slot_on_a, + elapsed.as_secs() + ); + sleep(Duration::from_millis(100)); + + if let Some((last_vote, _)) = last_vote_in_tower(&val_b_ledger_path, &validator_b_pubkey) { + if last_vote >= next_slot_on_a { + break; + } + } + } + // kill B + let _validator_b_info = cluster.exit_node(&validator_b_pubkey); + + // Step 2: + // Stop validator and truncate ledger, copy over B's ledger to A + info!("truncate validator C's ledger"); + { + // first copy from validator B's ledger + std::fs::remove_dir_all(&validator_c_info.info.ledger_path).unwrap(); + let mut opt = fs_extra::dir::CopyOptions::new(); + opt.copy_inside = true; + fs_extra::dir::copy(&val_b_ledger_path, &val_c_ledger_path, &opt).unwrap(); + // Remove B's tower in the C's new copied ledger + remove_tower(&val_c_ledger_path, &validator_b_pubkey); + + let blockstore = open_blockstore(&val_c_ledger_path); + purge_slots(&blockstore, base_slot + 1, truncated_slots); + } + info!("Create validator A's ledger"); + { + // Find latest vote in B, and wait for it to reach blockstore + let b_last_vote = + wait_for_last_vote_in_tower_to_land_in_ledger(&val_b_ledger_path, &validator_b_pubkey); + + // Now we copy these blocks to A + let b_blockstore = open_blockstore(&val_b_ledger_path); + let a_blockstore = open_blockstore(&val_a_ledger_path); + copy_blocks(b_last_vote, &b_blockstore, &a_blockstore); + + // Purge uneccessary slots + purge_slots(&a_blockstore, next_slot_on_a + 1, truncated_slots); + } + + // Step 3: + // Restart A with voting enabled so that it can vote on B's fork + // up to `next_slot_on_a`, thereby optimistcally confirming `next_slot_on_a` + info!("Restarting A"); + validator_a_info.config.voting_disabled = false; + cluster.restart_node( + &validator_a_pubkey, + validator_a_info, + SocketAddrSpace::Unspecified, + ); + + info!("Waiting for A to vote on slot descended from slot `next_slot_on_a`"); + let now = Instant::now(); + loop { + if let Some((last_vote_slot, _)) = + last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) + { + if last_vote_slot >= next_slot_on_a { + info!( + "Validator A has caught up and voted on slot: {}", + last_vote_slot + ); + break; + } + } + + if now.elapsed().as_secs() >= 30 { + panic!( + "Validator A has not seen optimistic confirmation slot > {} in 30 seconds", + next_slot_on_a + ); + } + + sleep(Duration::from_millis(20)); + } + + info!("Killing A"); + let validator_a_info = cluster.exit_node(&validator_a_pubkey); + { + let blockstore = open_blockstore(&val_a_ledger_path); + purge_slots(&blockstore, next_slot_on_a + 1, truncated_slots); + if !with_tower { + info!("Removing tower!"); + remove_tower(&val_a_ledger_path, &validator_a_pubkey); + + // Remove next_slot_on_a from ledger to force validator A to select + // votes_on_c_fork. Otherwise the validator A will immediately vote + // for 27 on restart, because it hasn't gotten the heavier fork from + // validator C yet. + // Then it will be stuck on 27 unable to switch because C doesn't + // have enough stake to generate a switching proof + purge_slots(&blockstore, next_slot_on_a, truncated_slots); + } else { + info!("Not removing tower!"); + } + } + + // Step 4: + // Run validator C only to make it produce and vote on its own fork. + info!("Restart validator C again!!!"); + validator_c_info.config.voting_disabled = false; + cluster.restart_node( + &validator_c_pubkey, + validator_c_info, + SocketAddrSpace::Unspecified, + ); + + let mut votes_on_c_fork = std::collections::BTreeSet::new(); // S4 and S5 + for _ in 0..100 { + sleep(Duration::from_millis(100)); + + if let Some((last_vote, _)) = last_vote_in_tower(&val_c_ledger_path, &validator_c_pubkey) { + if last_vote != base_slot { + votes_on_c_fork.insert(last_vote); + // Collect 4 votes + if votes_on_c_fork.len() >= 4 { + break; + } + } + } + } + assert!(!votes_on_c_fork.is_empty()); + info!("collected validator C's votes: {:?}", votes_on_c_fork); + + // Step 5: + // verify whether there was violation or not + info!("Restart validator A again!!!"); + cluster.restart_node( + &validator_a_pubkey, + validator_a_info, + SocketAddrSpace::Unspecified, + ); + + // monitor for actual votes from validator A + let mut bad_vote_detected = false; + let mut a_votes = vec![]; + for _ in 0..100 { + sleep(Duration::from_millis(100)); + + if let Some((last_vote, _)) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) { + a_votes.push(last_vote); + let blockstore = Blockstore::open_with_access_type( + &val_a_ledger_path, + AccessType::TryPrimaryThenSecondary, + None, + true, + ) + .unwrap(); + let mut ancestors = AncestorIterator::new(last_vote, &blockstore); + if ancestors.any(|a| votes_on_c_fork.contains(&a)) { + bad_vote_detected = true; + break; + } + } + } + + info!("Observed A's votes on: {:?}", a_votes); + + // an elaborate way of assert!(with_tower && !bad_vote_detected || ...) + let expects_optimistic_confirmation_violation = !with_tower; + if bad_vote_detected != expects_optimistic_confirmation_violation { + if bad_vote_detected { + panic!("No violation expected because of persisted tower!"); + } else { + panic!("Violation expected because of removed persisted tower!"); + } + } else if bad_vote_detected { + info!("THIS TEST expected violations. And indeed, there was some, because of removed persisted tower."); + } else { + info!("THIS TEST expected no violation. And indeed, there was none, thanks to persisted tower."); + } +} diff --git a/local-cluster/tests/local_cluster_slow.rs b/local-cluster/tests/local_cluster_slow.rs new file mode 100644 index 0000000000..1c183a06fc --- /dev/null +++ b/local-cluster/tests/local_cluster_slow.rs @@ -0,0 +1,919 @@ +//! If a test takes over 100s to run on CI, move it here so that it's clear where the +//! biggest improvements to CI times can be found. +#![allow(clippy::integer_arithmetic)] +use { + common::{ + copy_blocks, create_custom_leader_schedule, last_vote_in_tower, ms_for_n_slots, + open_blockstore, restore_tower, run_cluster_partition, run_kill_partition_switch_threshold, + test_faulty_node, wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER, + }, + log::*, + serial_test::serial, + solana_core::{ + broadcast_stage::{ + broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType, + }, + consensus::SWITCH_FORK_THRESHOLD, + replay_stage::DUPLICATE_THRESHOLD, + validator::ValidatorConfig, + }, + solana_gossip::{ + crds::Cursor, + gossip_service::{self, discover_cluster}, + }, + solana_ledger::ancestor_iterator::AncestorIterator, + solana_local_cluster::{ + cluster::{Cluster, ClusterValidatorInfo}, + cluster_tests, + local_cluster::{ClusterConfig, LocalCluster}, + validator_configs::*, + }, + solana_sdk::{ + clock::{Slot, MAX_PROCESSING_AGE}, + hash::Hash, + pubkey::Pubkey, + signature::{Keypair, Signer}, + }, + solana_streamer::socket::SocketAddrSpace, + solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction}, + std::{ + collections::{BTreeSet, HashSet}, + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::sleep, + time::Duration, + }, +}; + +mod common; + +#[test] +#[serial] +// Steps in this test: +// We want to create a situation like: +/* + 1 (2%, killed and restarted) --- 200 (37%, lighter fork) + / + 0 + \-------- 4 (38%, heavier fork) +*/ +// where the 2% that voted on slot 1 don't see their votes land in a block +// due to blockhash expiration, and thus without resigning their votes with +// a newer blockhash, will deem slot 4 the heavier fork and try to switch to +// slot 4, which doesn't pass the switch threshold. This stalls the network. + +// We do this by: +// 1) Creating a partition so all three nodes don't see each other +// 2) Kill the validator with 2% +// 3) Wait for longer than blockhash expiration +// 4) Copy in the lighter fork's blocks up, *only* up to the first slot in the lighter fork +// (not all the blocks on the lighter fork!), call this slot `L` +// 5) Restart the validator with 2% so that he votes on `L`, but the vote doesn't land +// due to blockhash expiration +// 6) Resolve the partition so that the 2% repairs the other fork, and tries to switch, +// stalling the network. + +fn test_fork_choice_refresh_old_votes() { + solana_logger::setup_with_default(RUST_LOG_FILTER); + let max_switch_threshold_failure_pct = 1.0 - 2.0 * SWITCH_FORK_THRESHOLD; + let total_stake = 100; + let max_failures_stake = (max_switch_threshold_failure_pct * total_stake as f64) as u64; + + // 1% less than the failure stake, where the 2% is allocated to a validator that + // has no leader slots and thus won't be able to vote on its own fork. + let failures_stake = max_failures_stake; + let total_alive_stake = total_stake - failures_stake; + let alive_stake_1 = total_alive_stake / 2 - 1; + let alive_stake_2 = total_alive_stake - alive_stake_1 - 1; + + // Heavier fork still doesn't have enough stake to switch. Both branches need + // the vote to land from the validator with `alive_stake_3` to allow the other + // fork to switch. + let alive_stake_3 = 2; + assert!(alive_stake_1 < alive_stake_2); + assert!(alive_stake_1 + alive_stake_3 > alive_stake_2); + + let partitions: &[&[(usize, usize)]] = &[ + &[(alive_stake_1 as usize, 8)], + &[(alive_stake_2 as usize, 8)], + &[(alive_stake_3 as usize, 0)], + ]; + + #[derive(Default)] + struct PartitionContext { + alive_stake3_info: Option, + smallest_validator_key: Pubkey, + lighter_fork_validator_key: Pubkey, + heaviest_validator_key: Pubkey, + } + let on_partition_start = |cluster: &mut LocalCluster, + validator_keys: &[Pubkey], + _: Vec, + context: &mut PartitionContext| { + // Kill validator with alive_stake_3, second in `partitions` slice + let smallest_validator_key = &validator_keys[3]; + let info = cluster.exit_node(smallest_validator_key); + context.alive_stake3_info = Some(info); + context.smallest_validator_key = *smallest_validator_key; + // validator_keys[0] is the validator that will be killed, i.e. the validator with + // stake == `failures_stake` + context.lighter_fork_validator_key = validator_keys[1]; + // Third in `partitions` slice + context.heaviest_validator_key = validator_keys[2]; + }; + + let ticks_per_slot = 8; + let on_before_partition_resolved = + |cluster: &mut LocalCluster, context: &mut PartitionContext| { + // Equal to ms_per_slot * MAX_PROCESSING_AGE, rounded up + let sleep_time_ms = ms_for_n_slots(MAX_PROCESSING_AGE as u64, ticks_per_slot); + info!("Wait for blockhashes to expire, {} ms", sleep_time_ms); + + // Wait for blockhashes to expire + sleep(Duration::from_millis(sleep_time_ms)); + + let smallest_ledger_path = context + .alive_stake3_info + .as_ref() + .unwrap() + .info + .ledger_path + .clone(); + let lighter_fork_ledger_path = cluster.ledger_path(&context.lighter_fork_validator_key); + let heaviest_ledger_path = cluster.ledger_path(&context.heaviest_validator_key); + + // Get latest votes. We make sure to wait until the vote has landed in + // blockstore. This is important because if we were the leader for the block there + // is a possibility of voting before broadcast has inserted in blockstore. + let lighter_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger( + &lighter_fork_ledger_path, + &context.lighter_fork_validator_key, + ); + let heaviest_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger( + &heaviest_ledger_path, + &context.heaviest_validator_key, + ); + + // Open ledgers + let smallest_blockstore = open_blockstore(&smallest_ledger_path); + let lighter_fork_blockstore = open_blockstore(&lighter_fork_ledger_path); + let heaviest_blockstore = open_blockstore(&heaviest_ledger_path); + + info!("Opened blockstores"); + + // Find the first slot on the smaller fork + let lighter_ancestors: BTreeSet = std::iter::once(lighter_fork_latest_vote) + .chain(AncestorIterator::new( + lighter_fork_latest_vote, + &lighter_fork_blockstore, + )) + .collect(); + let heavier_ancestors: BTreeSet = std::iter::once(heaviest_fork_latest_vote) + .chain(AncestorIterator::new( + heaviest_fork_latest_vote, + &heaviest_blockstore, + )) + .collect(); + let first_slot_in_lighter_partition = *lighter_ancestors + .iter() + .zip(heavier_ancestors.iter()) + .find(|(x, y)| x != y) + .unwrap() + .0; + + // Must have been updated in the above loop + assert!(first_slot_in_lighter_partition != 0); + info!( + "First slot in lighter partition is {}", + first_slot_in_lighter_partition + ); + + // Copy all the blocks from the smaller partition up to `first_slot_in_lighter_partition` + // into the smallest validator's blockstore + copy_blocks( + first_slot_in_lighter_partition, + &lighter_fork_blockstore, + &smallest_blockstore, + ); + + // Restart the smallest validator that we killed earlier in `on_partition_start()` + drop(smallest_blockstore); + cluster.restart_node( + &context.smallest_validator_key, + context.alive_stake3_info.take().unwrap(), + SocketAddrSpace::Unspecified, + ); + + loop { + // Wait for node to vote on the first slot on the less heavy fork, so it'll need + // a switch proof to flip to the other fork. + // However, this vote won't land because it's using an expired blockhash. The + // fork structure will look something like this after the vote: + /* + 1 (2%, killed and restarted) --- 200 (37%, lighter fork) + / + 0 + \-------- 4 (38%, heavier fork) + */ + if let Some((last_vote_slot, _last_vote_hash)) = + last_vote_in_tower(&smallest_ledger_path, &context.smallest_validator_key) + { + // Check that the heaviest validator on the other fork doesn't have this slot, + // this must mean we voted on a unique slot on this fork + if last_vote_slot == first_slot_in_lighter_partition { + info!( + "Saw vote on first slot in lighter partition {}", + first_slot_in_lighter_partition + ); + break; + } else { + info!( + "Haven't seen vote on first slot in lighter partition, latest vote is: {}", + last_vote_slot + ); + } + } + + sleep(Duration::from_millis(20)); + } + + // Now resolve partition, allow validator to see the fork with the heavier validator, + // but the fork it's currently on is the heaviest, if only its own vote landed! + }; + + // Check that new roots were set after the partition resolves (gives time + // for lockouts built during partition to resolve and gives validators an opportunity + // to try and switch forks) + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut PartitionContext| { + cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified); + }; + + run_kill_partition_switch_threshold( + &[&[(failures_stake as usize - 1, 16)]], + partitions, + // Partition long enough such that the first vote made by validator with + // `alive_stake_3` won't be ingested due to BlockhashTooOld, + None, + Some(ticks_per_slot), + PartitionContext::default(), + on_partition_start, + on_before_partition_resolved, + on_partition_resolved, + ); +} + +#[test] +#[serial] +fn test_kill_heaviest_partition() { + // This test: + // 1) Spins up four partitions, the heaviest being the first with more stake + // 2) Schedules the other validators for sufficient slots in the schedule + // so that they will still be locked out of voting for the major partition + // when the partition resolves + // 3) Kills the most staked partition. Validators are locked out, but should all + // eventually choose the major partition + // 4) Check for recovery + let num_slots_per_validator = 8; + let partitions: [Vec; 4] = [vec![11], vec![10], vec![10], vec![10]]; + let (leader_schedule, validator_keys) = create_custom_leader_schedule(&[ + num_slots_per_validator * (partitions.len() - 1), + num_slots_per_validator, + num_slots_per_validator, + num_slots_per_validator, + ]); + + let empty = |_: &mut LocalCluster, _: &mut ()| {}; + let validator_to_kill = validator_keys[0].pubkey(); + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { + info!("Killing validator with id: {}", validator_to_kill); + cluster.exit_node(&validator_to_kill); + cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified); + }; + run_cluster_partition( + &partitions, + Some((leader_schedule, validator_keys)), + (), + empty, + empty, + on_partition_resolved, + None, + None, + vec![], + ) +} + +#[test] +#[serial] +fn test_kill_partition_switch_threshold_no_progress() { + let max_switch_threshold_failure_pct = 1.0 - 2.0 * SWITCH_FORK_THRESHOLD; + let total_stake = 10_000; + let max_failures_stake = (max_switch_threshold_failure_pct * total_stake as f64) as u64; + + let failures_stake = max_failures_stake; + let total_alive_stake = total_stake - failures_stake; + let alive_stake_1 = total_alive_stake / 2; + let alive_stake_2 = total_alive_stake - alive_stake_1; + + // Check that no new roots were set 400 slots after partition resolves (gives time + // for lockouts built during partition to resolve and gives validators an opportunity + // to try and switch forks) + let on_partition_start = + |_: &mut LocalCluster, _: &[Pubkey], _: Vec, _: &mut ()| {}; + let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut ()| {}; + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { + cluster.check_no_new_roots(400, "PARTITION_TEST", SocketAddrSpace::Unspecified); + }; + + // This kills `max_failures_stake`, so no progress should be made + run_kill_partition_switch_threshold( + &[&[(failures_stake as usize, 16)]], + &[ + &[(alive_stake_1 as usize, 8)], + &[(alive_stake_2 as usize, 8)], + ], + None, + None, + (), + on_partition_start, + on_before_partition_resolved, + on_partition_resolved, + ); +} + +#[test] +#[serial] +fn test_kill_partition_switch_threshold_progress() { + let max_switch_threshold_failure_pct = 1.0 - 2.0 * SWITCH_FORK_THRESHOLD; + let total_stake = 10_000; + + // Kill `< max_failures_stake` of the validators + let max_failures_stake = (max_switch_threshold_failure_pct * total_stake as f64) as u64; + let failures_stake = max_failures_stake - 1; + let total_alive_stake = total_stake - failures_stake; + + // Partition the remaining alive validators, should still make progress + // once the partition resolves + let alive_stake_1 = total_alive_stake / 2; + let alive_stake_2 = total_alive_stake - alive_stake_1; + let bigger = std::cmp::max(alive_stake_1, alive_stake_2); + let smaller = std::cmp::min(alive_stake_1, alive_stake_2); + + // At least one of the forks must have > SWITCH_FORK_THRESHOLD in order + // to guarantee switching proofs can be created. Make sure the other fork + // is <= SWITCH_FORK_THRESHOLD to make sure progress can be made. Caches + // bugs such as liveness issues bank-weighted fork choice, which may stall + // because the fork with less stake could have more weight, but other fork would: + // 1) Not be able to generate a switching proof + // 2) Other more staked fork stops voting, so doesn't catch up in bank weight. + assert!( + bigger as f64 / total_stake as f64 > SWITCH_FORK_THRESHOLD + && smaller as f64 / total_stake as f64 <= SWITCH_FORK_THRESHOLD + ); + + let on_partition_start = + |_: &mut LocalCluster, _: &[Pubkey], _: Vec, _: &mut ()| {}; + let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut ()| {}; + let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| { + cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified); + }; + run_kill_partition_switch_threshold( + &[&[(failures_stake as usize, 16)]], + &[ + &[(alive_stake_1 as usize, 8)], + &[(alive_stake_2 as usize, 8)], + ], + None, + None, + (), + on_partition_start, + on_before_partition_resolved, + on_partition_resolved, + ); +} + +#[test] +#[serial] +#[allow(unused_attributes)] +fn test_duplicate_shreds_broadcast_leader() { + // Create 4 nodes: + // 1) Bad leader sending different versions of shreds to both of the other nodes + // 2) 1 node who's voting behavior in gossip + // 3) 1 validator gets the same version as the leader, will see duplicate confirmation + // 4) 1 validator will not get the same version as the leader. For each of these + // duplicate slots `S` either: + // a) The leader's version of `S` gets > DUPLICATE_THRESHOLD of votes in gossip and so this + // node will repair that correct version + // b) A descendant `D` of some version of `S` gets > DUPLICATE_THRESHOLD votes in gossip, + // but no version of `S` does. Then the node will not know to repair the right version + // by just looking at gossip, but will instead have to use EpochSlots repair after + // detecting that a descendant does not chain to its version of `S`, and marks that descendant + // dead. + // Scenarios a) or b) are triggered by our node in 2) who's voting behavior we control. + + // Critical that bad_leader_stake + good_node_stake < DUPLICATE_THRESHOLD and that + // bad_leader_stake + good_node_stake + our_node_stake > DUPLICATE_THRESHOLD so that + // our vote is the determining factor + let bad_leader_stake = 10000000000; + // Ensure that the good_node_stake is always on the critical path, and the partition node + // should never be on the critical path. This way, none of the bad shreds sent to the partition + // node corrupt the good node. + let good_node_stake = 500000; + let our_node_stake = 10000000000; + let partition_node_stake = 1; + + let node_stakes = vec![ + bad_leader_stake, + partition_node_stake, + good_node_stake, + // Needs to be last in the vector, so that we can + // find the id of this node. See call to `test_faulty_node` + // below for more details. + our_node_stake, + ]; + assert_eq!(*node_stakes.last().unwrap(), our_node_stake); + let total_stake: u64 = node_stakes.iter().sum(); + + assert!( + ((bad_leader_stake + good_node_stake) as f64 / total_stake as f64) < DUPLICATE_THRESHOLD + ); + assert!( + (bad_leader_stake + good_node_stake + our_node_stake) as f64 / total_stake as f64 + > DUPLICATE_THRESHOLD + ); + + // Important that the partition node stake is the smallest so that it gets selected + // for the partition. + assert!(partition_node_stake < our_node_stake && partition_node_stake < good_node_stake); + + // 1) Set up the cluster + let (mut cluster, validator_keys) = test_faulty_node( + BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig { + stake_partition: partition_node_stake, + }), + node_stakes, + ); + + // This is why it's important our node was last in `node_stakes` + let our_id = validator_keys.last().unwrap().pubkey(); + + // 2) Kill our node and start up a thread to simulate votes to control our voting behavior + let our_info = cluster.exit_node(&our_id); + let node_keypair = our_info.info.keypair; + let vote_keypair = our_info.info.voting_keypair; + let bad_leader_id = cluster.entry_point_info.id; + let bad_leader_ledger_path = cluster.validators[&bad_leader_id].info.ledger_path.clone(); + info!("our node id: {}", node_keypair.pubkey()); + + // 3) Start up a spy to listen for votes + let exit = Arc::new(AtomicBool::new(false)); + let (gossip_service, _tcp_listener, cluster_info) = gossip_service::make_gossip_node( + // Need to use our validator's keypair to gossip EpochSlots and votes for our + // node later. + Keypair::from_bytes(&node_keypair.to_bytes()).unwrap(), + Some(&cluster.entry_point_info.gossip), + &exit, + None, + 0, + false, + SocketAddrSpace::Unspecified, + ); + + let t_voter = { + let exit = exit.clone(); + std::thread::spawn(move || { + let mut cursor = Cursor::default(); + let mut max_vote_slot = 0; + let mut gossip_vote_index = 0; + loop { + if exit.load(Ordering::Relaxed) { + return; + } + + let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor); + let mut parsed_vote_iter: Vec<_> = labels + .into_iter() + .zip(votes.into_iter()) + .filter_map(|(label, leader_vote_tx)| { + // Filter out votes not from the bad leader + if label.pubkey() == bad_leader_id { + let vote = vote_transaction::parse_vote_transaction(&leader_vote_tx) + .map(|(_, vote, _)| vote) + .unwrap(); + // Filter out empty votes + if !vote.slots.is_empty() { + Some((vote, leader_vote_tx)) + } else { + None + } + } else { + None + } + }) + .collect(); + + parsed_vote_iter.sort_by(|(vote, _), (vote2, _)| { + vote.slots.last().unwrap().cmp(vote2.slots.last().unwrap()) + }); + + for (parsed_vote, leader_vote_tx) in &parsed_vote_iter { + if let Some(latest_vote_slot) = parsed_vote.slots.last() { + info!("received vote for {}", latest_vote_slot); + // Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot. + if *latest_vote_slot > max_vote_slot { + let new_epoch_slots: Vec = + (max_vote_slot + 1..latest_vote_slot + 1).collect(); + info!( + "Simulating epoch slots from our node: {:?}", + new_epoch_slots + ); + cluster_info.push_epoch_slots(&new_epoch_slots); + max_vote_slot = *latest_vote_slot; + } + + // Only vote on even slots. Note this may violate lockouts if the + // validator started voting on a different fork before we could exit + // it above. + let vote_hash = parsed_vote.hash; + if latest_vote_slot % 2 == 0 { + info!( + "Simulating vote from our node on slot {}, hash {}", + latest_vote_slot, vote_hash + ); + + // Add all recent vote slots on this fork to allow cluster to pass + // vote threshold checks in replay. Note this will instantly force a + // root by this validator, but we're not concerned with lockout violations + // by this validator so it's fine. + let leader_blockstore = open_blockstore(&bad_leader_ledger_path); + let mut vote_slots: Vec = AncestorIterator::new_inclusive( + *latest_vote_slot, + &leader_blockstore, + ) + .take(MAX_LOCKOUT_HISTORY) + .collect(); + vote_slots.reverse(); + let vote_tx = vote_transaction::new_vote_transaction( + vote_slots, + vote_hash, + leader_vote_tx.message.recent_blockhash, + &node_keypair, + &vote_keypair, + &vote_keypair, + None, + ); + gossip_vote_index += 1; + gossip_vote_index %= MAX_LOCKOUT_HISTORY; + cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8) + } + } + // Give vote some time to propagate + sleep(Duration::from_millis(100)); + } + + if parsed_vote_iter.is_empty() { + sleep(Duration::from_millis(100)); + } + } + }) + }; + + // 4) Check that the cluster is making progress + cluster.check_for_new_roots( + 16, + "test_duplicate_shreds_broadcast_leader", + SocketAddrSpace::Unspecified, + ); + + // Clean up threads + exit.store(true, Ordering::Relaxed); + t_voter.join().unwrap(); + gossip_service.join().unwrap(); +} + +#[test] +#[serial] +fn test_switch_threshold_uses_gossip_votes() { + solana_logger::setup_with_default(RUST_LOG_FILTER); + let total_stake = 100; + + // Minimum stake needed to generate a switching proof + let minimum_switch_stake = (SWITCH_FORK_THRESHOLD as f64 * total_stake as f64) as u64; + + // Make the heavier stake insufficient for switching so tha the lighter validator + // cannot switch without seeing a vote from the dead/failure_stake validator. + let heavier_stake = minimum_switch_stake; + let lighter_stake = heavier_stake - 1; + let failures_stake = total_stake - heavier_stake - lighter_stake; + + let partitions: &[&[(usize, usize)]] = &[ + &[(heavier_stake as usize, 8)], + &[(lighter_stake as usize, 8)], + ]; + + #[derive(Default)] + struct PartitionContext { + heaviest_validator_key: Pubkey, + lighter_validator_key: Pubkey, + dead_validator_info: Option, + } + + let on_partition_start = |_cluster: &mut LocalCluster, + validator_keys: &[Pubkey], + mut dead_validator_infos: Vec, + context: &mut PartitionContext| { + assert_eq!(dead_validator_infos.len(), 1); + context.dead_validator_info = Some(dead_validator_infos.pop().unwrap()); + // validator_keys[0] is the validator that will be killed, i.e. the validator with + // stake == `failures_stake` + context.heaviest_validator_key = validator_keys[1]; + context.lighter_validator_key = validator_keys[2]; + }; + + let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut PartitionContext| {}; + + // Check that new roots were set after the partition resolves (gives time + // for lockouts built during partition to resolve and gives validators an opportunity + // to try and switch forks) + let on_partition_resolved = |cluster: &mut LocalCluster, context: &mut PartitionContext| { + let lighter_validator_ledger_path = cluster.ledger_path(&context.lighter_validator_key); + let heavier_validator_ledger_path = cluster.ledger_path(&context.heaviest_validator_key); + + let (lighter_validator_latest_vote, _) = last_vote_in_tower( + &lighter_validator_ledger_path, + &context.lighter_validator_key, + ) + .unwrap(); + + info!( + "Lighter validator's latest vote is for slot {}", + lighter_validator_latest_vote + ); + + // Lighter partition should stop voting after detecting the heavier partition and try + // to switch. Loop until we see a greater vote by the heavier validator than the last + // vote made by the lighter validator on the lighter fork. + let mut heavier_validator_latest_vote; + let mut heavier_validator_latest_vote_hash; + let heavier_blockstore = open_blockstore(&heavier_validator_ledger_path); + loop { + let (sanity_check_lighter_validator_latest_vote, _) = last_vote_in_tower( + &lighter_validator_ledger_path, + &context.lighter_validator_key, + ) + .unwrap(); + + // Lighter validator should stop voting, because `on_partition_resolved` is only + // called after a propagation time where blocks from the other fork should have + // finished propagating + assert_eq!( + sanity_check_lighter_validator_latest_vote, + lighter_validator_latest_vote + ); + + let (new_heavier_validator_latest_vote, new_heavier_validator_latest_vote_hash) = + last_vote_in_tower( + &heavier_validator_ledger_path, + &context.heaviest_validator_key, + ) + .unwrap(); + + heavier_validator_latest_vote = new_heavier_validator_latest_vote; + heavier_validator_latest_vote_hash = new_heavier_validator_latest_vote_hash; + + // Latest vote for each validator should be on different forks + assert_ne!(lighter_validator_latest_vote, heavier_validator_latest_vote); + if heavier_validator_latest_vote > lighter_validator_latest_vote { + let heavier_ancestors: HashSet = + AncestorIterator::new(heavier_validator_latest_vote, &heavier_blockstore) + .collect(); + assert!(!heavier_ancestors.contains(&lighter_validator_latest_vote)); + break; + } + } + + info!("Checking to make sure lighter validator doesn't switch"); + let mut latest_slot = lighter_validator_latest_vote; + + // Number of chances the validator had to switch votes but didn't + let mut total_voting_opportunities = 0; + while total_voting_opportunities <= 5 { + let (new_latest_slot, latest_slot_ancestors) = + find_latest_replayed_slot_from_ledger(&lighter_validator_ledger_path, latest_slot); + latest_slot = new_latest_slot; + // Ensure `latest_slot` is on the other fork + if latest_slot_ancestors.contains(&heavier_validator_latest_vote) { + let tower = restore_tower( + &lighter_validator_ledger_path, + &context.lighter_validator_key, + ) + .unwrap(); + // Check that there was an opportunity to vote + if !tower.is_locked_out(latest_slot, &latest_slot_ancestors) { + // Ensure the lighter blockstore has not voted again + let new_lighter_validator_latest_vote = tower.last_voted_slot().unwrap(); + assert_eq!( + new_lighter_validator_latest_vote, + lighter_validator_latest_vote + ); + info!( + "Incrementing voting opportunities: {}", + total_voting_opportunities + ); + total_voting_opportunities += 1; + } else { + info!( + "Tower still locked out, can't vote for slot: {}", + latest_slot + ); + } + } else if latest_slot > heavier_validator_latest_vote { + warn!( + "validator is still generating blocks on its own fork, last processed slot: {}", + latest_slot + ); + } + sleep(Duration::from_millis(50)); + } + + // Make a vote from the killed validator for slot `heavier_validator_latest_vote` in gossip + info!( + "Simulate vote for slot: {} from dead validator", + heavier_validator_latest_vote + ); + let vote_keypair = &context + .dead_validator_info + .as_ref() + .unwrap() + .info + .voting_keypair + .clone(); + let node_keypair = &context + .dead_validator_info + .as_ref() + .unwrap() + .info + .keypair + .clone(); + + cluster_tests::submit_vote_to_cluster_gossip( + node_keypair, + vote_keypair, + heavier_validator_latest_vote, + heavier_validator_latest_vote_hash, + // Make the vote transaction with a random blockhash. Thus, the vote only lives in gossip but + // never makes it into a block + Hash::new_unique(), + cluster + .get_contact_info(&context.heaviest_validator_key) + .unwrap() + .gossip, + &SocketAddrSpace::Unspecified, + ) + .unwrap(); + + loop { + // Wait for the lighter validator to switch to the heavier fork + let (new_lighter_validator_latest_vote, _) = last_vote_in_tower( + &lighter_validator_ledger_path, + &context.lighter_validator_key, + ) + .unwrap(); + + if new_lighter_validator_latest_vote != lighter_validator_latest_vote { + info!( + "Lighter validator switched forks at slot: {}", + new_lighter_validator_latest_vote + ); + let (heavier_validator_latest_vote, _) = last_vote_in_tower( + &heavier_validator_ledger_path, + &context.heaviest_validator_key, + ) + .unwrap(); + let (smaller, larger) = + if new_lighter_validator_latest_vote > heavier_validator_latest_vote { + ( + heavier_validator_latest_vote, + new_lighter_validator_latest_vote, + ) + } else { + ( + new_lighter_validator_latest_vote, + heavier_validator_latest_vote, + ) + }; + + // Check the new vote is on the same fork as the heaviest fork + let heavier_blockstore = open_blockstore(&heavier_validator_ledger_path); + let larger_slot_ancestors: HashSet = + AncestorIterator::new(larger, &heavier_blockstore) + .chain(std::iter::once(larger)) + .collect(); + assert!(larger_slot_ancestors.contains(&smaller)); + break; + } else { + sleep(Duration::from_millis(50)); + } + } + }; + + let ticks_per_slot = 8; + run_kill_partition_switch_threshold( + &[&[(failures_stake as usize, 0)]], + partitions, + // Partition long enough such that the first vote made by validator with + // `alive_stake_3` won't be ingested due to BlockhashTooOld, + None, + Some(ticks_per_slot), + PartitionContext::default(), + on_partition_start, + on_before_partition_resolved, + on_partition_resolved, + ); +} + +#[test] +#[serial] +fn test_listener_startup() { + let mut config = ClusterConfig { + node_stakes: vec![100; 1], + cluster_lamports: 1_000, + num_listeners: 3, + validator_configs: make_identical_validator_configs(&ValidatorConfig::default(), 1), + ..ClusterConfig::default() + }; + let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + let cluster_nodes = discover_cluster( + &cluster.entry_point_info.gossip, + 4, + SocketAddrSpace::Unspecified, + ) + .unwrap(); + assert_eq!(cluster_nodes.len(), 4); +} + +fn find_latest_replayed_slot_from_ledger( + ledger_path: &Path, + mut latest_slot: Slot, +) -> (Slot, HashSet) { + loop { + let mut blockstore = open_blockstore(ledger_path); + // This is kind of a hack because we can't query for new frozen blocks over RPC + // since the validator is not voting. + let new_latest_slots: Vec = blockstore + .slot_meta_iterator(latest_slot) + .unwrap() + .filter_map(|(s, _)| if s > latest_slot { Some(s) } else { None }) + .collect(); + + for new_latest_slot in new_latest_slots { + latest_slot = new_latest_slot; + info!("Checking latest_slot {}", latest_slot); + // Wait for the slot to be fully received by the validator + let entries; + loop { + info!("Waiting for slot {} to be full", latest_slot); + if blockstore.is_full(latest_slot) { + entries = blockstore.get_slot_entries(latest_slot, 0).unwrap(); + assert!(!entries.is_empty()); + break; + } else { + sleep(Duration::from_millis(50)); + blockstore = open_blockstore(ledger_path); + } + } + // Check the slot has been replayed + let non_tick_entry = entries.into_iter().find(|e| !e.transactions.is_empty()); + if let Some(non_tick_entry) = non_tick_entry { + // Wait for the slot to be replayed + loop { + info!("Waiting for slot {} to be replayed", latest_slot); + if !blockstore + .map_transactions_to_statuses( + latest_slot, + non_tick_entry.transactions.clone().into_iter(), + ) + .unwrap() + .is_empty() + { + return ( + latest_slot, + AncestorIterator::new(latest_slot, &blockstore).collect(), + ); + } else { + sleep(Duration::from_millis(50)); + blockstore = open_blockstore(ledger_path); + } + } + } else { + info!( + "No transactions in slot {}, can't tell if it was replayed", + latest_slot + ); + } + } + sleep(Duration::from_millis(50)); + } +}