Prevent scans on unrooted slots from seeing partial clean (#13628)
Co-authored-by: Carl Lin <carl@solana.com>
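
Note (illustrative sketch, not part of the commit): the new `setup_transfer_scan_threads` test below drives balanced 1-lamport transfers between paired accounts while a second thread repeatedly scans the tracked accounts and asserts that their total balance never changes. A minimal self-contained model of that invariant, with an in-memory map standing in for the cluster client (all names below are hypothetical):

use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
    thread,
    time::Duration,
};

fn main() {
    // 1000 accounts, 1 lamport each, like the test's starting_accounts.
    let accounts: Arc<Mutex<HashMap<usize, u64>>> =
        Arc::new(Mutex::new((0..1000).map(|k| (k, 1u64)).collect()));
    let expected_total: u64 = 1000;
    let exit = Arc::new(AtomicBool::new(false));

    // "Update" thread: each pair transfer is applied atomically, mirroring a
    // single transaction moving 1 lamport from account i to account i + 1 and back.
    let update_accounts = accounts.clone();
    let update_exit = exit.clone();
    let t_update = thread::spawn(move || {
        while !update_exit.load(Ordering::Relaxed) {
            for i in (0..1000usize).step_by(2) {
                let mut map = update_accounts.lock().unwrap();
                *map.get_mut(&i).unwrap() -= 1;
                *map.get_mut(&(i + 1)).unwrap() += 1;
            }
            for i in (0..1000usize).step_by(2) {
                let mut map = update_accounts.lock().unwrap();
                *map.get_mut(&(i + 1)).unwrap() -= 1;
                *map.get_mut(&i).unwrap() += 1;
            }
        }
    });

    // "Scan" thread: like the get_program_accounts scan in the test, every
    // consistent snapshot must show the original total balance.
    let scan_accounts = accounts.clone();
    let scan_exit = exit.clone();
    let t_scan = thread::spawn(move || {
        while !scan_exit.load(Ordering::Relaxed) {
            let total: u64 = scan_accounts.lock().unwrap().values().sum();
            assert_eq!(total, expected_total);
        }
    });

    thread::sleep(Duration::from_millis(100));
    exit.store(true, Ordering::Relaxed);
    t_update.join().unwrap();
    t_scan.join().unwrap();
}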
@@ -1,10 +1,14 @@
 use assert_matches::assert_matches;
+use crossbeam_channel::{unbounded, Receiver};
 use gag::BufferRedirect;
 use log::*;
 use serial_test_derive::serial;
 use solana_client::{
-    pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::RpcSignatureResult,
-    thin_client::create_client,
+    pubsub_client::PubsubClient,
+    rpc_client::RpcClient,
+    rpc_config::RpcProgramAccountsConfig,
+    rpc_response::RpcSignatureResult,
+    thin_client::{create_client, ThinClient},
 };
 use solana_core::{
     broadcast_stage::BroadcastStageType,
@@ -42,7 +46,7 @@ use solana_sdk::{
     poh_config::PohConfig,
     pubkey::Pubkey,
     signature::{Keypair, Signer},
-    system_transaction,
+    system_program, system_transaction,
 };
 use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
 use std::{
@@ -53,7 +57,7 @@ use std::{
     path::{Path, PathBuf},
     sync::atomic::{AtomicBool, Ordering},
     sync::Arc,
-    thread::sleep,
+    thread::{sleep, Builder, JoinHandle},
     time::Duration,
 };
 use tempfile::TempDir;
@@ -301,8 +305,8 @@ fn run_cluster_partition<E, F>(
     on_partition_resolved: F,
     additional_accounts: Vec<(Pubkey, Account)>,
 ) where
-    E: Fn(&mut LocalCluster),
-    F: Fn(&mut LocalCluster),
+    E: FnOnce(&mut LocalCluster),
+    F: FnOnce(&mut LocalCluster),
 {
     solana_logger::setup();
     info!("PARTITION_TEST!");
@@ -454,6 +458,35 @@ fn test_cluster_partition_1_1_1() {
     )
 }

+fn create_custom_leader_schedule(
+    num_validators: usize,
+    num_slots_per_validator: usize,
+) -> (LeaderSchedule, Vec<Arc<Keypair>>) {
+    let mut leader_schedule = vec![];
+    let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new()))
+        .take(num_validators)
+        .collect();
+    for (i, k) in validator_keys.iter().enumerate() {
+        let num_slots = {
+            if i == 0 {
+                // Set up the leader to have 50% of the slots
+                num_slots_per_validator * (num_validators - 1)
+            } else {
+                num_slots_per_validator
+            }
+        };
+        for _ in 0..num_slots {
+            leader_schedule.push(k.pubkey())
+        }
+    }
+
+    info!("leader_schedule: {}", leader_schedule.len());
+    (
+        LeaderSchedule::new_from_schedule(leader_schedule),
+        validator_keys,
+    )
+}
+
 #[test]
 #[serial]
 fn test_kill_heaviest_partition() {
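
Worked example (a note, not diff content): as invoked in `test_kill_heaviest_partition` just below, the helper gives the first validator half of the schedule.

// 4 partitions, 8 slots per validator: validator 0 is scheduled for
// 8 * (4 - 1) = 24 slots and the other three validators for 8 slots each,
// so the underlying schedule vector has 48 entries.
let (leader_schedule, validator_keys) = create_custom_leader_schedule(4, 8);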
@@ -465,26 +498,10 @@ fn test_kill_heaviest_partition() {
     // 3) Kills the most staked partition. Validators are locked out, but should all
     // eventually choose the major partition
     // 4) Check for recovery
-    let mut leader_schedule = vec![];
     let num_slots_per_validator = 8;
     let partitions: [&[usize]; 4] = [&[11], &[10], &[10], &[10]];
-    let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new()))
-        .take(partitions.len())
-        .collect();
-    for (i, k) in validator_keys.iter().enumerate() {
-        let num_slots = {
-            if i == 0 {
-                // Set up the leader to have 50% of the slots
-                num_slots_per_validator * (partitions.len() - 1)
-            } else {
-                num_slots_per_validator
-            }
-        };
-        for _ in 0..num_slots {
-            leader_schedule.push(k.pubkey())
-        }
-    }
-    info!("leader_schedule: {}", leader_schedule.len());
+    let (leader_schedule, validator_keys) =
+        create_custom_leader_schedule(partitions.len(), num_slots_per_validator);

     let empty = |_: &mut LocalCluster| {};
     let validator_to_kill = validator_keys[0].pubkey();
@@ -495,10 +512,7 @@ fn test_kill_heaviest_partition() {
     };
     run_cluster_partition(
         &partitions,
-        Some((
-            LeaderSchedule::new_from_schedule(leader_schedule),
-            validator_keys,
-        )),
+        Some((leader_schedule, validator_keys)),
         empty,
         on_partition_resolved,
         vec![],
@@ -526,30 +540,14 @@ fn run_kill_partition_switch_threshold<F>(
     // 1) Spins up three partitions
     // 2) Kills the first partition with the stake `failures_stake`
     // 5) runs `on_partition_resolved`
-    let mut leader_schedule = vec![];
     let num_slots_per_validator = 8;
     let partitions: [&[usize]; 3] = [
         &[(failures_stake as usize)],
         &[(alive_stake_1 as usize)],
         &[(alive_stake_2 as usize)],
     ];
-    let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new()))
-        .take(partitions.len())
-        .collect();
-    for (i, k) in validator_keys.iter().enumerate() {
-        let num_slots = {
-            if i == 0 {
-                // Set up the leader to have 50% of the slots
-                num_slots_per_validator * (partitions.len() - 1)
-            } else {
-                num_slots_per_validator
-            }
-        };
-        for _ in 0..num_slots {
-            leader_schedule.push(k.pubkey())
-        }
-    }
-    info!("leader_schedule: {}", leader_schedule.len());
+    let (leader_schedule, validator_keys) =
+        create_custom_leader_schedule(partitions.len(), num_slots_per_validator);

     let validator_to_kill = validator_keys[0].pubkey();
     let on_partition_start = |cluster: &mut LocalCluster| {
@@ -558,10 +556,7 @@ fn run_kill_partition_switch_threshold<F>(
     };
     run_cluster_partition(
         &partitions,
-        Some((
-            LeaderSchedule::new_from_schedule(leader_schedule),
-            validator_keys,
-        )),
+        Some((leader_schedule, validator_keys)),
         on_partition_start,
         on_partition_resolved,
         vec![],
@@ -2143,6 +2138,231 @@ fn test_optimistic_confirmation_violation_without_tower() {
     do_test_optimistic_confirmation_violation_with_or_without_tower(false);
 }

+#[test]
+#[serial]
+fn test_run_test_load_program_accounts_root() {
+    run_test_load_program_accounts(CommitmentConfig::root());
+}
+
+#[test]
+#[serial]
+fn test_run_test_load_program_accounts_partition_root() {
+    run_test_load_program_accounts_partition(CommitmentConfig::root());
+}
+
+fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
+    let num_slots_per_validator = 8;
+    let partitions: [&[usize]; 2] = [&[(1 as usize)], &[(1 as usize)]];
+    let (leader_schedule, validator_keys) =
+        create_custom_leader_schedule(partitions.len(), num_slots_per_validator);
+
+    let (update_client_sender, update_client_receiver) = unbounded();
+    let (scan_client_sender, scan_client_receiver) = unbounded();
+    let exit = Arc::new(AtomicBool::new(false));
+
+    let (t_update, t_scan, additional_accounts) = setup_transfer_scan_threads(
+        1000,
+        exit.clone(),
+        scan_commitment,
+        update_client_receiver,
+        scan_client_receiver,
+    );
+
+    let on_partition_start = |cluster: &mut LocalCluster| {
+        let update_client = cluster
+            .get_validator_client(&cluster.entry_point_info.id)
+            .unwrap();
+        update_client_sender.send(update_client).unwrap();
+        let scan_client = cluster
+            .get_validator_client(&cluster.entry_point_info.id)
+            .unwrap();
+        scan_client_sender.send(scan_client).unwrap();
+    };
+
+    let on_partition_resolved = |cluster: &mut LocalCluster| {
+        cluster.check_for_new_roots(20, &"run_test_load_program_accounts_partition");
+        exit.store(true, Ordering::Relaxed);
+        t_update.join().unwrap();
+        t_scan.join().unwrap();
+    };
+
+    run_cluster_partition(
+        &partitions,
+        Some((leader_schedule, validator_keys)),
+        on_partition_start,
+        on_partition_resolved,
+        additional_accounts,
+    );
+}
+
+fn setup_transfer_scan_threads(
+    num_starting_accounts: usize,
+    exit: Arc<AtomicBool>,
+    scan_commitment: CommitmentConfig,
+    update_client_receiver: Receiver<ThinClient>,
+    scan_client_receiver: Receiver<ThinClient>,
+) -> (JoinHandle<()>, JoinHandle<()>, Vec<(Pubkey, Account)>) {
+    let exit_ = exit.clone();
+    let starting_keypairs: Arc<Vec<Keypair>> = Arc::new(
+        iter::repeat_with(Keypair::new)
+            .take(num_starting_accounts)
+            .collect(),
+    );
+    let target_keypairs: Arc<Vec<Keypair>> = Arc::new(
+        iter::repeat_with(Keypair::new)
+            .take(num_starting_accounts)
+            .collect(),
+    );
+    let starting_accounts: Vec<(Pubkey, Account)> = starting_keypairs
+        .iter()
+        .map(|k| (k.pubkey(), Account::new(1, 0, &system_program::id())))
+        .collect();
+
+    let starting_keypairs_ = starting_keypairs.clone();
+    let target_keypairs_ = target_keypairs.clone();
+    let t_update = Builder::new()
+        .name("update".to_string())
+        .spawn(move || {
+            let client = update_client_receiver.recv().unwrap();
+            loop {
+                if exit_.load(Ordering::Relaxed) {
+                    return;
+                }
+                let (blockhash, _fee_calculator, _last_valid_slot) = client
+                    .get_recent_blockhash_with_commitment(CommitmentConfig::recent())
+                    .unwrap();
+                for i in 0..starting_keypairs_.len() {
+                    client
+                        .async_transfer(
+                            1,
+                            &starting_keypairs_[i],
+                            &target_keypairs_[i].pubkey(),
+                            blockhash,
+                        )
+                        .unwrap();
+                }
+                for i in 0..starting_keypairs_.len() {
+                    client
+                        .async_transfer(
+                            1,
+                            &target_keypairs_[i],
+                            &starting_keypairs_[i].pubkey(),
+                            blockhash,
+                        )
+                        .unwrap();
+                }
+            }
+        })
+        .unwrap();
+
+    // Scan, the total funds should add up to the original
+    let mut scan_commitment_config = RpcProgramAccountsConfig::default();
+    scan_commitment_config.account_config.commitment = Some(scan_commitment);
+    let tracked_pubkeys: HashSet<Pubkey> = starting_keypairs
+        .iter()
+        .chain(target_keypairs.iter())
+        .map(|k| k.pubkey())
+        .collect();
+    let expected_total_balance = num_starting_accounts as u64;
+    let t_scan = Builder::new()
+        .name("scan".to_string())
+        .spawn(move || {
+            let client = scan_client_receiver.recv().unwrap();
+            loop {
+                if exit.load(Ordering::Relaxed) {
+                    return;
+                }
+                if let Some(total_scan_balance) = client
+                    .get_program_accounts_with_config(
+                        &system_program::id(),
+                        scan_commitment_config.clone(),
+                    )
+                    .ok()
+                    .map(|result| {
+                        result
+                            .into_iter()
+                            .map(|(key, account)| {
+                                if tracked_pubkeys.contains(&key) {
+                                    account.lamports
+                                } else {
+                                    0
+                                }
+                            })
+                            .sum::<u64>()
+                    })
+                {
+                    assert_eq!(total_scan_balance, expected_total_balance);
+                }
+            }
+        })
+        .unwrap();
+
+    (t_update, t_scan, starting_accounts)
+}
+
+fn run_test_load_program_accounts(scan_commitment: CommitmentConfig) {
+    solana_logger::setup();
+    // First set up the cluster with 2 nodes
+    let slots_per_epoch = 2048;
+    let node_stakes = vec![51, 50];
+    let validator_keys: Vec<_> = vec![
+        "4qhhXNTbKD1a5vxDDLZcHKj7ELNeiivtUBxn3wUK1F5VRsQVP89VUhfXqSfgiFB14GfuBgtrQ96n9NvWQADVkcCg",
+        "3kHBzVwie5vTEaY6nFCPeFT8qDpoXzn7dCEioGRNBTnUDpvwnG85w8Wq63gVWpVTP8k2a8cgcWRjSXyUkEygpXWS",
+    ]
+    .iter()
+    .map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
+    .take(node_stakes.len())
+    .collect();
+
+    let num_starting_accounts = 1000;
+    let exit = Arc::new(AtomicBool::new(false));
+    let (update_client_sender, update_client_receiver) = unbounded();
+    let (scan_client_sender, scan_client_receiver) = unbounded();
+
+    // Setup the update/scan threads
+    let (t_update, t_scan, starting_accounts) = setup_transfer_scan_threads(
+        num_starting_accounts,
+        exit.clone(),
+        scan_commitment,
+        update_client_receiver,
+        scan_client_receiver,
+    );
+
+    let mut config = ClusterConfig {
+        cluster_lamports: 100_000,
+        node_stakes: node_stakes.clone(),
+        validator_configs: vec![ValidatorConfig::default(); node_stakes.len()],
+        validator_keys: Some(validator_keys),
+        slots_per_epoch,
+        stakers_slot_offset: slots_per_epoch,
+        skip_warmup_slots: true,
+        additional_accounts: starting_accounts,
+        ..ClusterConfig::default()
+    };
+    let cluster = LocalCluster::new(&mut config);
+
+    // Give the threads a client to use for querying the cluster
+    let all_pubkeys = cluster.get_node_pubkeys();
+    let other_validator_id = all_pubkeys
+        .into_iter()
+        .find(|x| *x != cluster.entry_point_info.id)
+        .unwrap();
+    let client = cluster
+        .get_validator_client(&cluster.entry_point_info.id)
+        .unwrap();
+    update_client_sender.send(client).unwrap();
+    let scan_client = cluster.get_validator_client(&other_validator_id).unwrap();
+    scan_client_sender.send(scan_client).unwrap();
+
+    // Wait for some roots to pass
+    cluster.check_for_new_roots(40, &"run_test_load_program_accounts");
+
+    // Exit and ensure no violations of consistency were found
+    exit.store(true, Ordering::Relaxed);
+    t_update.join().unwrap();
+    t_scan.join().unwrap();
+}
+
 fn wait_for_next_snapshot(
     cluster: &LocalCluster,
     snapshot_package_output_path: &Path,
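
A hypothetical follow-on wrapper (not in this diff) would exercise the same scan-consistency check at `recent` commitment, where unrooted slots are directly visible to the scan:

#[test]
#[serial]
fn test_run_test_load_program_accounts_recent() {
    // Assumed name; reuses the helper added by this commit with a different
    // commitment level than the root-commitment wrappers above.
    run_test_load_program_accounts(CommitmentConfig::recent());
}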