Restart node test (#3459) (#3465)

* Restart node test (#3459)

* Add test to local_cluster for restarting a node

* Fix the test so that we don't hit the end-of-epoch "leader not found" condition before trying to transfer

* Do not look for confirmations, because nobody is voting on empty transmissions in this single-node test
Author:       carllin
Date:         2019-03-23 19:19:55 -07:00
Committed by: GitHub
Commit:       68c35bfde6 (parent e1a3708844)

7 changed files with 149 additions and 33 deletions

core/src/cluster.rs (new file, 6 lines)

@@ -0,0 +1,6 @@
+use solana_sdk::pubkey::Pubkey;
+
+pub trait Cluster {
+    fn get_node_ids(&self) -> Vec<Pubkey>;
+    fn restart_node(&mut self, pubkey: Pubkey);
+}
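Note: the `Cluster` trait exists so cluster-level test helpers can be written against an interface rather than against `LocalCluster` directly. A minimal sketch of such a helper, assuming a hypothetical `restart_all` function that is not part of this commit:

    use solana::cluster::Cluster;

    // Hypothetical helper: restart every node of any Cluster implementation,
    // using only the two methods the trait exposes.
    fn restart_all<C: Cluster>(cluster: &mut C) {
        for pubkey in cluster.get_node_ids() {
            cluster.restart_node(pubkey);
        }
    }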

core/src/cluster_tests.rs

@@ -8,12 +8,13 @@ use crate::contact_info::ContactInfo
 use crate::entry::{Entry, EntrySlice};
 use crate::gossip_service::discover;
 use crate::locktower::VOTE_THRESHOLD_DEPTH;
+use crate::poh_service::PohServiceConfig;
 use solana_client::thin_client::create_client;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
 use solana_sdk::system_transaction::SystemTransaction;
 use solana_sdk::timing::{
-    DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS, NUM_TICKS_PER_SECOND,
+    duration_as_ms, DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS, NUM_TICKS_PER_SECOND,
 };
 use std::io;
 use std::thread::sleep;

@@ -122,6 +123,25 @@ pub fn verify_ledger_ticks(ledger_path: &str, ticks_per_slot: usize) {
     }
 }
 
+pub fn sleep_n_epochs(
+    num_epochs: f64,
+    config: &PohServiceConfig,
+    ticks_per_slot: u64,
+    slots_per_epoch: u64,
+) {
+    let num_ticks_per_second = {
+        match config {
+            PohServiceConfig::Sleep(d) => (1000 / duration_as_ms(d)) as f64,
+            _ => panic!("Unsuppported tick config for testing"),
+        }
+    };
+    let num_ticks_to_sleep = num_epochs * ticks_per_slot as f64 * slots_per_epoch as f64;
+    sleep(Duration::from_secs(
+        ((num_ticks_to_sleep + num_ticks_per_second - 1.0) / num_ticks_per_second) as u64,
+    ));
+}
+
 pub fn kill_entry_and_spend_and_verify_rest(
     entry_point_info: &ContactInfo,
     funding_keypair: &Keypair,
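Note: `sleep_n_epochs` converts an epoch count into a whole number of seconds from the PoH sleep interval, which is what lets the tests below wait out epoch boundaries instead of hand-computing millisecond sleeps. A rough worked example of the same arithmetic, assuming a `PohServiceConfig::Sleep` interval of 10 ms (that interval and the resulting numbers are assumptions for illustration, not taken from this diff):

    fn main() {
        // Illustrative only: repeats the sleep_n_epochs arithmetic for one concrete input.
        let num_ticks_per_second = (1000 / 10) as f64; // Sleep(10 ms) => 100 ticks per second
        let (num_epochs, ticks_per_slot, slots_per_epoch) = (1.0_f64, 16.0_f64, 8.0_f64);
        let num_ticks_to_sleep = num_epochs * ticks_per_slot * slots_per_epoch; // 128 ticks
        let secs = ((num_ticks_to_sleep + num_ticks_per_second - 1.0) / num_ticks_per_second) as u64;
        assert_eq!(secs, 2); // 128 ticks at 100 ticks per second, rounded up to whole seconds
    }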

core/src/fullnode.rs

@@ -34,6 +34,7 @@ use std::sync::mpsc::Receiver;
 use std::sync::{Arc, Mutex, RwLock};
 use std::thread::Result;
 
+#[derive(Clone)]
 pub struct FullnodeConfig {
     pub sigverify_disabled: bool,
     pub voting_disabled: bool,

@@ -248,15 +249,6 @@ impl Fullnode {
     // Used for notifying many nodes in parallel to exit
     pub fn exit(&self) {
         self.exit.store(true, Ordering::Relaxed);
-        // Need to force the poh_recorder to drop the WorkingBank,
-        // which contains the channel to BroadcastStage. This should be
-        // sufficient as long as no other rotations are happening that
-        // can cause the Tpu to restart a BankingStage and reset a
-        // WorkingBank in poh_recorder. It follows no other rotations can be
-        // in motion because exit()/close() are only called by the run() loop
-        // which is the sole initiator of rotations.
-        self.poh_recorder.lock().unwrap().clear_bank();
     }
 
     pub fn close(self) -> Result<()> {

core/src/lib.rs

@@ -27,6 +27,7 @@ pub mod blocktree;
 pub mod blockstream;
 pub mod blockstream_service;
 pub mod blocktree_processor;
+pub mod cluster;
 pub mod cluster_info;
 pub mod cluster_tests;
 pub mod db_window;

core/src/local_cluster.rs

@@ -1,4 +1,5 @@
 use crate::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree};
+use crate::cluster::Cluster;
 use crate::cluster_info::{Node, FULLNODE_PORT_RANGE};
 use crate::contact_info::ContactInfo;
 use crate::fullnode::{Fullnode, FullnodeConfig};

@@ -15,20 +16,37 @@ use solana_sdk::timing::DEFAULT_SLOTS_PER_EPOCH;
 use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
 use solana_vote_api::vote_state::VoteState;
 use solana_vote_api::vote_transaction::VoteTransaction;
+use std::collections::HashMap;
 use std::fs::remove_dir_all;
 use std::io::{Error, ErrorKind, Result};
 use std::sync::Arc;
 
+pub struct FullnodeInfo {
+    pub keypair: Arc<Keypair>,
+    pub ledger_path: String,
+}
+
+impl FullnodeInfo {
+    fn new(keypair: Arc<Keypair>, ledger_path: String) -> Self {
+        Self {
+            keypair,
+            ledger_path,
+        }
+    }
+}
+
 pub struct LocalCluster {
     /// Keypair with funding to particpiate in the network
     pub funding_keypair: Keypair,
+    pub fullnode_config: FullnodeConfig,
     /// Entry point from which the rest of the network can be discovered
     pub entry_point_info: ContactInfo,
-    pub ledger_paths: Vec<String>,
-    fullnodes: Vec<Fullnode>,
-    replicators: Vec<Replicator>,
+    pub fullnode_infos: HashMap<Pubkey, FullnodeInfo>,
+    fullnodes: HashMap<Pubkey, Fullnode>,
     genesis_ledger_path: String,
     genesis_block: GenesisBlock,
+    replicators: Vec<Replicator>,
+    pub replicator_ledger_paths: Vec<String>,
 }
 
 impl LocalCluster {

@@ -86,9 +104,6 @@ impl LocalCluster {
         genesis_block.slots_per_epoch = slots_per_epoch;
         let (genesis_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
         let leader_ledger_path = tmp_copy_blocktree!(&genesis_ledger_path);
-        let mut ledger_paths = vec![];
-        ledger_paths.push(genesis_ledger_path.clone());
-        ledger_paths.push(leader_ledger_path.clone());
 
         let voting_keypair = Keypair::new();
         let leader_contact_info = leader_node.info.clone();

@@ -102,16 +117,24 @@
             fullnode_config,
         );
 
-        let fullnodes = vec![leader_server];
+        let mut fullnodes = HashMap::new();
+        let mut fullnode_infos = HashMap::new();
+        fullnodes.insert(leader_pubkey, leader_server);
+        fullnode_infos.insert(
+            leader_pubkey,
+            FullnodeInfo::new(leader_keypair.clone(), leader_ledger_path),
+        );
 
         let mut cluster = Self {
             funding_keypair: mint_keypair,
             entry_point_info: leader_contact_info,
             fullnodes,
             replicators: vec![],
-            ledger_paths,
+            replicator_ledger_paths: vec![],
            genesis_ledger_path,
             genesis_block,
+            fullnode_infos,
+            fullnode_config: fullnode_config.clone(),
         };
 
         for stake in &node_stakes[1..] {

@@ -128,14 +151,14 @@
     }
 
     pub fn exit(&self) {
-        for node in &self.fullnodes {
+        for node in self.fullnodes.values() {
             node.exit();
         }
     }
 
     pub fn close_preserve_ledgers(&mut self) {
         self.exit();
-        while let Some(node) = self.fullnodes.pop() {
+        for (_, node) in self.fullnodes.drain() {
             node.join().unwrap();
         }

@@ -157,7 +180,6 @@
         let validator_pubkey = validator_keypair.pubkey();
         let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
         let ledger_path = tmp_copy_blocktree!(&self.genesis_ledger_path);
-        self.ledger_paths.push(ledger_path.clone());
 
         // Send each validator some lamports to vote
         let validator_balance =

@@ -180,7 +202,12 @@
             fullnode_config,
         );
 
-        self.fullnodes.push(validator_server);
+        self.fullnodes
+            .insert(validator_keypair.pubkey(), validator_server);
+        self.fullnode_infos.insert(
+            validator_keypair.pubkey(),
+            FullnodeInfo::new(validator_keypair.clone(), ledger_path),
+        );
     }
 
     fn add_replicator(&mut self) {

@@ -208,15 +235,20 @@
         )
         .unwrap();
 
-        self.ledger_paths.push(replicator_ledger_path);
+        self.replicator_ledger_paths.push(replicator_ledger_path);
         self.replicators.push(replicator);
     }
 
     fn close(&mut self) {
         self.close_preserve_ledgers();
-        for path in &self.ledger_paths {
-            remove_dir_all(path).unwrap_or_else(|_| panic!("Unable to remove {}", path));
+        for ledger_path in self
+            .fullnode_infos
+            .values()
+            .map(|f| &f.ledger_path)
+            .chain(self.replicator_ledger_paths.iter())
+        {
+            remove_dir_all(&ledger_path)
+                .unwrap_or_else(|_| panic!("Unable to remove {}", ledger_path));
         }
     }

@@ -300,6 +332,38 @@
     }
 }
 
+impl Cluster for LocalCluster {
+    fn restart_node(&mut self, pubkey: Pubkey) {
+        // Shut down the fullnode
+        let node = self.fullnodes.remove(&pubkey).unwrap();
+        node.exit();
+        node.join().unwrap();
+
+        // Restart the node
+        let fullnode_info = &self.fullnode_infos[&pubkey];
+        let node = Node::new_localhost_with_pubkey(&fullnode_info.keypair.pubkey());
+        if pubkey == self.entry_point_info.id {
+            self.entry_point_info = node.info.clone();
+        }
+        let new_voting_keypair = Keypair::new();
+        let restarted_node = Fullnode::new(
+            node,
+            &fullnode_info.keypair,
+            &fullnode_info.ledger_path,
+            &new_voting_keypair.pubkey(),
+            new_voting_keypair,
+            None,
+            &self.fullnode_config,
+        );
+
+        self.fullnodes.insert(pubkey, restarted_node);
+    }
+
+    fn get_node_ids(&self) -> Vec<Pubkey> {
+        self.fullnodes.keys().cloned().collect()
+    }
+}
+
 impl Drop for LocalCluster {
     fn drop(&mut self) {
         self.close();

core/src/poh_recorder.rs

@@ -232,6 +232,7 @@ impl PohRecorder {
                 working_bank.bank.slot()
             );
             self.start_slot = working_bank.max_tick_height / working_bank.bank.ticks_per_slot();
+            self.start_tick = (self.start_slot + 1) * working_bank.bank.ticks_per_slot();
             self.clear_bank();
         }
         if e.is_err() {
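Note: the added `start_tick` assignment keeps the recorder's tick position in step with the recomputed `start_slot` once a working bank reaches its max tick height. A small sketch of the arithmetic with assumed example values (the `max_tick_height` value is illustrative, not taken from this diff):

    fn main() {
        // Mirrors the two assignments in PohRecorder, with concrete assumed numbers.
        let ticks_per_slot: u64 = 16; // same value the new test passes for ticks_per_slot
        let max_tick_height: u64 = 48; // assumed final tick height of the working bank
        let start_slot = max_tick_height / ticks_per_slot;
        let start_tick = (start_slot + 1) * ticks_per_slot;
        assert_eq!((start_slot, start_tick), (3, 64));
    }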

tests/local_cluster.rs

@@ -1,11 +1,12 @@
 extern crate solana;
 
+use solana::cluster::Cluster;
 use solana::cluster_tests;
 use solana::fullnode::FullnodeConfig;
 use solana::gossip_service::discover;
 use solana::local_cluster::LocalCluster;
 use solana::poh_service::PohServiceConfig;
-use std::thread::sleep;
+use solana_sdk::timing;
 use std::time::Duration;
 
 #[test]

@@ -93,14 +94,16 @@ fn test_two_unbalanced_stakes() {
         num_ticks_per_slot,
         num_slots_per_epoch,
     );
-    let num_epochs_to_sleep = 10;
-    let num_ticks_to_sleep = num_epochs_to_sleep * num_ticks_per_slot * num_slots_per_epoch;
-    sleep(Duration::from_millis(
-        num_ticks_to_sleep / num_ticks_per_second as u64 * 100,
-    ));
+    cluster_tests::sleep_n_epochs(
+        10.0,
+        &fullnode_config.tick_config,
+        num_ticks_per_slot,
+        num_slots_per_epoch,
+    );
 
     cluster.close_preserve_ledgers();
-    let leader_ledger = cluster.ledger_paths[1].clone();
+    let leader_id = cluster.entry_point_info.id;
+    let leader_ledger = cluster.fullnode_infos[&leader_id].ledger_path.clone();
     cluster_tests::verify_ledger_ticks(&leader_ledger, num_ticks_per_slot as usize);
 }

@@ -122,3 +125,32 @@ fn test_forwarding() {
     // Confirm that transactions were forwarded to and processed by the leader.
     cluster_tests::send_many_transactions(&validator_info, &cluster.funding_keypair, 20);
 }
+
+#[test]
+fn test_restart_node() {
+    let fullnode_config = FullnodeConfig::default();
+    let slots_per_epoch = 8;
+    let ticks_per_slot = 16;
+    let mut cluster = LocalCluster::new_with_tick_config(
+        &[3],
+        100,
+        &fullnode_config,
+        ticks_per_slot,
+        slots_per_epoch,
+    );
+    let nodes = cluster.get_node_ids();
+    cluster_tests::sleep_n_epochs(
+        1.0,
+        &fullnode_config.tick_config,
+        timing::DEFAULT_TICKS_PER_SLOT,
+        slots_per_epoch,
+    );
+    cluster.restart_node(nodes[0]);
+    cluster_tests::sleep_n_epochs(
+        0.5,
+        &fullnode_config.tick_config,
+        timing::DEFAULT_TICKS_PER_SLOT,
+        slots_per_epoch,
+    );
+    cluster_tests::send_many_transactions(&cluster.entry_point_info, &cluster.funding_keypair, 1);
+}