diff --git a/core/src/cluster.rs b/core/src/cluster.rs
new file mode 100644
index 0000000000..7142792355
--- /dev/null
+++ b/core/src/cluster.rs
@@ -0,0 +1,6 @@
+use solana_sdk::pubkey::Pubkey;
+
+pub trait Cluster {
+    fn get_node_ids(&self) -> Vec<Pubkey>;
+    fn restart_node(&mut self, pubkey: Pubkey);
+}
diff --git a/core/src/cluster_tests.rs b/core/src/cluster_tests.rs
index e9153c675c..577bc563eb 100644
--- a/core/src/cluster_tests.rs
+++ b/core/src/cluster_tests.rs
@@ -8,12 +8,13 @@ use crate::contact_info::ContactInfo;
 use crate::entry::{Entry, EntrySlice};
 use crate::gossip_service::discover;
 use crate::locktower::VOTE_THRESHOLD_DEPTH;
+use crate::poh_service::PohServiceConfig;
 use solana_client::thin_client::create_client;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
 use solana_sdk::system_transaction::SystemTransaction;
 use solana_sdk::timing::{
-    DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS, NUM_TICKS_PER_SECOND,
+    duration_as_ms, DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS, NUM_TICKS_PER_SECOND,
 };
 use std::io;
 use std::thread::sleep;
@@ -122,6 +123,25 @@ pub fn verify_ledger_ticks(ledger_path: &str, ticks_per_slot: usize) {
     }
 }
 
+pub fn sleep_n_epochs(
+    num_epochs: f64,
+    config: &PohServiceConfig,
+    ticks_per_slot: u64,
+    slots_per_epoch: u64,
+) {
+    let num_ticks_per_second = {
+        match config {
+            PohServiceConfig::Sleep(d) => (1000 / duration_as_ms(d)) as f64,
+            _ => panic!("Unsupported tick config for testing"),
+        }
+    };
+
+    let num_ticks_to_sleep = num_epochs * ticks_per_slot as f64 * slots_per_epoch as f64;
+    sleep(Duration::from_secs(
+        ((num_ticks_to_sleep + num_ticks_per_second - 1.0) / num_ticks_per_second) as u64,
+    ));
+}
+
 pub fn kill_entry_and_spend_and_verify_rest(
     entry_point_info: &ContactInfo,
     funding_keypair: &Keypair,
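Note on the sleep_n_epochs helper added above: it derives the tick rate from the PohServiceConfig::Sleep duration and rounds the total tick count up to whole seconds, so the sleep never undershoots the requested epoch span. A minimal standalone sketch of that arithmetic (the function name and the tick_ms parameter are illustrative, not part of this patch; tick_ms is assumed to divide evenly into 1000):

    use std::time::Duration;

    // Illustrative re-derivation of the sleep_n_epochs math; `tick_ms` stands in
    // for the Duration carried by PohServiceConfig::Sleep.
    fn epoch_sleep_secs(num_epochs: f64, ticks_per_slot: u64, slots_per_epoch: u64, tick_ms: u64) -> u64 {
        let ticks_per_second = (1000 / tick_ms) as f64;
        let total_ticks = num_epochs * ticks_per_slot as f64 * slots_per_epoch as f64;
        // Round up to whole seconds, expressed in floats exactly as in the patch.
        ((total_ticks + ticks_per_second - 1.0) / ticks_per_second) as u64
    }

    fn main() {
        // 1 epoch of 8 slots at 16 ticks/slot with 100ms ticks = 128 ticks at 10/s -> 13s.
        assert_eq!(epoch_sleep_secs(1.0, 16, 8, 100), 13);
        let _ = Duration::from_secs(epoch_sleep_secs(1.0, 16, 8, 100));
    }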
diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs
index 785777b648..df11b1ed13 100644
--- a/core/src/fullnode.rs
+++ b/core/src/fullnode.rs
@@ -34,6 +34,7 @@ use std::sync::mpsc::Receiver;
 use std::sync::{Arc, Mutex, RwLock};
 use std::thread::Result;
 
+#[derive(Clone)]
 pub struct FullnodeConfig {
     pub sigverify_disabled: bool,
     pub voting_disabled: bool,
@@ -248,15 +249,6 @@ impl Fullnode {
     // Used for notifying many nodes in parallel to exit
     pub fn exit(&self) {
         self.exit.store(true, Ordering::Relaxed);
-
-        // Need to force the poh_recorder to drop the WorkingBank,
-        // which contains the channel to BroadcastStage. This should be
-        // sufficient as long as no other rotations are happening that
-        // can cause the Tpu to restart a BankingStage and reset a
-        // WorkingBank in poh_recorder. It follows no other rotations can be
-        // in motion because exit()/close() are only called by the run() loop
-        // which is the sole initiator of rotations.
-        self.poh_recorder.lock().unwrap().clear_bank();
     }
 
     pub fn close(self) -> Result<()> {
diff --git a/core/src/lib.rs b/core/src/lib.rs
index e3b7325224..96d0b6d679 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -27,6 +27,7 @@ pub mod blocktree;
 pub mod blockstream;
 pub mod blockstream_service;
 pub mod blocktree_processor;
+pub mod cluster;
 pub mod cluster_info;
 pub mod cluster_tests;
 pub mod db_window;
diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs
index ecb4f2a040..49a036183f 100644
--- a/core/src/local_cluster.rs
+++ b/core/src/local_cluster.rs
@@ -1,4 +1,5 @@
 use crate::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree};
+use crate::cluster::Cluster;
 use crate::cluster_info::{Node, FULLNODE_PORT_RANGE};
 use crate::contact_info::ContactInfo;
 use crate::fullnode::{Fullnode, FullnodeConfig};
@@ -15,20 +16,37 @@ use solana_sdk::timing::DEFAULT_SLOTS_PER_EPOCH;
 use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
 use solana_vote_api::vote_state::VoteState;
 use solana_vote_api::vote_transaction::VoteTransaction;
+use std::collections::HashMap;
 use std::fs::remove_dir_all;
 use std::io::{Error, ErrorKind, Result};
 use std::sync::Arc;
 
+pub struct FullnodeInfo {
+    pub keypair: Arc<Keypair>,
+    pub ledger_path: String,
+}
+
+impl FullnodeInfo {
+    fn new(keypair: Arc<Keypair>, ledger_path: String) -> Self {
+        Self {
+            keypair,
+            ledger_path,
+        }
+    }
+}
+
 pub struct LocalCluster {
     /// Keypair with funding to participate in the network
     pub funding_keypair: Keypair,
+    pub fullnode_config: FullnodeConfig,
     /// Entry point from which the rest of the network can be discovered
     pub entry_point_info: ContactInfo,
-    pub ledger_paths: Vec<String>,
-    fullnodes: Vec<Fullnode>,
-    replicators: Vec<Replicator>,
+    pub fullnode_infos: HashMap<Pubkey, FullnodeInfo>,
+    fullnodes: HashMap<Pubkey, Fullnode>,
     genesis_ledger_path: String,
     genesis_block: GenesisBlock,
+    replicators: Vec<Replicator>,
+    pub replicator_ledger_paths: Vec<String>,
 }
 
 impl LocalCluster {
@@ -86,9 +104,6 @@
         genesis_block.slots_per_epoch = slots_per_epoch;
         let (genesis_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
         let leader_ledger_path = tmp_copy_blocktree!(&genesis_ledger_path);
-        let mut ledger_paths = vec![];
-        ledger_paths.push(genesis_ledger_path.clone());
-        ledger_paths.push(leader_ledger_path.clone());
         let voting_keypair = Keypair::new();
         let leader_contact_info = leader_node.info.clone();
 
@@ -102,16 +117,24 @@
             fullnode_config,
         );
 
-        let fullnodes = vec![leader_server];
+        let mut fullnodes = HashMap::new();
+        let mut fullnode_infos = HashMap::new();
+        fullnodes.insert(leader_pubkey, leader_server);
+        fullnode_infos.insert(
+            leader_pubkey,
+            FullnodeInfo::new(leader_keypair.clone(), leader_ledger_path),
+        );
 
         let mut cluster = Self {
             funding_keypair: mint_keypair,
             entry_point_info: leader_contact_info,
             fullnodes,
             replicators: vec![],
-            ledger_paths,
+            replicator_ledger_paths: vec![],
             genesis_ledger_path,
             genesis_block,
+            fullnode_infos,
+            fullnode_config: fullnode_config.clone(),
         };
 
         for stake in &node_stakes[1..] {
@@ -128,14 +151,14 @@ impl LocalCluster {
     }
 
     pub fn exit(&self) {
-        for node in &self.fullnodes {
+        for node in self.fullnodes.values() {
             node.exit();
         }
     }
 
     pub fn close_preserve_ledgers(&mut self) {
         self.exit();
-        while let Some(node) = self.fullnodes.pop() {
+        for (_, node) in self.fullnodes.drain() {
             node.join().unwrap();
         }
 
@@ -157,7 +180,6 @@
         let validator_pubkey = validator_keypair.pubkey();
         let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
         let ledger_path = tmp_copy_blocktree!(&self.genesis_ledger_path);
-        self.ledger_paths.push(ledger_path.clone());
 
         // Send each validator some lamports to vote
         let validator_balance =
@@ -180,7 +202,12 @@
             fullnode_config,
         );
 
-        self.fullnodes.push(validator_server);
+        self.fullnodes
+            .insert(validator_keypair.pubkey(), validator_server);
+        self.fullnode_infos.insert(
+            validator_keypair.pubkey(),
+            FullnodeInfo::new(validator_keypair.clone(), ledger_path),
+        );
     }
 
     fn add_replicator(&mut self) {
@@ -208,15 +235,20 @@
         )
         .unwrap();
 
-        self.ledger_paths.push(replicator_ledger_path);
-
+        self.replicator_ledger_paths.push(replicator_ledger_path);
         self.replicators.push(replicator);
     }
 
     fn close(&mut self) {
         self.close_preserve_ledgers();
-        for path in &self.ledger_paths {
-            remove_dir_all(path).unwrap_or_else(|_| panic!("Unable to remove {}", path));
+        for ledger_path in self
+            .fullnode_infos
+            .values()
+            .map(|f| &f.ledger_path)
+            .chain(self.replicator_ledger_paths.iter())
+        {
+            remove_dir_all(&ledger_path)
+                .unwrap_or_else(|_| panic!("Unable to remove {}", ledger_path));
         }
     }
 
@@ -300,6 +332,38 @@ impl LocalCluster {
     }
 }
 
+impl Cluster for LocalCluster {
+    fn restart_node(&mut self, pubkey: Pubkey) {
+        // Shut down the fullnode
+        let node = self.fullnodes.remove(&pubkey).unwrap();
+        node.exit();
+        node.join().unwrap();
+
+        // Restart the node
+        let fullnode_info = &self.fullnode_infos[&pubkey];
+        let node = Node::new_localhost_with_pubkey(&fullnode_info.keypair.pubkey());
+        if pubkey == self.entry_point_info.id {
+            self.entry_point_info = node.info.clone();
+        }
+        let new_voting_keypair = Keypair::new();
+        let restarted_node = Fullnode::new(
+            node,
+            &fullnode_info.keypair,
+            &fullnode_info.ledger_path,
+            &new_voting_keypair.pubkey(),
+            new_voting_keypair,
+            None,
+            &self.fullnode_config,
+        );
+
+        self.fullnodes.insert(pubkey, restarted_node);
+    }
+
+    fn get_node_ids(&self) -> Vec<Pubkey> {
+        self.fullnodes.keys().cloned().collect()
+    }
+}
+
 impl Drop for LocalCluster {
     fn drop(&mut self) {
         self.close();
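With LocalCluster now implementing Cluster, test code can be written against the trait rather than the concrete type. A small sketch, assuming only the two trait methods defined in this patch (restart_all is an illustrative helper, not part of the change):

    use solana::cluster::Cluster;

    // Restart every node in any Cluster implementation, e.g. the LocalCluster above.
    fn restart_all<C: Cluster>(cluster: &mut C) {
        for id in cluster.get_node_ids() {
            cluster.restart_node(id);
        }
    }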
diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs
index 84f52d7f40..2e389ae842 100644
--- a/core/src/poh_recorder.rs
+++ b/core/src/poh_recorder.rs
@@ -232,6 +232,7 @@ impl PohRecorder {
                 working_bank.bank.slot()
             );
             self.start_slot = working_bank.max_tick_height / working_bank.bank.ticks_per_slot();
+            self.start_tick = (self.start_slot + 1) * working_bank.bank.ticks_per_slot();
             self.clear_bank();
         }
         if e.is_err() {
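The one-line poh_recorder.rs change keeps start_tick in step with the recomputed start_slot when a working bank completes; previously only start_slot was updated. A sketch of the arithmetic with illustrative values (not taken from the patch):

    fn main() {
        let ticks_per_slot: u64 = 16;
        let max_tick_height: u64 = 32; // the working bank's final tick height
        let start_slot = max_tick_height / ticks_per_slot; // 2
        let start_tick = (start_slot + 1) * ticks_per_slot; // 48
        assert_eq!((start_slot, start_tick), (2, 48));
    }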
diff --git a/core/tests/local_cluster.rs b/core/tests/local_cluster.rs
index 44a345cdc7..24346a1090 100644
--- a/core/tests/local_cluster.rs
+++ b/core/tests/local_cluster.rs
@@ -1,11 +1,12 @@
 extern crate solana;
 
+use solana::cluster::Cluster;
 use solana::cluster_tests;
 use solana::fullnode::FullnodeConfig;
 use solana::gossip_service::discover;
 use solana::local_cluster::LocalCluster;
 use solana::poh_service::PohServiceConfig;
-use std::thread::sleep;
+use solana_sdk::timing;
 use std::time::Duration;
 
 #[test]
@@ -93,14 +94,16 @@ fn test_two_unbalanced_stakes() {
         num_ticks_per_slot,
         num_slots_per_epoch,
     );
-    let num_epochs_to_sleep = 10;
-    let num_ticks_to_sleep = num_epochs_to_sleep * num_ticks_per_slot * num_slots_per_epoch;
-    sleep(Duration::from_millis(
-        num_ticks_to_sleep / num_ticks_per_second as u64 * 100,
-    ));
+    cluster_tests::sleep_n_epochs(
+        10.0,
+        &fullnode_config.tick_config,
+        num_ticks_per_slot,
+        num_slots_per_epoch,
+    );
 
     cluster.close_preserve_ledgers();
-    let leader_ledger = cluster.ledger_paths[1].clone();
+    let leader_id = cluster.entry_point_info.id;
+    let leader_ledger = cluster.fullnode_infos[&leader_id].ledger_path.clone();
     cluster_tests::verify_ledger_ticks(&leader_ledger, num_ticks_per_slot as usize);
 }
 
@@ -122,3 +125,32 @@ fn test_forwarding() {
     // Confirm that transactions were forwarded to and processed by the leader.
     cluster_tests::send_many_transactions(&validator_info, &cluster.funding_keypair, 20);
 }
+
+#[test]
+fn test_restart_node() {
+    let fullnode_config = FullnodeConfig::default();
+    let slots_per_epoch = 8;
+    let ticks_per_slot = 16;
+    let mut cluster = LocalCluster::new_with_tick_config(
+        &[3],
+        100,
+        &fullnode_config,
+        ticks_per_slot,
+        slots_per_epoch,
+    );
+    let nodes = cluster.get_node_ids();
+    cluster_tests::sleep_n_epochs(
+        1.0,
+        &fullnode_config.tick_config,
+        timing::DEFAULT_TICKS_PER_SLOT,
+        slots_per_epoch,
+    );
+    cluster.restart_node(nodes[0]);
+    cluster_tests::sleep_n_epochs(
+        0.5,
+        &fullnode_config.tick_config,
+        timing::DEFAULT_TICKS_PER_SLOT,
+        slots_per_epoch,
+    );
+    cluster_tests::send_many_transactions(&cluster.entry_point_info, &cluster.funding_keypair, 1);
+}
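For completeness, a hedged sketch of what a second Cluster implementation could look like, showing that the trait is not tied to LocalCluster. StubCluster is entirely hypothetical, purely for illustration; it records restarts instead of running real nodes:

    use solana::cluster::Cluster;
    use solana_sdk::pubkey::Pubkey;
    use std::collections::HashMap;

    // Hypothetical in-memory test double for the Cluster trait.
    struct StubCluster {
        nodes: Vec<Pubkey>,
        restarts: HashMap<Pubkey, usize>,
    }

    impl Cluster for StubCluster {
        fn get_node_ids(&self) -> Vec<Pubkey> {
            self.nodes.clone()
        }
        fn restart_node(&mut self, pubkey: Pubkey) {
            // Count the restart rather than bouncing a process.
            *self.restarts.entry(pubkey).or_insert(0) += 1;
        }
    }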