automerge
@@ -1,13 +1,11 @@
use assert_matches::assert_matches;
use log::*;
use serial_test_derive::serial;
use solana_client::rpc_client::RpcClient;
use solana_client::thin_client::create_client;
use solana_core::{
    broadcast_stage::BroadcastStageType,
    consensus::VOTE_THRESHOLD_DEPTH,
    gossip_service::discover_cluster,
    partition_cfg::{Partition, PartitionCfg},
    validator::ValidatorConfig,
    broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
    gossip_service::discover_cluster, validator::ValidatorConfig,
};
use solana_ledger::{
    bank_forks::SnapshotConfig, blockstore::Blockstore, leader_schedule::FixedSchedule,
@@ -18,7 +16,6 @@ use solana_local_cluster::{
    cluster_tests,
    local_cluster::{ClusterConfig, LocalCluster},
};
use solana_sdk::timing::timestamp;
use solana_sdk::{
    client::SyncClient,
    clock,
@@ -28,6 +25,7 @@ use solana_sdk::{
    poh_config::PohConfig,
    signature::{Keypair, KeypairUtil},
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
    collections::{HashMap, HashSet},
    fs, iter,
@@ -248,7 +246,7 @@ fn run_cluster_partition(
    };

    let validator_pubkeys: Vec<_> = validator_keys.iter().map(|v| v.pubkey()).collect();
    let mut config = ClusterConfig {
    let config = ClusterConfig {
        cluster_lamports,
        node_stakes,
        validator_configs: vec![validator_config.clone(); num_nodes],
@@ -256,71 +254,65 @@ fn run_cluster_partition(
        ..ClusterConfig::default()
    };

    let now = timestamp();
    // Partition needs to start after the first few shorter warmup epochs, otherwise
    // no root will be set before the partition is resolved, the leader schedule will
    // not be computable, and the cluster will halt.
    let partition_epoch_start_offset = cluster_tests::time_until_nth_epoch(
        partition_start_epoch,
        config.slots_per_epoch,
        config.stakers_slot_offset,
    let enable_partition = Some(Arc::new(AtomicBool::new(true)));
    info!(
        "PARTITION_TEST starting cluster with {:?} partitions slots_per_epoch: {}",
        partitions, config.slots_per_epoch,
    );
    // Assume it takes <= 10 seconds for `LocalCluster::new` to boot up.
    let local_cluster_boot_time = 10_000;
    let partition_start = now + partition_epoch_start_offset + local_cluster_boot_time;
    let partition_end = partition_start + leader_schedule_time as u64;
    let mut validator_index = 0;
    for (i, partition) in partitions.iter().enumerate() {
        for _ in partition.iter() {
            let mut p1 = Partition::default();
            p1.num_partitions = partitions.len();
            p1.my_partition = i;
            p1.start_ts = partition_start;
            p1.end_ts = partition_end;
            config.validator_configs[validator_index].partition_cfg =
                Some(PartitionCfg::new(vec![p1]));
            validator_index += 1;
    let mut cluster = LocalCluster::new(&config);

    let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, num_nodes).unwrap();

    info!("PARTITION_TEST sleeping until partition starting condition",);
    loop {
        let mut reached_epoch = true;
        for node in &cluster_nodes {
            let node_client = RpcClient::new_socket(node.rpc);
            if let Ok(epoch_info) = node_client.get_epoch_info() {
                info!("slots_per_epoch: {:?}", epoch_info);
                if epoch_info.slots_in_epoch <= (1 << VOTE_THRESHOLD_DEPTH) {
                    reached_epoch = false;
                    break;
                }
            } else {
                reached_epoch = false;
            }
        }

        if reached_epoch {
            info!("PARTITION_TEST start partition");
            enable_partition
                .clone()
                .unwrap()
                .store(false, Ordering::Relaxed);
            break;
        } else {
            sleep(Duration::from_millis(100));
        }
    }
    info!(
        "PARTITION_TEST starting cluster with {:?} partitions",
        partitions
    );
    let now = Instant::now();
    let mut cluster = LocalCluster::new(&config);
    let elapsed = now.elapsed();
    assert!(elapsed.as_millis() < local_cluster_boot_time as u128);
    sleep(Duration::from_millis(leader_schedule_time));

    info!("PARTITION_TEST remove partition");
    enable_partition.unwrap().store(true, Ordering::Relaxed);

    let now = timestamp();
    let timeout = partition_start as u64 - now as u64;
    info!(
        "PARTITION_TEST sleeping until partition start timeout {}",
        timeout
    );
    let mut dead_nodes = HashSet::new();
    if timeout > 0 {
        sleep(Duration::from_millis(timeout as u64));
    }
    info!("PARTITION_TEST done sleeping until partition start timeout");
    let now = timestamp();
    let timeout = partition_end as u64 - now as u64;
    info!(
        "PARTITION_TEST sleeping until partition end timeout {}",
        timeout
    );
    let mut alive_node_contact_infos = vec![];
    let should_exits: Vec<_> = partitions
        .iter()
        .flat_map(|p| p.iter().map(|(_, should_exit)| should_exit))
        .collect();
    assert_eq!(should_exits.len(), validator_pubkeys.len());
    let timeout = 10;
    if timeout > 0 {
        // Give partitions time to propagate their blocks from durinig the partition
        // Give partitions time to propagate their blocks from during the partition
        // after the partition resolves
        let propagation_time = leader_schedule_time;
        info!("PARTITION_TEST resolving partition");
        sleep(Duration::from_millis(timeout));
        info!("PARTITION_TEST waiting for blocks to propagate after partition");
        info!("PARTITION_TEST resolving partition. sleeping {}ms", timeout);
        sleep(Duration::from_millis(10_000));
        info!(
            "PARTITION_TEST waiting for blocks to propagate after partition {}ms",
            propagation_time
        );
        sleep(Duration::from_millis(propagation_time));
        info!("PARTITION_TEST resuming normal operation");
        for (pubkey, should_exit) in validator_pubkeys.iter().zip(should_exits) {
@@ -353,6 +345,7 @@ fn run_cluster_partition(
    info!("PARTITION_TEST looking for new roots on all nodes");
    let mut roots = vec![HashSet::new(); alive_node_contact_infos.len()];
    let mut done = false;
    let mut last_print = Instant::now();
    while !done {
        for (i, ingress_node) in alive_node_contact_infos.iter().enumerate() {
            let client = create_client(
@@ -362,12 +355,15 @@ fn run_cluster_partition(
            let slot = client.get_slot().unwrap_or(0);
            roots[i].insert(slot);
            let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0);
            info!("PARTITION_TEST min observed roots {}/16", min_node);
            if last_print.elapsed().as_secs() > 3 {
                info!("PARTITION_TEST min observed roots {}/16", min_node);
                last_print = Instant::now();
            }
            done = min_node >= 16;
        }
        sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2));
    }
    info!("PARTITION_TEST done spending on all node");
    info!("PARTITION_TEST done waiting for roots");
}

#[allow(unused_attributes)]
@@ -424,6 +420,7 @@ fn test_kill_partition() {
            leader_schedule.push(k.pubkey())
        }
    }
    info!("leader_schedule: {}", leader_schedule.len());

    run_cluster_partition(
        &partitions,
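
The new test path above gates the simulated partition with a shared Arc<AtomicBool> (enable_partition): the test stores false to start the partition once enough slots have elapsed, then stores true to resolve it. A minimal, self-contained Rust sketch of that gating pattern follows; it is not Solana code, and the worker loop and names (e.g. "forwarding shred") are illustrative assumptions only.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread;
use std::time::Duration;

fn main() {
    // `true` means "no partition": traffic flows normally.
    let enable_partition = Arc::new(AtomicBool::new(true));

    let flag = enable_partition.clone();
    let worker = thread::spawn(move || {
        for i in 0..10 {
            if flag.load(Ordering::Relaxed) {
                println!("forwarding shred {}", i);
            } else {
                println!("dropping shred {} (partition active)", i);
            }
            thread::sleep(Duration::from_millis(50));
        }
    });

    // Test driver: start the partition, hold it briefly, then resolve it.
    thread::sleep(Duration::from_millis(150));
    enable_partition.store(false, Ordering::Relaxed); // start partition
    thread::sleep(Duration::from_millis(150));
    enable_partition.store(true, Ordering::Relaxed); // resolve partition

    worker.join().unwrap();
}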