From 68a8b955bc9dcc5bd77285584d888381db59219f Mon Sep 17 00:00:00 2001
From: Carl
Date: Fri, 22 Mar 2019 18:05:52 -0700
Subject: [PATCH] Add test to local_cluster for restarting a node

---
 core/src/cluster_tests.rs | 20 ++++++++++
 core/src/fullnode.rs      | 10 +----
 core/src/lib.rs           |  1 +
 core/src/local_cluster.rs | 84 +++++++++++++++++++++++++++++++++------
 tests/local_cluster.rs    | 49 +++++++++++++++++++----
 5 files changed, 135 insertions(+), 29 deletions(-)

diff --git a/core/src/cluster_tests.rs b/core/src/cluster_tests.rs
index 1eaff2b944..8ad0ca716d 100644
--- a/core/src/cluster_tests.rs
+++ b/core/src/cluster_tests.rs
@@ -7,6 +7,7 @@ use crate::cluster_info::FULLNODE_PORT_RANGE;
 use crate::contact_info::ContactInfo;
 use crate::entry::{Entry, EntrySlice};
 use crate::gossip_service::discover;
+use crate::poh_service::PohServiceConfig;
 use solana_client::client::create_client;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil};
@@ -117,6 +118,25 @@ pub fn verify_ledger_ticks(ledger_path: &str, ticks_per_slot: usize) {
     }
 }
 
+pub fn sleep_n_epochs(
+    num_epochs: u64,
+    config: &PohServiceConfig,
+    ticks_per_slot: u64,
+    slots_per_epoch: u64,
+) {
+    let num_ticks_per_second = {
+        match config {
+            PohServiceConfig::Sleep(d) => (1000 / d.as_millis()) as u64,
+            _ => panic!("Unsupported tick config for testing"),
+        }
+    };
+
+    let num_ticks_to_sleep = num_epochs * ticks_per_slot * slots_per_epoch;
+    sleep(Duration::from_secs(
+        (num_ticks_to_sleep + num_ticks_per_second - 1) / num_ticks_per_second,
+    ));
+}
+
 pub fn kill_entry_and_spend_and_verify_rest(
     entry_point_info: &ContactInfo,
     funding_keypair: &Keypair,
diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs
index a3c9426c92..08715ab6ba 100644
--- a/core/src/fullnode.rs
+++ b/core/src/fullnode.rs
@@ -37,6 +37,7 @@ use std::thread::JoinHandle;
 use std::thread::{spawn, Result};
 use std::time::Duration;
 
+#[derive(Clone)]
 pub struct FullnodeConfig {
     pub sigverify_disabled: bool,
     pub voting_disabled: bool,
@@ -265,15 +266,6 @@ impl Fullnode {
     // Used for notifying many nodes in parallel to exit
     pub fn exit(&self) {
         self.exit.store(true, Ordering::Relaxed);
-
-        // Need to force the poh_recorder to drop the WorkingBank,
-        // which contains the channel to BroadcastStage. This should be
-        // sufficient as long as no other rotations are happening that
-        // can cause the Tpu to restart a BankingStage and reset a
-        // WorkingBank in poh_recorder. It follows no other rotations can be
-        // in motion because exit()/close() are only called by the run() loop
-        // which is the sole initiator of rotations.
-        self.poh_recorder.lock().unwrap().clear_bank();
     }
 
     pub fn close(self) -> Result<()> {
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 065a0ae8ae..3b8e656dde 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -27,6 +27,7 @@ pub mod blocktree;
 pub mod blockstream;
 pub mod blockstream_service;
 pub mod blocktree_processor;
+pub mod cluster;
 pub mod cluster_info;
 pub mod cluster_tests;
 pub mod db_window;
diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs
index 267e9cbe35..7daaf760f9 100644
--- a/core/src/local_cluster.rs
+++ b/core/src/local_cluster.rs
@@ -1,4 +1,5 @@
 use crate::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree};
+use crate::cluster::Cluster;
 use crate::cluster_info::{Node, FULLNODE_PORT_RANGE};
 use crate::contact_info::ContactInfo;
 use crate::fullnode::{Fullnode, FullnodeConfig};
@@ -14,17 +15,33 @@ use solana_sdk::timing::DEFAULT_SLOTS_PER_EPOCH;
 use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
 use solana_vote_api::vote_state::VoteState;
 use solana_vote_api::vote_transaction::VoteTransaction;
+use std::collections::HashMap;
 use std::fs::remove_dir_all;
 use std::io::{Error, ErrorKind, Result};
 use std::sync::Arc;
 
+pub struct FullnodeInfo {
+    pub keypair: Arc<Keypair>,
+    pub ledger_path: String,
+}
+
+impl FullnodeInfo {
+    fn new(keypair: Arc<Keypair>, ledger_path: String) -> Self {
+        Self {
+            keypair,
+            ledger_path,
+        }
+    }
+}
+
 pub struct LocalCluster {
     /// Keypair with funding to particpiate in the network
     pub funding_keypair: Keypair,
+    pub fullnode_config: FullnodeConfig,
     /// Entry point from which the rest of the network can be discovered
     pub entry_point_info: ContactInfo,
-    pub ledger_paths: Vec<String>,
-    fullnodes: Vec<Fullnode>,
+    pub fullnodes: HashMap<Pubkey, Fullnode>,
+    pub fullnode_infos: HashMap<Pubkey, FullnodeInfo>,
 }
 
 impl LocalCluster {
@@ -63,9 +80,6 @@
         genesis_block.slots_per_epoch = slots_per_epoch;
         let (genesis_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
         let leader_ledger_path = tmp_copy_blocktree!(&genesis_ledger_path);
-        let mut ledger_paths = vec![];
-        ledger_paths.push(genesis_ledger_path.clone());
-        ledger_paths.push(leader_ledger_path.clone());
         let voting_keypair = Keypair::new();
         let leader_contact_info = leader_node.info.clone();
         let leader_server = Fullnode::new(
@@ -77,7 +91,14 @@
             None,
             fullnode_config,
         );
-        let mut fullnodes = vec![leader_server];
+        let mut fullnodes = HashMap::new();
+        let mut fullnode_infos = HashMap::new();
+        fullnodes.insert(leader_pubkey, leader_server);
+        fullnode_infos.insert(
+            leader_pubkey,
+            FullnodeInfo::new(leader_keypair.clone(), leader_ledger_path),
+        );
+
         let mut client = create_client(
             leader_contact_info.client_facing_addr(),
             FULLNODE_PORT_RANGE,
@@ -90,7 +111,6 @@
             let validator_pubkey = validator_keypair.pubkey();
             let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
             let ledger_path = tmp_copy_blocktree!(&genesis_ledger_path);
-            ledger_paths.push(ledger_path.clone());
 
             // Send each validator some lamports to vote
             let validator_balance =
@@ -116,34 +136,40 @@
                 Some(&leader_contact_info),
                 fullnode_config,
             );
-            fullnodes.push(validator_server);
+            fullnodes.insert(validator_keypair.pubkey(), validator_server);
+            fullnode_infos.insert(
+                validator_keypair.pubkey(),
+                FullnodeInfo::new(leader_keypair.clone(), ledger_path),
+            );
         }
         discover(&leader_contact_info.gossip, node_stakes.len()).unwrap();
         Self {
             funding_keypair: mint_keypair,
             entry_point_info: leader_contact_info,
             fullnodes,
-            ledger_paths,
+            fullnode_config: fullnode_config.clone(),
+            fullnode_infos,
         }
     }
 
     pub fn exit(&self) {
-        for node in &self.fullnodes {
+        for node in self.fullnodes.values() {
             node.exit();
         }
     }
 
     pub fn close_preserve_ledgers(&mut self) {
         self.exit();
-        while let Some(node) = self.fullnodes.pop() {
+        for (_, node) in self.fullnodes.drain() {
             node.join().unwrap();
         }
     }
 
     fn close(&mut self) {
         self.close_preserve_ledgers();
-        for path in &self.ledger_paths {
-            remove_dir_all(path).unwrap_or_else(|_| panic!("Unable to remove {}", path));
+        for (_, info) in &self.fullnode_infos {
+            remove_dir_all(&info.ledger_path)
+                .unwrap_or_else(|_| panic!("Unable to remove {}", info.ledger_path));
         }
     }
 
@@ -223,6 +249,38 @@
     }
 }
 
+impl Cluster for LocalCluster {
+    fn restart_node(&mut self, pubkey: Pubkey) {
+        // Shut down the fullnode
+        let node = self.fullnodes.remove(&pubkey).unwrap();
+        node.exit();
+        node.join().unwrap();
+
+        // Restart the node
+        let fullnode_info = &self.fullnode_infos[&pubkey];
+        let node = Node::new_localhost_with_pubkey(&fullnode_info.keypair.pubkey());
+        if pubkey == self.entry_point_info.id {
+            self.entry_point_info = node.info.clone();
+        }
+        let new_voting_keypair = Keypair::new();
+        let restarted_node = Fullnode::new(
+            node,
+            &fullnode_info.keypair,
+            &fullnode_info.ledger_path,
+            &new_voting_keypair.pubkey(),
+            new_voting_keypair,
+            None,
+            &self.fullnode_config,
+        );
+
+        self.fullnodes.insert(pubkey, restarted_node);
+    }
+
+    fn get_node_ids(&self) -> Vec<Pubkey> {
+        self.fullnodes.keys().cloned().collect()
+    }
+}
+
 impl Drop for LocalCluster {
     fn drop(&mut self) {
         self.close();
diff --git a/tests/local_cluster.rs b/tests/local_cluster.rs
index 8efe60bf08..bf5cdeaf69 100644
--- a/tests/local_cluster.rs
+++ b/tests/local_cluster.rs
@@ -1,11 +1,12 @@
 extern crate solana;
 
+use solana::cluster::Cluster;
 use solana::cluster_tests;
 use solana::fullnode::FullnodeConfig;
 use solana::gossip_service::discover;
 use solana::local_cluster::LocalCluster;
 use solana::poh_service::PohServiceConfig;
-use std::thread::sleep;
+use solana_sdk::timing;
 use std::time::Duration;
 
 #[test]
@@ -109,14 +110,16 @@
         num_ticks_per_slot,
         num_slots_per_epoch,
     );
-    let num_epochs_to_sleep = 10;
-    let num_ticks_to_sleep = num_epochs_to_sleep * num_ticks_per_slot * num_slots_per_epoch;
-    sleep(Duration::from_millis(
-        num_ticks_to_sleep / num_ticks_per_second as u64 * 100,
-    ));
+    cluster_tests::sleep_n_epochs(
+        10,
+        &fullnode_config.tick_config,
+        num_ticks_per_slot,
+        num_slots_per_epoch,
+    );
 
     cluster.close_preserve_ledgers();
-    let leader_ledger = cluster.ledger_paths[1].clone();
+    let leader_id = cluster.entry_point_info.id;
+    let leader_ledger = cluster.fullnode_infos[&leader_id].ledger_path.clone();
     cluster_tests::verify_ledger_ticks(&leader_ledger, num_ticks_per_slot as usize);
 }
 
@@ -137,3 +140,35 @@ fn test_forwarding() {
     // Confirm that transactions were forwarded to and processed by the leader.
     cluster_tests::send_many_transactions(&validator_info, &cluster.funding_keypair, 20);
 }
+
+#[test]
+fn test_restart_node() {
+    let fullnode_config = FullnodeConfig::default();
+    let slots_per_epoch = 8;
+    let mut cluster = LocalCluster::new_with_tick_config(
+        &[3],
+        100,
+        &fullnode_config,
+        timing::DEFAULT_TICKS_PER_SLOT,
+        slots_per_epoch,
+    );
+    let nodes = cluster.get_node_ids();
+    cluster_tests::sleep_n_epochs(
+        1,
+        &fullnode_config.tick_config,
+        timing::DEFAULT_TICKS_PER_SLOT,
+        slots_per_epoch,
+    );
+    cluster.restart_node(nodes[0]);
+    cluster_tests::sleep_n_epochs(
+        1,
+        &fullnode_config.tick_config,
+        timing::DEFAULT_TICKS_PER_SLOT,
+        slots_per_epoch,
+    );
+    cluster_tests::spend_and_verify_all_nodes(
+        &cluster.entry_point_info,
+        &cluster.funding_keypair,
+        1,
+    );
+}