Ingest votes from gossip into fork choice (#16560)

carllin
2021-04-21 14:40:35 -07:00
committed by GitHub
parent 63957f0677
commit 4c94f8933f
10 changed files with 1156 additions and 336 deletions

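The diff below covers only the local-cluster test for this behavior; the core change is that fork choice now counts a validator's latest vote observed over gossip even when that vote never lands in a block, so a fork is not judged lighter merely because its voters' transactions expired. A minimal self-contained sketch of that idea follows (the types and the `apply_vote` helper are illustrative only, not the crate's real API):

use std::collections::HashMap;

type Slot = u64;
type ValidatorId = u64;

// Hypothetical aggregator: tracks each validator's latest vote, regardless of
// whether it arrived in a replayed block or only over gossip, and sums stake
// per voted slot.
#[derive(Default)]
struct ForkWeights {
    latest_votes: HashMap<ValidatorId, Slot>,
    stake_by_slot: HashMap<Slot, u64>,
}

impl ForkWeights {
    // Apply a vote from any source; a newer vote replaces the older one.
    fn apply_vote(&mut self, validator: ValidatorId, slot: Slot, stake: u64) {
        let latest = self.latest_votes.entry(validator).or_insert(0);
        if slot > *latest {
            if *latest != 0 {
                // Move this validator's stake off the previously voted slot.
                *self.stake_by_slot.entry(*latest).or_default() -= stake;
            }
            *latest = slot;
            *self.stake_by_slot.entry(slot).or_default() += stake;
        }
    }

    fn weight(&self, slot: Slot) -> u64 {
        self.stake_by_slot.get(&slot).copied().unwrap_or(0)
    }
}

fn main() {
    let mut weights = ForkWeights::default();
    // A vote replayed from a block on the heavier fork...
    weights.apply_vote(1, 4, 38);
    // ...and a vote that expired before landing on-chain but was seen over
    // gossip. Without counting it, the lighter fork looks 2% lighter than the
    // stake actually behind it.
    weights.apply_vote(2, 200, 2);
    assert_eq!(weights.weight(4), 38);
    assert_eq!(weights.weight(200), 2);
}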

@@ -28,7 +28,7 @@ use solana_ledger::{
leader_schedule::LeaderSchedule,
};
use solana_local_cluster::{
cluster::Cluster,
cluster::{Cluster, ClusterValidatorInfo},
cluster_tests,
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::*,
@@ -40,7 +40,7 @@ use solana_runtime::{
use solana_sdk::{
account::AccountSharedData,
client::{AsyncClient, SyncClient},
clock::{self, Slot},
clock::{self, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES},
commitment_config::CommitmentConfig,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
genesis_config::ClusterType,
@@ -284,16 +284,17 @@ fn test_leader_failure_4() {
/// * `leader_schedule` - An option that specifies whether the cluster should
/// run with a fixed, predetermined leader schedule
#[allow(clippy::cognitive_complexity)]
fn run_cluster_partition<E, F>(
partitions: &[&[usize]],
fn run_cluster_partition<C>(
partitions: &[Vec<usize>],
leader_schedule: Option<(LeaderSchedule, Vec<Arc<Keypair>>)>,
on_partition_start: E,
on_partition_resolved: F,
mut context: C,
on_partition_start: impl FnOnce(&mut LocalCluster, &mut C),
on_before_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C),
on_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C),
partition_duration: Option<u64>,
ticks_per_slot: Option<u64>,
additional_accounts: Vec<(Pubkey, AccountSharedData)>,
) where
E: FnOnce(&mut LocalCluster),
F: FnOnce(&mut LocalCluster),
{
) {
solana_logger::setup_with_default(RUST_LOG_FILTER);
info!("PARTITION_TEST!");
let num_nodes = partitions.len();
@@ -351,6 +352,7 @@ fn run_cluster_partition<E, F>(
stakers_slot_offset: slots_per_epoch,
skip_warmup_slots: true,
additional_accounts,
ticks_per_slot: ticks_per_slot.unwrap_or(DEFAULT_TICKS_PER_SLOT),
..ClusterConfig::default()
};
@@ -379,11 +381,14 @@ fn run_cluster_partition<E, F>(
}
info!("PARTITION_TEST start partition");
on_partition_start(&mut cluster, &mut context);
enable_partition.store(false, Ordering::Relaxed);
on_partition_start(&mut cluster);
sleep(Duration::from_millis(leader_schedule_time));
sleep(Duration::from_millis(
partition_duration.unwrap_or(leader_schedule_time),
));
on_before_partition_resolved(&mut cluster, &mut context);
info!("PARTITION_TEST remove partition");
enable_partition.store(true, Ordering::Relaxed);
@@ -402,7 +407,7 @@ fn run_cluster_partition<E, F>(
);
sleep(Duration::from_millis(propagation_time));
info!("PARTITION_TEST resuming normal operation");
on_partition_resolved(&mut cluster);
on_partition_resolved(&mut cluster, &mut context);
}
#[allow(unused_attributes)]
@@ -410,57 +415,72 @@ fn run_cluster_partition<E, F>(
#[test]
#[serial]
fn test_cluster_partition_1_2() {
let empty = |_: &mut LocalCluster| {};
let on_partition_resolved = |cluster: &mut LocalCluster| {
let empty = |_: &mut LocalCluster, _: &mut ()| {};
let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
cluster.check_for_new_roots(16, &"PARTITION_TEST");
};
run_cluster_partition(&[&[1], &[1, 1]], None, empty, on_partition_resolved, vec![])
run_cluster_partition(
&[vec![1], vec![1, 1]],
None,
(),
empty,
empty,
on_partition_resolved,
None,
None,
vec![],
)
}
#[test]
#[serial]
fn test_cluster_partition_1_1() {
let empty = |_: &mut LocalCluster| {};
let on_partition_resolved = |cluster: &mut LocalCluster| {
let empty = |_: &mut LocalCluster, _: &mut ()| {};
let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
cluster.check_for_new_roots(16, &"PARTITION_TEST");
};
run_cluster_partition(&[&[1], &[1]], None, empty, on_partition_resolved, vec![])
run_cluster_partition(
&[vec![1], vec![1]],
None,
(),
empty,
empty,
on_partition_resolved,
None,
None,
vec![],
)
}
#[test]
#[serial]
fn test_cluster_partition_1_1_1() {
let empty = |_: &mut LocalCluster| {};
let on_partition_resolved = |cluster: &mut LocalCluster| {
let empty = |_: &mut LocalCluster, _: &mut ()| {};
let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
cluster.check_for_new_roots(16, &"PARTITION_TEST");
};
run_cluster_partition(
&[&[1], &[1], &[1]],
&[vec![1], vec![1], vec![1]],
None,
(),
empty,
empty,
on_partition_resolved,
None,
None,
vec![],
)
}
fn create_custom_leader_schedule(
num_validators: usize,
num_slots_per_validator: usize,
validator_num_slots: &[usize],
) -> (LeaderSchedule, Vec<Arc<Keypair>>) {
let mut leader_schedule = vec![];
let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new()))
.take(num_validators)
.take(validator_num_slots.len())
.collect();
for (i, k) in validator_keys.iter().enumerate() {
let num_slots = {
if i == 0 {
// Set up the leader to have 50% of the slots
num_slots_per_validator * (num_validators - 1)
} else {
num_slots_per_validator
}
};
for _ in 0..num_slots {
for (k, num_slots) in validator_keys.iter().zip(validator_num_slots.iter()) {
for _ in 0..*num_slots {
leader_schedule.push(k.pubkey())
}
}
@@ -484,13 +504,17 @@ fn test_kill_heaviest_partition() {
// eventually choose the major partition
// 4) Check for recovery
let num_slots_per_validator = 8;
let partitions: [&[usize]; 4] = [&[11], &[10], &[10], &[10]];
let (leader_schedule, validator_keys) =
create_custom_leader_schedule(partitions.len(), num_slots_per_validator);
let partitions: [Vec<usize>; 4] = [vec![11], vec![10], vec![10], vec![10]];
let (leader_schedule, validator_keys) = create_custom_leader_schedule(&[
num_slots_per_validator * (partitions.len() - 1),
num_slots_per_validator,
num_slots_per_validator,
num_slots_per_validator,
]);
let empty = |_: &mut LocalCluster| {};
let empty = |_: &mut LocalCluster, _: &mut ()| {};
let validator_to_kill = validator_keys[0].pubkey();
let on_partition_resolved = |cluster: &mut LocalCluster| {
let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
info!("Killing validator with id: {}", validator_to_kill);
cluster.exit_node(&validator_to_kill);
cluster.check_for_new_roots(16, &"PARTITION_TEST");
@@ -498,52 +522,80 @@ fn test_kill_heaviest_partition() {
run_cluster_partition(
&partitions,
Some((leader_schedule, validator_keys)),
(),
empty,
empty,
on_partition_resolved,
None,
None,
vec![],
)
}
#[allow(clippy::assertions_on_constants)]
fn run_kill_partition_switch_threshold<F>(
failures_stake: u64,
alive_stake_1: u64,
alive_stake_2: u64,
on_partition_resolved: F,
) where
F: Fn(&mut LocalCluster),
{
fn run_kill_partition_switch_threshold<C>(
stakes_to_kill: &[&[(usize, usize)]],
alive_stakes: &[&[(usize, usize)]],
partition_duration: Option<u64>,
ticks_per_slot: Option<u64>,
partition_context: C,
on_partition_start: impl Fn(&mut LocalCluster, &[Pubkey], &mut C),
on_before_partition_resolved: impl Fn(&mut LocalCluster, &mut C),
on_partition_resolved: impl Fn(&mut LocalCluster, &mut C),
) {
// Needs to be at least 1/3 or there will be no overlap
// with the confirmation supermajority 2/3
assert!(SWITCH_FORK_THRESHOLD >= 1f64 / 3f64);
info!(
"stakes: {} {} {}",
failures_stake, alive_stake_1, alive_stake_2
"stakes_to_kill: {:?}, alive_stakes: {:?}",
stakes_to_kill, alive_stakes
);
// This test:
// 1) Spins up one partition per entry in `stakes_to_kill` and `alive_stakes`
// 2) Kills the partitions listed in `stakes_to_kill`
// 3) Runs `on_partition_resolved` once the partition resolves
let num_slots_per_validator = 8;
let partitions: [&[usize]; 3] = [
&[(failures_stake as usize)],
&[(alive_stake_1 as usize)],
&[(alive_stake_2 as usize)],
];
let (leader_schedule, validator_keys) =
create_custom_leader_schedule(partitions.len(), num_slots_per_validator);
let partitions: Vec<&[(usize, usize)]> = stakes_to_kill
.iter()
.cloned()
.chain(alive_stakes.iter().cloned())
.collect();
let validator_to_kill = validator_keys[0].pubkey();
let on_partition_start = |cluster: &mut LocalCluster| {
info!("Killing validator with id: {}", validator_to_kill);
cluster.exit_node(&validator_to_kill);
let stake_partitions: Vec<Vec<usize>> = partitions
.iter()
.map(|stakes_and_slots| stakes_and_slots.iter().map(|(stake, _)| *stake).collect())
.collect();
let num_slots_per_validator: Vec<usize> = partitions
.iter()
.flat_map(|stakes_and_slots| stakes_and_slots.iter().map(|(_, num_slots)| *num_slots))
.collect();
let (leader_schedule, validator_keys) = create_custom_leader_schedule(&num_slots_per_validator);
info!(
"Validator ids: {:?}",
validator_keys
.iter()
.map(|k| k.pubkey())
.collect::<Vec<_>>()
);
let validator_pubkeys: Vec<Pubkey> = validator_keys.iter().map(|k| k.pubkey()).collect();
let on_partition_start = |cluster: &mut LocalCluster, partition_context: &mut C| {
for validator_to_kill in &validator_pubkeys[0..stakes_to_kill.len()] {
info!("Killing validator with id: {}", validator_to_kill);
cluster.exit_node(&validator_to_kill);
}
on_partition_start(cluster, &validator_pubkeys, partition_context);
};
run_cluster_partition(
&partitions,
&stake_partitions,
Some((leader_schedule, validator_keys)),
partition_context,
on_partition_start,
on_before_partition_resolved,
on_partition_resolved,
partition_duration,
ticks_per_slot,
vec![],
)
}
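Each entry in `stakes_to_kill` and `alive_stakes` above is a list of `(stake, leader_slots)` pairs; the helper splits them into the stake layout fed to `run_cluster_partition` and the per-validator slot counts fed to `create_custom_leader_schedule`. A quick stand-alone illustration of that reshaping, with arbitrary example numbers:

fn main() {
    // One partition to kill and two to keep alive, as (stake, leader slots).
    let stakes_to_kill: &[&[(usize, usize)]] = &[&[(20, 16)]];
    let alive_stakes: &[&[(usize, usize)]] = &[&[(40, 8)], &[(40, 8)]];

    let partitions: Vec<&[(usize, usize)]> = stakes_to_kill
        .iter()
        .cloned()
        .chain(alive_stakes.iter().cloned())
        .collect();

    // Stake layout handed to run_cluster_partition: one Vec<usize> per partition.
    let stake_partitions: Vec<Vec<usize>> = partitions
        .iter()
        .map(|stakes_and_slots| stakes_and_slots.iter().map(|(stake, _)| *stake).collect())
        .collect();

    // Leader slot counts handed to create_custom_leader_schedule: one per validator.
    let num_slots_per_validator: Vec<usize> = partitions
        .iter()
        .flat_map(|stakes_and_slots| stakes_and_slots.iter().map(|(_, slots)| *slots))
        .collect();

    assert_eq!(stake_partitions, vec![vec![20], vec![40], vec![40]]);
    assert_eq!(num_slots_per_validator, vec![16, 8, 8]);
}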
@@ -563,15 +615,24 @@ fn test_kill_partition_switch_threshold_no_progress() {
// Check that no new roots were set 400 slots after partition resolves (gives time
// for lockouts built during partition to resolve and gives validators an opportunity
// to try and switch forks)
let on_partition_resolved = |cluster: &mut LocalCluster| {
let on_partition_start = |_: &mut LocalCluster, _: &[Pubkey], _: &mut ()| {};
let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut ()| {};
let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
cluster.check_no_new_roots(400, &"PARTITION_TEST");
};
// This kills `max_failures_stake`, so no progress should be made
run_kill_partition_switch_threshold(
failures_stake,
alive_stake_1,
alive_stake_2,
&[&[(failures_stake as usize, 16)]],
&[
&[(alive_stake_1 as usize, 8)],
&[(alive_stake_2 as usize, 8)],
],
None,
None,
(),
on_partition_start,
on_before_partition_resolved,
on_partition_resolved,
);
}
@@ -606,13 +667,247 @@ fn test_kill_partition_switch_threshold_progress() {
&& smaller as f64 / total_stake as f64 <= SWITCH_FORK_THRESHOLD
);
let on_partition_resolved = |cluster: &mut LocalCluster| {
let on_partition_start = |_: &mut LocalCluster, _: &[Pubkey], _: &mut ()| {};
let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut ()| {};
let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
cluster.check_for_new_roots(16, &"PARTITION_TEST");
};
run_kill_partition_switch_threshold(
failures_stake,
alive_stake_1,
alive_stake_2,
&[&[(failures_stake as usize, 16)]],
&[
&[(alive_stake_1 as usize, 8)],
&[(alive_stake_2 as usize, 8)],
],
None,
None,
(),
on_partition_start,
on_before_partition_resolved,
on_partition_resolved,
);
}
#[test]
#[serial]
#[ignore]
// Steps in this test:
// We want to create a situation like:
/*
1 (2%, killed and restarted) --- 200 (37%, lighter fork)
/
0
\-------- 4 (38%, heavier fork)
*/
// where the 2% that voted on slot 1 don't see their votes land in a block
// and thus without integrating votes from gossip into fork choice, will
// deem slot 4 the heavier fork and try to switch to slot 4, which doesn't pass the
// switch threshold. This stalls the network.
// We do this by:
// 1) Creating a partition so all three nodes don't see each other
// 2) Kill the validator with 2%
// 3) Wait for longer than blockhash expiration
// 4) Copy in the lighter fork's blocks, but *only* up to the first slot in the lighter fork
// (not all the blocks on the lighter fork!); call this slot `L`
// 5) Restart the validator with 2% so that it votes on `L`, but the vote doesn't land
// due to blockhash expiration
// 6) Resolve the partition so that the 2% repairs the other fork, and tries to switch,
// stalling the network.
fn test_fork_choice_ingest_votes_from_gossip() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let max_switch_threshold_failure_pct = 1.0 - 2.0 * SWITCH_FORK_THRESHOLD;
let total_stake = 100;
let max_failures_stake = (max_switch_threshold_failure_pct * total_stake as f64) as u64;
// The partition that actually gets killed below is 1% less than `max_failures_stake`;
// that 1%, plus 1% shaved off the alive partitions, funds a 2% validator that
// has no leader slots and thus won't be able to vote on its own fork.
let failures_stake = max_failures_stake;
let total_alive_stake = total_stake - failures_stake;
let alive_stake_1 = total_alive_stake / 2 - 1;
let alive_stake_2 = total_alive_stake - alive_stake_1 - 1;
// Heavier fork still doesn't have enough stake to switch. Both branches need
// the vote to land from the validator with `alive_stake_3` to allow the other
// fork to switch.
let alive_stake_3 = 2;
assert!(alive_stake_1 < alive_stake_2);
assert!(alive_stake_1 + alive_stake_3 > alive_stake_2);
let partitions: &[&[(usize, usize)]] = &[
&[(alive_stake_1 as usize, 8)],
&[(alive_stake_2 as usize, 8)],
&[(alive_stake_3 as usize, 0)],
];
#[derive(Default)]
struct PartitionContext {
alive_stake3_info: Option<ClusterValidatorInfo>,
smallest_validator_key: Pubkey,
lighter_fork_validator_key: Pubkey,
heaviest_validator_key: Pubkey,
}
let on_partition_start =
|cluster: &mut LocalCluster, validator_keys: &[Pubkey], context: &mut PartitionContext| {
// Kill validator with alive_stake_3, third (last) in `partitions` slice
let smallest_validator_key = &validator_keys[3];
let info = cluster.exit_node(smallest_validator_key);
context.alive_stake3_info = Some(info);
context.smallest_validator_key = *smallest_validator_key;
context.lighter_fork_validator_key = validator_keys[1];
// Second in `partitions` slice
context.heaviest_validator_key = validator_keys[2];
};
let ticks_per_slot = 8;
let on_before_partition_resolved =
|cluster: &mut LocalCluster, context: &mut PartitionContext| {
// Equal to ms_per_slot (scaled to this test's ticks_per_slot) * MAX_RECENT_BLOCKHASHES,
// rounded up; a worked example follows this hunk
let sleep_time_ms =
((ticks_per_slot * DEFAULT_MS_PER_SLOT * MAX_RECENT_BLOCKHASHES as u64)
+ DEFAULT_TICKS_PER_SLOT
- 1)
/ DEFAULT_TICKS_PER_SLOT;
info!("Wait for blockhashes to expire, {} ms", sleep_time_ms);
// Wait for blockhashes to expire
sleep(Duration::from_millis(sleep_time_ms));
let smallest_ledger_path = context
.alive_stake3_info
.as_ref()
.unwrap()
.info
.ledger_path
.clone();
let lighter_fork_ledger_path = cluster.ledger_path(&context.lighter_fork_validator_key);
let heaviest_ledger_path = cluster.ledger_path(&context.heaviest_validator_key);
// Open ledgers
let smallest_blockstore = open_blockstore(&smallest_ledger_path);
let lighter_fork_blockstore = open_blockstore(&lighter_fork_ledger_path);
let heaviest_blockstore = open_blockstore(&heaviest_ledger_path);
info!("Opened blockstores");
// Find the first slot on the smaller fork
let mut first_slot_in_lighter_partition = 0;
for ((heavier_slot, heavier_slot_meta), (lighter_slot, _lighter_slot_meta)) in
heaviest_blockstore
.slot_meta_iterator(0)
.unwrap()
.zip(lighter_fork_blockstore.slot_meta_iterator(0).unwrap())
{
if heavier_slot != lighter_slot {
// Find the parent of the fork point
let last_common_ancestor = heavier_slot_meta.parent_slot;
let lighter_fork_parent_meta = lighter_fork_blockstore
.meta(last_common_ancestor)
.unwrap()
.unwrap();
// Lighter fork should only see one next slot, since only two validators
// could have generated children of `parent`, and the lighter fork *definitely*
// doesn't see the other fork's child, otherwise `heavier_slot != lighter_slot`
// would not have triggered above.
assert_eq!(lighter_fork_parent_meta.next_slots.len(), 1);
let lighter_fork_child = lighter_fork_parent_meta.next_slots[0];
assert_ne!(first_slot_in_lighter_partition, heavier_slot);
first_slot_in_lighter_partition = lighter_fork_child;
info!(
"First slot in lighter partition is {}",
first_slot_in_lighter_partition
);
break;
}
}
assert!(first_slot_in_lighter_partition != 0);
// Copy all the blocks from the smaller partition up to `first_slot_in_lighter_partition`
// into the smallest validator's blockstore
for lighter_slot in std::iter::once(first_slot_in_lighter_partition).chain(
AncestorIterator::new(first_slot_in_lighter_partition, &lighter_fork_blockstore),
) {
let lighter_slot_meta =
lighter_fork_blockstore.meta(lighter_slot).unwrap().unwrap();
assert!(lighter_slot_meta.is_full());
// Get the shreds from the leader of the smaller fork
let lighter_fork_data_shreds = lighter_fork_blockstore
.get_data_shreds_for_slot(lighter_slot, 0)
.unwrap();
// Insert those shreds into the smallest validator's blockstore
smallest_blockstore
.insert_shreds(lighter_fork_data_shreds, None, false)
.unwrap();
// Check insert succeeded
let new_meta = smallest_blockstore.meta(lighter_slot).unwrap().unwrap();
assert!(new_meta.is_full());
assert_eq!(new_meta.last_index, lighter_slot_meta.last_index);
}
// Restart the smallest validator that we killed earlier in `on_partition_start()`
drop(smallest_blockstore);
cluster.restart_node(
&context.smallest_validator_key,
context.alive_stake3_info.take().unwrap(),
);
loop {
// Wait for node to vote on the first slot on the less heavy fork, so it'll need
// a switch proof to flip to the other fork.
// However, this vote won't land because it's using an expired blockhash. The
// fork structure will look something like this after the vote:
/*
1 (2%, killed and restarted) --- 200 (37%, lighter fork)
/
0
\-------- 4 (38%, heavier fork)
*/
if let Some(last_vote) =
last_vote_in_tower(&smallest_ledger_path, &context.smallest_validator_key)
{
// The heaviest validator on the other fork doesn't have this slot, so a vote
// for it means this node voted on a slot unique to the lighter fork
if last_vote == first_slot_in_lighter_partition {
info!(
"Saw vote on first slot in lighter partition {}",
first_slot_in_lighter_partition
);
break;
} else {
info!(
"Haven't seen vote on first slot in lighter partition, latest vote is: {}",
last_vote
);
}
}
sleep(Duration::from_millis(20));
}
// Now resolve partition, allow validator to see the fork with the heavier validator,
// but the fork it's currently on is the heaviest, if only its own vote landed!
};
// Check that new roots were set after the partition resolves (gives time
// for lockouts built during partition to resolve and gives validators an opportunity
// to try and switch forks)
let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut PartitionContext| {
cluster.check_for_new_roots(16, &"PARTITION_TEST");
};
run_kill_partition_switch_threshold(
&[&[(failures_stake as usize - 1, 16)]],
partitions,
// Partition long enough such that the first vote made by validator with
// `alive_stake_3` won't be ingested due to BlockhashTooOld,
None,
Some(ticks_per_slot),
PartitionContext::default(),
on_partition_start,
on_before_partition_resolved,
on_partition_resolved,
);
}
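The sleep in `on_before_partition_resolved` above scales the default slot time down to this test's `ticks_per_slot` and multiplies by `MAX_RECENT_BLOCKHASHES`, rounding up, so the restarted validator's vote is guaranteed to use an expired blockhash. With the SDK defaults assumed here (400 ms and 64 ticks per slot, 300 recent blockhashes) and the test's `ticks_per_slot = 8`, the wait works out to 15 seconds:

fn main() {
    // Assumed solana-sdk defaults at the time of this commit.
    const DEFAULT_MS_PER_SLOT: u64 = 400;
    const DEFAULT_TICKS_PER_SLOT: u64 = 64;
    const MAX_RECENT_BLOCKHASHES: u64 = 300;

    // The test shortens slots to 8 ticks, i.e. 50 ms per slot, so the 300-slot
    // blockhash window expires much sooner than it would at the defaults.
    let ticks_per_slot: u64 = 8;

    // Same ceiling division as the test above.
    let sleep_time_ms = ((ticks_per_slot * DEFAULT_MS_PER_SLOT * MAX_RECENT_BLOCKHASHES)
        + DEFAULT_TICKS_PER_SLOT
        - 1)
        / DEFAULT_TICKS_PER_SLOT;

    // 8 * 400 * 300 = 960_000; ceil(960_000 / 64) = 15_000 ms.
    assert_eq!(sleep_time_ms, 15_000);
}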
@@ -1674,7 +1969,7 @@ fn test_validator_saves_tower() {
}
fn open_blockstore(ledger_path: &Path) -> Blockstore {
Blockstore::open_with_access_type(ledger_path, AccessType::PrimaryOnly, None, true)
Blockstore::open_with_access_type(ledger_path, AccessType::TryPrimaryThenSecondary, None, true)
.unwrap_or_else(|e| {
panic!("Failed to open ledger at {:?}, err: {}", ledger_path, e);
})
@@ -2196,9 +2491,9 @@ fn test_run_test_load_program_accounts_partition_root() {
fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
let num_slots_per_validator = 8;
let partitions: [&[usize]; 2] = [&[(1)], &[(1)]];
let partitions: [Vec<usize>; 2] = [vec![1], vec![1]];
let (leader_schedule, validator_keys) =
create_custom_leader_schedule(partitions.len(), num_slots_per_validator);
create_custom_leader_schedule(&[num_slots_per_validator, num_slots_per_validator]);
let (update_client_sender, update_client_receiver) = unbounded();
let (scan_client_sender, scan_client_receiver) = unbounded();
@@ -2212,7 +2507,7 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
scan_client_receiver,
);
let on_partition_start = |cluster: &mut LocalCluster| {
let on_partition_start = |cluster: &mut LocalCluster, _: &mut ()| {
let update_client = cluster
.get_validator_client(&cluster.entry_point_info.id)
.unwrap();
@@ -2223,7 +2518,9 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
scan_client_sender.send(scan_client).unwrap();
};
let on_partition_resolved = |cluster: &mut LocalCluster| {
let on_partition_before_resolved = |_: &mut LocalCluster, _: &mut ()| {};
let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
cluster.check_for_new_roots(20, &"run_test_load_program_accounts_partition");
exit.store(true, Ordering::Relaxed);
t_update.join().unwrap();
@@ -2233,8 +2530,12 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
run_cluster_partition(
&partitions,
Some((leader_schedule, validator_keys)),
(),
on_partition_start,
on_partition_before_resolved,
on_partition_resolved,
None,
None,
additional_accounts,
);
}
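Overall, the harness changes in this file replace the two fixed `FnOnce(&mut LocalCluster)` hooks with three hooks that also receive a caller-supplied context value, which is how `test_fork_choice_ingest_votes_from_gossip` threads its `PartitionContext` (the killed validator's info plus the three fork pubkeys) through the partition lifecycle. A stripped-down sketch of the same pattern with the cluster type stubbed out (nothing below is the crate's real API):

// Stand-in for LocalCluster; only here to keep the sketch self-contained.
struct Cluster {
    partitioned: bool,
}

// Generic over a caller-owned context C, mirroring run_cluster_partition above.
fn run_partition_test<C>(
    mut context: C,
    on_partition_start: impl FnOnce(&mut Cluster, &mut C),
    on_before_partition_resolved: impl FnOnce(&mut Cluster, &mut C),
    on_partition_resolved: impl FnOnce(&mut Cluster, &mut C),
) {
    let mut cluster = Cluster { partitioned: true };
    on_partition_start(&mut cluster, &mut context);
    // ... partition runs for a while ...
    on_before_partition_resolved(&mut cluster, &mut context);
    cluster.partitioned = false;
    on_partition_resolved(&mut cluster, &mut context);
}

fn main() {
    // Tests that need no shared state pass a unit context, as the simple
    // partition tests above do.
    run_partition_test(
        (),
        |_, _| {},
        |_, _| {},
        |cluster, _| assert!(!cluster.partitioned),
    );

    // Tests that carry state between stages define their own context struct.
    #[derive(Default)]
    struct Ctx {
        killed: Option<&'static str>,
    }
    run_partition_test(
        Ctx::default(),
        |_cluster, ctx| ctx.killed = Some("smallest validator"),
        |_cluster, ctx| assert!(ctx.killed.is_some()),
        |_cluster, ctx| println!("restarted {}", ctx.killed.take().unwrap()),
    );
}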