* Fix fork race condition in optimistic violation tower tests (#19192)
* Fix fork race condition in optimistic violation tower tests
* clippy
* pr comments
* Fixup flaky tests (#21617)
* Fixup flaky tests
* Fixup listeners

Co-authored-by: Ashwin Sekar <ashwin@solana.com>
Co-authored-by: carllin <carl@solana.com>
@@ -273,7 +273,7 @@ impl LocalCluster {
let mut listener_config = safe_clone_config(&config.validator_configs[0]);
listener_config.voting_disabled = true;
(0..config.num_listeners).for_each(|_| {
cluster.add_validator(
cluster.add_validator_listener(
&listener_config,
0,
Arc::new(Keypair::new()),
@@ -316,11 +316,50 @@ impl LocalCluster {
}
}

/// Set up validator without voting or staking accounts
pub fn add_validator_listener(
&mut self,
validator_config: &ValidatorConfig,
stake: u64,
validator_keypair: Arc<Keypair>,
voting_keypair: Option<Arc<Keypair>>,
socket_addr_space: SocketAddrSpace,
) -> Pubkey {
self.do_add_validator(
validator_config,
true,
stake,
validator_keypair,
voting_keypair,
socket_addr_space,
)
}

/// Set up validator with voting and staking accounts
pub fn add_validator(
&mut self,
validator_config: &ValidatorConfig,
stake: u64,
validator_keypair: Arc<Keypair>,
voting_keypair: Option<Arc<Keypair>>,
socket_addr_space: SocketAddrSpace,
) -> Pubkey {
self.do_add_validator(
validator_config,
false,
stake,
validator_keypair,
voting_keypair,
socket_addr_space,
)
}

fn do_add_validator(
&mut self,
validator_config: &ValidatorConfig,
is_listener: bool,
stake: u64,
validator_keypair: Arc<Keypair>,
mut voting_keypair: Option<Arc<Keypair>>,
socket_addr_space: SocketAddrSpace,
) -> Pubkey {
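The listener path now has its own entry point instead of being inferred from `voting_disabled`. As a quick orientation, here is a minimal sketch of how a test could drive the two entry points; it is not part of the patch, the stake value is arbitrary, and the imports are assumed to match the local-cluster crate used elsewhere in this diff.

    // Sketch only, not from the patch: contrasts the two entry points.
    fn add_nodes(cluster: &mut LocalCluster, base_config: &ValidatorConfig) {
        // Listener: zero stake, and do_add_validator(.., /*is_listener=*/ true, ..)
        // skips the lamport transfer and vote/stake account setup.
        let mut listener_config = safe_clone_config(base_config);
        listener_config.voting_disabled = true;
        cluster.add_validator_listener(
            &listener_config,
            0,
            Arc::new(Keypair::new()),
            None,
            SocketAddrSpace::Unspecified,
        );

        // Staked voter: do_add_validator(.., /*is_listener=*/ false, ..) funds the
        // node and creates vote/stake accounts when no voting keypair is supplied.
        cluster.add_validator(
            base_config,
            100, // arbitrary stake, for illustration only
            Arc::new(Keypair::new()),
            None,
            SocketAddrSpace::Unspecified,
        );
    }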
@@ -339,30 +378,28 @@ impl LocalCluster {
let contact_info = validator_node.info.clone();
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&self.genesis_config);

if validator_config.voting_disabled {
// Give the validator some lamports to setup vote accounts
if is_listener {
// setup as a listener
info!("listener {} ", validator_pubkey,);
} else {
// Give the validator some lamports to setup vote accounts
if should_create_vote_pubkey {
let validator_balance = Self::transfer_with_client(
&client,
&self.funding_keypair,
&validator_pubkey,
stake * 2 + 2,
);
info!(
"validator {} balance {}",
validator_pubkey, validator_balance
);
Self::setup_vote_and_stake_accounts(
&client,
voting_keypair.as_ref().unwrap(),
&validator_keypair,
stake,
)
.unwrap();
}
} else if should_create_vote_pubkey {
let validator_balance = Self::transfer_with_client(
&client,
&self.funding_keypair,
&validator_pubkey,
stake * 2 + 2,
);
info!(
"validator {} balance {}",
validator_pubkey, validator_balance
);
Self::setup_vote_and_stake_accounts(
&client,
voting_keypair.as_ref().unwrap(),
&validator_keypair,
stake,
)
.unwrap();
}

let mut config = safe_clone_config(validator_config);
@@ -1153,6 +1153,18 @@ fn test_fork_choice_refresh_old_votes() {
let lighter_fork_ledger_path = cluster.ledger_path(&context.lighter_fork_validator_key);
let heaviest_ledger_path = cluster.ledger_path(&context.heaviest_validator_key);

// Get latest votes. We make sure to wait until the vote has landed in
// blockstore. This is important because if we were the leader for the block there
// is a possibility of voting before broadcast has inserted the block into blockstore.
let lighter_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger(
&lighter_fork_ledger_path,
&context.lighter_fork_validator_key,
);
let heaviest_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger(
&heaviest_ledger_path,
&context.heaviest_validator_key,
);

// Open ledgers
let smallest_blockstore = open_blockstore(&smallest_ledger_path);
let lighter_fork_blockstore = open_blockstore(&lighter_fork_ledger_path);
@@ -1160,15 +1172,6 @@ fn test_fork_choice_refresh_old_votes() {

info!("Opened blockstores");

// Get latest votes
let (lighter_fork_latest_vote, _) = last_vote_in_tower(
&lighter_fork_ledger_path,
&context.lighter_fork_validator_key,
)
.unwrap();
let (heaviest_fork_latest_vote, _) =
last_vote_in_tower(&heaviest_ledger_path, &context.heaviest_validator_key).unwrap();

// Find the first slot on the smaller fork
let lighter_ancestors: BTreeSet<Slot> = std::iter::once(lighter_fork_latest_vote)
.chain(AncestorIterator::new(
@@ -1198,27 +1201,11 @@ fn test_fork_choice_refresh_old_votes() {

// Copy all the blocks from the smaller partition up to `first_slot_in_lighter_partition`
// into the smallest validator's blockstore
for lighter_slot in std::iter::once(first_slot_in_lighter_partition).chain(
AncestorIterator::new(first_slot_in_lighter_partition, &lighter_fork_blockstore),
) {
let lighter_slot_meta =
lighter_fork_blockstore.meta(lighter_slot).unwrap().unwrap();
assert!(lighter_slot_meta.is_full());
// Get the shreds from the leader of the smaller fork
let lighter_fork_data_shreds = lighter_fork_blockstore
.get_data_shreds_for_slot(lighter_slot, 0)
.unwrap();

// Insert those shreds into the smallest validator's blockstore
smallest_blockstore
.insert_shreds(lighter_fork_data_shreds, None, false)
.unwrap();

// Check insert succeeded
let new_meta = smallest_blockstore.meta(lighter_slot).unwrap().unwrap();
assert!(new_meta.is_full());
assert_eq!(new_meta.last_index, lighter_slot_meta.last_index);
}
copy_blocks(
first_slot_in_lighter_partition,
&lighter_fork_blockstore,
&smallest_blockstore,
);

// Restart the smallest validator that we killed earlier in `on_partition_start()`
drop(smallest_blockstore);
@@ -2442,6 +2429,20 @@ fn purge_slots(blockstore: &Blockstore, start_slot: Slot, slot_count: Slot) {
blockstore.purge_slots(start_slot, start_slot + slot_count, PurgeType::Exact);
}

fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) {
for slot in std::iter::once(end_slot).chain(AncestorIterator::new(end_slot, source)) {
let source_meta = source.meta(slot).unwrap().unwrap();
assert!(source_meta.is_full());

let shreds = source.get_data_shreds_for_slot(slot, 0).unwrap();
dest.insert_shreds(shreds, None, false).unwrap();

let dest_meta = dest.meta(slot).unwrap().unwrap();
assert!(dest_meta.is_full());
assert_eq!(dest_meta.last_index, source_meta.last_index);
}
}

fn restore_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option<Tower> {
let tower = Tower::restore(ledger_path, node_pubkey);
if let Err(tower_err) = tower {
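`copy_blocks` factors out the shred-copy loop that the tests had inlined: it walks `end_slot` and every ancestor of it in `source`, and asserts that each copied slot is complete in `dest`. A minimal sketch of calling it before restarting a node; the ledger paths and slot below are placeholders, not values from this patch.

    // Sketch only: `donor_ledger_path`, `receiver_ledger_path`, and `tip_slot`
    // are hypothetical names.
    let donor_blockstore = open_blockstore(&donor_ledger_path);
    let receiver_blockstore = open_blockstore(&receiver_ledger_path);
    copy_blocks(tip_slot, &donor_blockstore, &receiver_blockstore);
    // Release the blockstore handle before the receiving validator is restarted.
    drop(receiver_blockstore);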
@@ -2459,8 +2460,23 @@ fn last_vote_in_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option<(Slot,
restore_tower(ledger_path, node_pubkey).map(|tower| tower.last_voted_slot_hash().unwrap())
}

fn root_in_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option<Slot> {
restore_tower(ledger_path, node_pubkey).map(|tower| tower.root())
// Fetches the last vote in the tower, blocking until it has also appeared in blockstore.
// Fails if tower is empty
fn wait_for_last_vote_in_tower_to_land_in_ledger(ledger_path: &Path, node_pubkey: &Pubkey) -> Slot {
let (last_vote, _) = last_vote_in_tower(ledger_path, node_pubkey).unwrap();
loop {
// We reopen in a loop to make sure we get updates
let blockstore = open_blockstore(ledger_path);
if blockstore.is_full(last_vote) {
break;
}
sleep(Duration::from_millis(100));
}
last_vote
}

fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Slot> {
restore_tower(tower_path, node_pubkey).map(|tower| tower.root())
}

fn remove_tower(ledger_path: &Path, node_pubkey: &Pubkey) {
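`wait_for_last_vote_in_tower_to_land_in_ledger` closes the race this commit is about: a validator can record a vote in its tower before broadcast has written that slot to its own blockstore. The reworked test below always waits before copying blocks, roughly in this pattern; this is a sketch using the same names as the test, not an excerpt from the patch.

    // Sketch of the pattern used in Step 2 of the test below.
    let b_last_vote =
        wait_for_last_vote_in_tower_to_land_in_ledger(&val_b_ledger_path, &validator_b_pubkey);

    // Only after B's voted slot is full in B's own blockstore is it safe to
    // graft B's fork onto A's ledger.
    let b_blockstore = open_blockstore(&val_b_ledger_path);
    let a_blockstore = open_blockstore(&val_a_ledger_path);
    copy_blocks(b_last_vote, &b_blockstore, &a_blockstore);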
@@ -2469,12 +2485,19 @@ fn remove_tower(ledger_path: &Path, node_pubkey: &Pubkey) {

// A bit convoluted test case; but it roughly follows this theoretical scenario:
//
// Step 1: You have validator A + B with 31% and 36% of the stake:
// Step 1: You have validator A + B with 31% and 36% of the stake. Run only validator B:
//
// S0 -> S1 -> S2 -> S3 (A + B vote, optimistically confirmed)
// S0 -> S1 -> S2 -> S3 (B vote)
//
// Step 2: Turn off A + B, and truncate the ledger after slot `S3` (simulate votes not
// Step 2: Turn off B, and truncate the ledger after slot `S3` (simulate votes not
// landing in next slot).
// Copy ledger fully to validator A and validator C
//
// Step 3: Turn on A, and have it vote up to S3. Truncate anything past slot `S3`.
//
// S0 -> S1 -> S2 -> S3 (A & B vote, optimistically confirmed)
//
// Step 4:
// Start validator C with 33% of the stake with same ledger, but only up to slot S2.
// Have `C` generate some blocks like:
//
|
||||
// |
|
||||
// -> S4 (C) -> S5
|
||||
//
|
||||
// Step 4:
|
||||
// Step 5:
|
||||
// Without the persisted tower:
|
||||
// `A` would choose to vote on the fork with `S4 -> S5`. This is true even if `A`
|
||||
// generates a new fork starting at slot `S3` because `C` has more stake than `A`
|
||||
@@ -2535,13 +2558,27 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
let (validator_a_pubkey, validator_b_pubkey, validator_c_pubkey) =
(validators[0], validators[1], validators[2]);

// Disable voting on all validators other than validator B to ensure neither of the below two
// scenarios occurs:
// 1. If the cluster immediately forks on restart while we're killing validators A and C,
// with Validator B on one side, and `A` and `C` on a heavier fork, it's possible that the lockouts
// on `A` and `C`'s latest votes do not extend past validator B's latest vote. Then validator B
// will be stuck unable to vote, but also unable to generate a switching proof to the heavier fork.
//
// 2. Validator A doesn't vote past `next_slot_on_a` before we can kill it. This is essential
// because if validator A votes past `next_slot_on_a`, and then we copy over validator B's ledger
// below only for slots <= `next_slot_on_a`, validator A will not know how its last vote chains
// to the other forks, and may violate switching proofs on restart.
let mut validator_configs =
make_identical_validator_configs(&ValidatorConfig::default(), node_stakes.len());

validator_configs[0].voting_disabled = true;
validator_configs[2].voting_disabled = true;

let mut config = ClusterConfig {
cluster_lamports: 100_000,
node_stakes: node_stakes.clone(),
validator_configs: make_identical_validator_configs(
&ValidatorConfig::default(),
node_stakes.len(),
),
node_stakes,
validator_configs,
validator_keys: Some(validator_keys),
slots_per_epoch,
stakers_slot_offset: slots_per_epoch,
@@ -2558,54 +2595,114 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
let val_b_ledger_path = cluster.ledger_path(&validator_b_pubkey);
let val_c_ledger_path = cluster.ledger_path(&validator_c_pubkey);

// Immediately kill validator C
let validator_c_info = cluster.exit_node(&validator_c_pubkey);
info!(
"val_a {} ledger path {:?}",
validator_a_pubkey, val_a_ledger_path
);
info!(
"val_b {} ledger path {:?}",
validator_b_pubkey, val_b_ledger_path
);
info!(
"val_c {} ledger path {:?}",
validator_c_pubkey, val_c_ledger_path
);

// Immediately kill validator A, and C
info!("Exiting validators A and C");
let mut validator_a_info = cluster.exit_node(&validator_a_pubkey);
let mut validator_c_info = cluster.exit_node(&validator_c_pubkey);

// Step 1:
// Let validator A, B, (D) run for a while.
let (mut validator_a_finished, mut validator_b_finished) = (false, false);
// Let validator B, (D) run for a while.
let now = Instant::now();
while !(validator_a_finished && validator_b_finished) {
loop {
let elapsed = now.elapsed();
if elapsed > Duration::from_secs(30) {
panic!(
"LocalCluster nodes failed to log enough tower votes in {} secs",
elapsed.as_secs()
);
}
assert!(
elapsed <= Duration::from_secs(30),
"Validator B failed to vote on any slot >= {} in {} secs",
next_slot_on_a,
elapsed.as_secs()
);
sleep(Duration::from_millis(100));

if let Some((last_vote, _)) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) {
if !validator_a_finished && last_vote >= next_slot_on_a {
validator_a_finished = true;
}
}
if let Some((last_vote, _)) = last_vote_in_tower(&val_b_ledger_path, &validator_b_pubkey) {
if !validator_b_finished && last_vote >= next_slot_on_a {
validator_b_finished = true;
if last_vote >= next_slot_on_a {
break;
}
}
}
// kill them at once after the above loop; otherwise one might stall the other!
let validator_a_info = cluster.exit_node(&validator_a_pubkey);
// kill B
let _validator_b_info = cluster.exit_node(&validator_b_pubkey);

// Step 2:
// Stop validator and truncate ledger
// Stop validator and truncate ledger, copy over B's ledger to A
info!("truncate validator C's ledger");
{
// first copy from validator A's ledger
// first copy from validator B's ledger
std::fs::remove_dir_all(&validator_c_info.info.ledger_path).unwrap();
let mut opt = fs_extra::dir::CopyOptions::new();
opt.copy_inside = true;
fs_extra::dir::copy(&val_a_ledger_path, &val_c_ledger_path, &opt).unwrap();
// Remove A's tower in the C's new copied ledger
remove_tower(&validator_c_info.info.ledger_path, &validator_a_pubkey);
fs_extra::dir::copy(&val_b_ledger_path, &val_c_ledger_path, &opt).unwrap();
// Remove B's tower in the C's new copied ledger
remove_tower(&val_c_ledger_path, &validator_b_pubkey);

let blockstore = open_blockstore(&validator_c_info.info.ledger_path);
let blockstore = open_blockstore(&val_c_ledger_path);
purge_slots(&blockstore, base_slot + 1, truncated_slots);
}
info!("truncate validator A's ledger");
info!("Create validator A's ledger");
{
// Find latest vote in B, and wait for it to reach blockstore
let b_last_vote =
wait_for_last_vote_in_tower_to_land_in_ledger(&val_b_ledger_path, &validator_b_pubkey);

// Now we copy these blocks to A
let b_blockstore = open_blockstore(&val_b_ledger_path);
let a_blockstore = open_blockstore(&val_a_ledger_path);
copy_blocks(b_last_vote, &b_blockstore, &a_blockstore);

// Purge unnecessary slots
purge_slots(&a_blockstore, next_slot_on_a + 1, truncated_slots);
}

// Step 3:
// Restart A with voting enabled so that it can vote on B's fork
// up to `next_slot_on_a`, thereby optimistically confirming `next_slot_on_a`
info!("Restarting A");
validator_a_info.config.voting_disabled = false;
cluster.restart_node(
&validator_a_pubkey,
validator_a_info,
SocketAddrSpace::Unspecified,
);

info!("Waiting for A to vote on slot descended from slot `next_slot_on_a`");
let now = Instant::now();
loop {
if let Some((last_vote_slot, _)) =
last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey)
{
if last_vote_slot >= next_slot_on_a {
info!(
"Validator A has caught up and voted on slot: {}",
last_vote_slot
);
break;
}
}

if now.elapsed().as_secs() >= 30 {
panic!(
"Validator A has not seen optimistic confirmation slot > {} in 30 seconds",
next_slot_on_a
);
}

sleep(Duration::from_millis(20));
}

info!("Killing A");
let validator_a_info = cluster.exit_node(&validator_a_pubkey);
{
let blockstore = open_blockstore(&val_a_ledger_path);
purge_slots(&blockstore, next_slot_on_a + 1, truncated_slots);
@@ -2625,10 +2722,10 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
}
}

// Step 3:
// Step 4:
// Run validator C only to make it produce and vote on its own fork.
info!("Restart validator C again!!!");
let val_c_ledger_path = validator_c_info.info.ledger_path.clone();
validator_c_info.config.voting_disabled = false;
cluster.restart_node(
&validator_c_pubkey,
validator_c_info,
@@ -2652,7 +2749,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
assert!(!votes_on_c_fork.is_empty());
info!("collected validator C's votes: {:?}", votes_on_c_fork);

// Step 4:
// Step 5:
// verify whether there was violation or not
info!("Restart validator A again!!!");
cluster.restart_node(