Support local cluster edge case testing (#7135)

* Refactor local cluster to support killing a partition

* Rework run_network_partition

* Introduce fixed leader schedule

* Plumb fixed schedule into test
carllin
2019-12-03 16:31:59 -08:00
committed by GitHub
parent f75c51ff71
commit f0a40862d6
10 changed files with 311 additions and 94 deletions
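The key new knob in this change is ClusterConfig::validator_keys: a test can hand the cluster the exact identity keypair for every node, so all validator pubkeys (and the leader schedule derived from them) are known before the cluster boots. A minimal sketch of how a test might use it, assuming ClusterConfig, LocalCluster, and a Default impl for ValidatorConfig are in scope; the stake and lamport numbers are illustrative only, not taken from this commit:

use solana_core::validator::ValidatorConfig;
use solana_sdk::signature::Keypair;
use std::sync::Arc;

// Pre-generate one identity keypair per node so the test knows every
// validator pubkey before LocalCluster::new() starts anything.
let num_nodes = 3;
let validator_keys: Vec<Arc<Keypair>> =
    (0..num_nodes).map(|_| Arc::new(Keypair::new())).collect();

let config = ClusterConfig {
    node_stakes: vec![100; num_nodes],
    cluster_lamports: 100_000,
    validator_configs: vec![ValidatorConfig::default(); num_nodes],
    // New in this change: pin each node's identity up front.
    validator_keys: Some(validator_keys.clone()),
    ..ClusterConfig::default()
};
let cluster = LocalCluster::new(&config);

Leaving validator_keys as None (the default) keeps the old behavior of generating a fresh keypair per node, as the diff below shows.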


@@ -1,5 +1,6 @@
use solana_client::thin_client::ThinClient;
use solana_core::contact_info::ContactInfo;
use solana_core::validator::Validator;
use solana_core::validator::ValidatorConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
@@ -17,13 +18,19 @@ pub struct ValidatorInfo {
pub struct ClusterValidatorInfo {
pub info: ValidatorInfo,
pub config: ValidatorConfig,
pub validator: Option<Validator>,
}
impl ClusterValidatorInfo {
pub fn new(validator_info: ValidatorInfo, config: ValidatorConfig) -> Self {
pub fn new(
validator_info: ValidatorInfo,
config: ValidatorConfig,
validator: Validator,
) -> Self {
Self {
info: validator_info,
config,
validator: Some(validator),
}
}
}
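Storing the Validator handle inside ClusterValidatorInfo is what lets the cluster stop a node without throwing away its keys, ledger path, or config. A sketch of the shutdown pattern this enables (it mirrors the reworked exit_node further down; the free function here is only for illustration):

// Take the running handle out, stop it, and keep the rest of the info
// (keypairs, ledger path, ValidatorConfig) around for a later restart.
fn stop_node(info: &mut ClusterValidatorInfo) {
    if let Some(mut validator) = info.validator.take() {
        validator.exit();          // ask the validator's threads to shut down
        validator.join().unwrap(); // block until they have exited
    }
    // info.info and info.config are untouched, so restart_node() can bring
    // the node back on the same ledger with the same identity.
}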


@@ -15,9 +15,12 @@ use solana_ledger::{
};
use solana_sdk::{
client::SyncClient,
clock::{Slot, DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS},
clock::{
Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT,
NUM_CONSECUTIVE_LEADER_SLOTS,
},
commitment_config::CommitmentConfig,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH},
hash::Hash,
poh_config::PohConfig,
pubkey::Pubkey,
@@ -169,6 +172,11 @@ pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) {
}
}
pub fn time_until_nth_epoch(epoch: u64, slots_per_epoch: u64, stakers_slot_offset: u64) -> u64 {
let epoch_schedule = EpochSchedule::custom(slots_per_epoch, stakers_slot_offset, true);
epoch_schedule.get_last_slot_in_epoch(epoch) * DEFAULT_MS_PER_SLOT
}
pub fn sleep_n_epochs(
num_epochs: f64,
config: &PohConfig,
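The new time_until_nth_epoch helper turns an epoch number into a wall-clock budget: the last slot of `epoch` under the given schedule times DEFAULT_MS_PER_SLOT. A hypothetical use in a test; the schedule parameters below are made-up values, not from this commit:

use std::thread::sleep;
use std::time::Duration;

// Sleep until roughly the end of epoch 2 (measured from cluster start),
// then check state that should only exist after that epoch boundary.
let slots_per_epoch = 32;
let stakers_slot_offset = 32;
let ms = time_until_nth_epoch(2, slots_per_epoch, stakers_slot_offset);
sleep(Duration::from_millis(ms));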


@@ -1,4 +1,5 @@
use crate::cluster::{Cluster, ClusterValidatorInfo, ValidatorInfo};
use itertools::izip;
use log::*;
use solana_client::thin_client::{create_client, ThinClient};
use solana_core::{
@@ -39,6 +40,7 @@ use std::{
collections::HashMap,
fs::remove_dir_all,
io::{Error, ErrorKind, Result},
iter,
path::PathBuf,
sync::Arc,
};
@@ -66,6 +68,8 @@ pub struct ClusterConfig {
pub num_archivers: usize,
/// Number of nodes that are unstaked and not voting (a.k.a listening)
pub num_listeners: u64,
/// The specific pubkeys of each node if specified
pub validator_keys: Option<Vec<Arc<Keypair>>>,
/// The stakes of each node
pub node_stakes: Vec<u64>,
/// The total lamports available to the cluster
@@ -85,6 +89,7 @@ impl Default for ClusterConfig {
validator_configs: vec![],
num_archivers: 0,
num_listeners: 0,
validator_keys: None,
node_stakes: vec![],
cluster_lamports: 0,
ticks_per_slot: DEFAULT_TICKS_PER_SLOT,
@@ -103,9 +108,7 @@ pub struct LocalCluster {
pub funding_keypair: Keypair,
/// Entry point from which the rest of the network can be discovered
pub entry_point_info: ContactInfo,
pub validator_infos: HashMap<Pubkey, ClusterValidatorInfo>,
pub listener_infos: HashMap<Pubkey, ClusterValidatorInfo>,
validators: HashMap<Pubkey, Validator>,
pub validators: HashMap<Pubkey, ClusterValidatorInfo>,
pub genesis_config: GenesisConfig,
archivers: Vec<Archiver>,
pub archiver_infos: HashMap<Pubkey, ArchiverInfo>,
@@ -129,9 +132,20 @@ impl LocalCluster {
pub fn new(config: &ClusterConfig) -> Self {
assert_eq!(config.validator_configs.len(), config.node_stakes.len());
let leader_keypair = Arc::new(Keypair::new());
let validator_keys = {
if let Some(ref keys) = config.validator_keys {
assert_eq!(config.validator_configs.len(), keys.len());
keys.clone()
} else {
iter::repeat_with(|| Arc::new(Keypair::new()))
.take(config.validator_configs.len())
.collect()
}
};
let leader_keypair = &validator_keys[0];
let leader_pubkey = leader_keypair.pubkey();
let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
let leader_node = Node::new_localhost_with_pubkey(&leader_pubkey);
let GenesisConfigInfo {
mut genesis_config,
mint_keypair,
@@ -208,20 +222,22 @@
);
let mut validators = HashMap::new();
let mut validator_infos = HashMap::new();
validators.insert(leader_pubkey, leader_server);
error!("leader_pubkey: {}", leader_pubkey);
let leader_info = ValidatorInfo {
keypair: leader_keypair,
keypair: leader_keypair.clone(),
voting_keypair: leader_voting_keypair,
storage_keypair: leader_storage_keypair,
ledger_path: leader_ledger_path,
contact_info: leader_contact_info.clone(),
};
let cluster_leader =
ClusterValidatorInfo::new(leader_info, config.validator_configs[0].clone());
let cluster_leader = ClusterValidatorInfo::new(
leader_info,
config.validator_configs[0].clone(),
leader_server,
);
validator_infos.insert(leader_pubkey, cluster_leader);
validators.insert(leader_pubkey, cluster_leader);
let mut cluster = Self {
funding_keypair: mint_keypair,
@@ -229,23 +245,24 @@
validators,
archivers: vec![],
genesis_config,
validator_infos,
archiver_infos: HashMap::new(),
listener_infos: HashMap::new(),
};
for (stake, validator_config) in (&config.node_stakes[1..])
.iter()
.zip((&config.validator_configs[1..]).iter())
{
cluster.add_validator(validator_config, *stake);
for (stake, validator_config, key) in izip!(
(&config.node_stakes[1..]).iter(),
config.validator_configs[1..].iter(),
validator_keys[1..].iter(),
) {
cluster.add_validator(validator_config, *stake, key.clone());
}
let listener_config = ValidatorConfig {
voting_disabled: true,
..config.validator_configs[0].clone()
};
(0..config.num_listeners).for_each(|_| cluster.add_validator(&listener_config, 0));
(0..config.num_listeners).for_each(|_| {
cluster.add_validator(&listener_config, 0, Arc::new(Keypair::new()));
});
discover_cluster(
&cluster.entry_point_info.gossip,
@@ -268,14 +285,18 @@ impl LocalCluster {
pub fn exit(&mut self) {
for node in self.validators.values_mut() {
node.exit();
if let Some(ref mut v) = node.validator {
v.exit();
}
}
}
pub fn close_preserve_ledgers(&mut self) {
self.exit();
for (_, node) in self.validators.drain() {
node.join().unwrap();
for (_, node) in self.validators.iter_mut() {
if let Some(v) = node.validator.take() {
v.join().unwrap();
}
}
while let Some(archiver) = self.archivers.pop() {
@@ -283,14 +304,18 @@
}
}
pub fn add_validator(&mut self, validator_config: &ValidatorConfig, stake: u64) {
pub fn add_validator(
&mut self,
validator_config: &ValidatorConfig,
stake: u64,
validator_keypair: Arc<Keypair>,
) -> Pubkey {
let client = create_client(
self.entry_point_info.client_facing_addr(),
VALIDATOR_PORT_RANGE,
);
// Must have enough tokens to fund vote account and set delegate
let validator_keypair = Arc::new(Keypair::new());
let voting_keypair = Keypair::new();
let storage_keypair = Arc::new(Keypair::new());
let validator_pubkey = validator_keypair.pubkey();
@@ -341,8 +366,6 @@ impl LocalCluster {
&config,
);
self.validators
.insert(validator_keypair.pubkey(), validator_server);
let validator_pubkey = validator_keypair.pubkey();
let validator_info = ClusterValidatorInfo::new(
ValidatorInfo {
@@ -353,14 +376,11 @@ impl LocalCluster {
contact_info,
},
validator_config.clone(),
validator_server,
);
if validator_config.voting_disabled {
self.listener_infos.insert(validator_pubkey, validator_info);
} else {
self.validator_infos
.insert(validator_pubkey, validator_info);
}
self.validators.insert(validator_pubkey, validator_info);
validator_pubkey
}
fn add_archiver(&mut self) {
@@ -405,7 +425,7 @@ impl LocalCluster {
fn close(&mut self) {
self.close_preserve_ledgers();
for ledger_path in self
.validator_infos
.validators
.values()
.map(|f| &f.info.ledger_path)
.chain(self.archiver_infos.values().map(|info| &info.ledger_path))
@@ -616,7 +636,7 @@ impl Cluster for LocalCluster {
}
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> {
self.validator_infos.get(pubkey).map(|f| {
self.validators.get(pubkey).map(|f| {
create_client(
f.info.contact_info.client_facing_addr(),
VALIDATOR_PORT_RANGE,
@@ -628,10 +648,10 @@ impl Cluster for LocalCluster {
let mut node = self.validators.remove(&pubkey).unwrap();
// Shut down the validator
node.exit();
node.join().unwrap();
self.validator_infos.remove(&pubkey).unwrap()
let mut validator = node.validator.take().expect("Validator must be running");
validator.exit();
validator.join().unwrap();
node
}
fn restart_node(&mut self, pubkey: &Pubkey, mut cluster_validator_info: ClusterValidatorInfo) {
@@ -666,8 +686,8 @@ impl Cluster for LocalCluster {
&cluster_validator_info.config,
);
self.validators.insert(*pubkey, restarted_node);
self.validator_infos.insert(*pubkey, cluster_validator_info);
cluster_validator_info.validator = Some(restarted_node);
self.validators.insert(*pubkey, cluster_validator_info);
}
fn exit_restart_node(&mut self, pubkey: &Pubkey, validator_config: ValidatorConfig) {
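Taken together, the reworked exit_node / restart_node pair is what the partition tests build on: kill a subset of validators, let the rest make progress, then bring the killed nodes back on their preserved ledgers. A rough sketch of that flow, assuming a running `cluster: LocalCluster`, the Cluster trait in scope, and that exit_node takes `&Pubkey` like the other trait methods; which nodes form the partition is arbitrary here:

// Pick a couple of validators to act as the partition to be killed.
let partition: Vec<Pubkey> = cluster.validators.keys().cloned().take(2).collect();

// Stop them, keeping each node's ClusterValidatorInfo (keys, ledger, config).
let killed: Vec<(Pubkey, ClusterValidatorInfo)> = partition
    .iter()
    .map(|pubkey| (*pubkey, cluster.exit_node(pubkey)))
    .collect();

// ... let the surviving nodes process transactions and root new slots ...

// Bring the partition back; each node restarts on its preserved ledger.
for (pubkey, info) in killed {
    cluster.restart_node(&pubkey, info);
}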