diff --git a/core/src/cluster_tests.rs b/core/src/cluster_tests.rs
index abc9a58b54..8df551a075 100644
--- a/core/src/cluster_tests.rs
+++ b/core/src/cluster_tests.rs
@@ -7,9 +7,12 @@ use crate::contact_info::ContactInfo;
 use crate::gossip_service::discover;
 use solana_sdk::signature::{Keypair, KeypairUtil};
 use solana_sdk::system_transaction::SystemTransaction;
+use solana_sdk::timing::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND};
 use std::thread::sleep;
 use std::time::Duration;
 
+const SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / NUM_TICKS_PER_SECOND;
+
 /// Spend and verify from every node in the network
 pub fn spend_and_verify_all_nodes(
     entry_point_info: &ContactInfo,
@@ -49,9 +52,56 @@ pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) {
         let mut client = mk_client(&node);
         assert!(client.fullnode_exit().unwrap());
     }
-    sleep(Duration::from_millis(250));
+    sleep(Duration::from_millis(SLOT_MILLIS));
     for node in &cluster_nodes {
         let mut client = mk_client(&node);
         assert!(client.fullnode_exit().is_err());
     }
 }
+
+pub fn kill_entry_and_spend_and_verify_rest(
+    entry_point_info: &ContactInfo,
+    funding_keypair: &Keypair,
+    nodes: usize,
+) {
+    solana_logger::setup();
+    let cluster_nodes = discover(&entry_point_info, nodes);
+    assert!(cluster_nodes.len() >= nodes);
+    let mut client = mk_client(&entry_point_info);
+    info!("sleeping for an epoch");
+    sleep(Duration::from_millis(SLOT_MILLIS * DEFAULT_SLOTS_PER_EPOCH));
+    info!("done sleeping for an epoch");
+    info!("killing entry point");
+    assert!(client.fullnode_exit().unwrap());
+    info!("sleeping for a slot");
+    sleep(Duration::from_millis(SLOT_MILLIS));
+    info!("done sleeping for a slot");
+    for ingress_node in &cluster_nodes {
+        if ingress_node.id == entry_point_info.id {
+            continue;
+        }
+        let random_keypair = Keypair::new();
+        let mut client = mk_client(&ingress_node);
+        let bal = client
+            .poll_get_balance(&funding_keypair.pubkey())
+            .expect("balance in source");
+        assert!(bal > 0);
+        let mut transaction = SystemTransaction::new_move(
+            &funding_keypair,
+            random_keypair.pubkey(),
+            1,
+            client.get_recent_blockhash(),
+            0,
+        );
+        let sig = client
+            .retry_transfer(&funding_keypair, &mut transaction, 5)
+            .unwrap();
+        for validator in &cluster_nodes {
+            if validator.id == entry_point_info.id {
+                continue;
+            }
+            let mut client = mk_client(&validator);
+            client.poll_for_signature(&sig).unwrap();
+        }
+    }
+}
diff --git a/core/src/leader_schedule_utils.rs b/core/src/leader_schedule_utils.rs
index 8104cbf6b1..977f1d0d10 100644
--- a/core/src/leader_schedule_utils.rs
+++ b/core/src/leader_schedule_utils.rs
@@ -45,6 +45,10 @@ pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 {
     bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1
 }
 
+pub fn tick_height_to_slot(bank: &Bank, tick_height: u64) -> u64 {
+    tick_height / bank.ticks_per_slot()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs
index b6541b6894..d297ab4a42 100644
--- a/core/src/local_cluster.rs
+++ b/core/src/local_cluster.rs
@@ -41,6 +41,8 @@ impl LocalCluster {
         lamports_per_node: u64,
         fullnode_config: &FullnodeConfig,
     ) -> Self {
+        // Must have enough tokens to fund vote account and set delegate
+        assert!(lamports_per_node > 2);
         let leader_keypair = Arc::new(Keypair::new());
         let leader_pubkey = leader_keypair.pubkey();
         let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
@@ -83,8 +85,13 @@
                 validator_pubkey,
                 validator_balance
             );
-            Self::create_and_fund_vote_account(&mut client, &voting_keypair, &validator_keypair, 1)
-                .unwrap();
+            Self::create_and_fund_vote_account(
+                &mut client,
+                &voting_keypair,
+                &validator_keypair,
+                lamports_per_node - 1,
+            )
+            .unwrap();
             let validator_server = Fullnode::new(
                 validator_node,
                 &validator_keypair,
diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs
index b6291daaed..85ca3002f2 100644
--- a/core/src/poh_recorder.rs
+++ b/core/src/poh_recorder.rs
@@ -61,6 +61,7 @@ impl PohRecorder {
     }
     // synchronize PoH with a bank
     pub fn reset(&mut self, tick_height: u64, blockhash: Hash) {
+        self.clear_bank();
         let existing = self.tick_cache.iter().any(|(entry, entry_tick_height)| {
             if entry.hash == blockhash {
                 assert_eq!(*entry_tick_height, tick_height);
@@ -489,4 +490,19 @@
         poh_recorder.tick();
         assert_eq!(poh_recorder.poh.tick_height, 2);
     }
+
+    #[test]
+    fn test_reset_clear_bank() {
+        let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
+        let bank = Arc::new(Bank::new(&genesis_block));
+        let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default());
+        let working_bank = WorkingBank {
+            bank,
+            min_tick_height: 2,
+            max_tick_height: 3,
+        };
+        poh_recorder.set_working_bank(working_bank);
+        poh_recorder.reset(1, hash(b"hello"));
+        assert!(poh_recorder.working_bank.is_none());
+    }
 }
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index a0a610d849..ec668c8d57 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -79,9 +79,7 @@
             .name("solana-replay-stage".to_string())
             .spawn(move || {
                 let _exit = Finalizer::new(exit_.clone());
-                let mut first_block = false;
                 let mut progress = HashMap::new();
-
                 loop {
                     let now = Instant::now();
                     // Stop getting entries if we get exit signal
@@ -115,16 +113,6 @@
                             }
                         }
                     }
-                    let frozen = bank_forks.read().unwrap().frozen_banks();
-                    if votable.is_empty()
-                        && frozen.len() == 1
-                        && active_banks.is_empty()
-                        && !first_block
-                    {
-                        first_block = true;
-                        votable.extend(frozen.keys());
-                        info!("voting on first block {:?}", votable);
-                    }
                     // TODO: fork selection
                     // vote on the latest one for now
                     votable.sort();
@@ -136,10 +124,6 @@
                             .get(*latest_slot_vote)
                             .unwrap()
                             .clone();
-                        let next_slot = *latest_slot_vote + 1;
-                        let next_leader_id =
-                            leader_schedule_utils::slot_leader_at(next_slot, &parent);
-                        cluster_info.write().unwrap().set_leader(next_leader_id);
 
                         subscriptions.notify_subscribers(&parent);
 
@@ -157,31 +141,9 @@
                             .lock()
                             .unwrap()
                             .reset(parent.tick_height(), parent.last_blockhash());
-
-                        if next_leader_id == my_id {
-                            let frozen = bank_forks.read().unwrap().frozen_banks();
-                            assert!(frozen.get(&next_slot).is_none());
-                            assert!(bank_forks.read().unwrap().get(next_slot).is_none());
-
-                            let tpu_bank = Bank::new_from_parent(&parent, my_id, next_slot);
-                            bank_forks.write().unwrap().insert(next_slot, tpu_bank);
-                            if let Some(tpu_bank) =
-                                bank_forks.read().unwrap().get(next_slot).cloned()
-                            {
-                                assert_eq!(
-                                    bank_forks.read().unwrap().working_bank().slot(),
-                                    tpu_bank.slot()
-                                );
-                                debug!(
-                                    "new working bank: me: {} next_slot: {} next_leader: {}",
-                                    my_id,
-                                    tpu_bank.slot(),
-                                    next_leader_id
-                                );
-                                poh_recorder.lock().unwrap().set_bank(&tpu_bank);
-                            }
-                        }
                     }
+
+                    Self::start_leader(my_id, &bank_forks, &poh_recorder, &cluster_info);
                     inc_new_counter_info!(
                         "replicate_stage-duration",
                         duration_as_ms(&now.elapsed()) as usize
@@ -203,7 +165,52 @@
             forward_entry_receiver,
         )
     }
 
+    pub fn start_leader(
+        my_id: Pubkey,
+        bank_forks: &Arc<RwLock<BankForks>>,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+    ) {
+        let frozen = bank_forks.read().unwrap().frozen_banks();
+        // TODO: fork selection
+        let mut newest_frozen: Vec<(&u64, &Arc<Bank>)> = frozen.iter().collect();
+        newest_frozen.sort_by_key(|x| *x.0);
+        if let Some((_, parent)) = newest_frozen.last() {
+            let poh_tick_height = poh_recorder.lock().unwrap().tick_height();
+            let poh_slot = leader_schedule_utils::tick_height_to_slot(parent, poh_tick_height + 1);
+            assert!(frozen.get(&poh_slot).is_none());
+            trace!("checking poh slot for leader {}", poh_slot);
+            if bank_forks.read().unwrap().get(poh_slot).is_none() {
+                let next_leader = leader_schedule_utils::slot_leader_at(poh_slot, parent);
+                debug!(
+                    "me: {} leader {} at poh slot {}",
+                    my_id, next_leader, poh_slot
+                );
+                cluster_info.write().unwrap().set_leader(next_leader);
+                if next_leader == my_id {
+                    debug!("starting tpu for slot {}", poh_slot);
+                    let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot);
+                    bank_forks.write().unwrap().insert(poh_slot, tpu_bank);
+                    if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {
+                        assert_eq!(
+                            bank_forks.read().unwrap().working_bank().slot(),
+                            tpu_bank.slot()
+                        );
+                        debug!(
+                            "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
+                            my_id,
+                            tpu_bank.slot(),
+                            next_leader
+                        );
+                        poh_recorder.lock().unwrap().set_bank(&tpu_bank);
+                    }
+                }
+            }
+        } else {
+            error!("No frozen banks available!");
+        }
+    }
     pub fn replay_blocktree_into_bank(
         bank: &Bank,
         blocktree: &Blocktree,
diff --git a/sdk/src/timing.rs b/sdk/src/timing.rs
index f992143c43..f25cde2944 100644
--- a/sdk/src/timing.rs
+++ b/sdk/src/timing.rs
@@ -6,8 +6,8 @@ pub const NUM_TICKS_PER_SECOND: u64 = 10;
 
 // At 10 ticks/s, 8 ticks per slot implies that leader rotation and voting will happen
 // every 800 ms. A fast voting cadence ensures faster finality and convergence
-pub const DEFAULT_TICKS_PER_SLOT: u64 = 80;
-pub const DEFAULT_SLOTS_PER_EPOCH: u64 = 64;
+pub const DEFAULT_TICKS_PER_SLOT: u64 = 16;
+pub const DEFAULT_SLOTS_PER_EPOCH: u64 = 16;
 
 /// The time window of recent block hash values that the bank will track the signatures
 /// of over. Once the bank discards a block hash, it will reject any transactions that use
diff --git a/tests/local_cluster.rs b/tests/local_cluster.rs
index a79294647b..0f13bface2 100644
--- a/tests/local_cluster.rs
+++ b/tests/local_cluster.rs
@@ -6,7 +6,7 @@ use solana::local_cluster::LocalCluster;
 use solana::rpc::JsonRpcConfig;
 
 #[test]
-fn test_spend_and_verify_all_nodes_1() -> () {
+fn test_spend_and_verify_all_nodes_1() {
     solana_logger::setup();
     let num_nodes = 1;
     let local = LocalCluster::new(num_nodes, 10_000, 100);
@@ -18,7 +18,7 @@
 }
 
 #[test]
-fn test_spend_and_verify_all_nodes_2() -> () {
+fn test_spend_and_verify_all_nodes_2() {
     solana_logger::setup();
     let num_nodes = 2;
     let local = LocalCluster::new(num_nodes, 10_000, 100);
@@ -30,7 +30,7 @@
 }
 
 #[test]
-fn test_spend_and_verify_all_nodes_3() -> () {
+fn test_spend_and_verify_all_nodes_3() {
     solana_logger::setup();
     let num_nodes = 3;
     let local = LocalCluster::new(num_nodes, 10_000, 100);
@@ -43,7 +43,7 @@
 
 #[test]
 #[should_panic]
-fn test_fullnode_exit_safe_config_should_panic_2() -> () {
+fn test_fullnode_exit_default_config_should_panic() {
     solana_logger::setup();
     let num_nodes = 2;
     let local = LocalCluster::new(num_nodes, 10_000, 100);
@@ -51,7 +51,7 @@
 }
 
 #[test]
-fn test_fullnode_exit_unsafe_config_2() -> () {
+fn test_fullnode_exit_2() {
     solana_logger::setup();
     let num_nodes = 2;
     let mut fullnode_exit = FullnodeConfig::default();
@@ -59,3 +59,29 @@
     let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_exit);
     cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes);
 }
+
+#[test]
+fn test_leader_failure_2() {
+    let num_nodes = 2;
+    let mut fullnode_exit = FullnodeConfig::default();
+    fullnode_exit.rpc_config = JsonRpcConfig::TestOnlyAllowRpcFullnodeExit;
+    let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_exit);
+    cluster_tests::kill_entry_and_spend_and_verify_rest(
+        &local.entry_point_info,
+        &local.funding_keypair,
+        num_nodes,
+    );
+}
+
+#[test]
+fn test_leader_failure_3() {
+    let num_nodes = 3;
+    let mut fullnode_exit = FullnodeConfig::default();
+    fullnode_exit.rpc_config = JsonRpcConfig::TestOnlyAllowRpcFullnodeExit;
+    let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_exit);
+    cluster_tests::kill_entry_and_spend_and_verify_rest(
+        &local.entry_point_info,
+        &local.funding_keypair,
+        num_nodes,
+    );
+}
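Reviewer note (illustrative, not part of the patch): the sleeps in kill_entry_and_spend_and_verify_rest and the slot chosen by start_leader both follow from the new timing constants, so the arithmetic is worth spelling out. The sketch below mirrors the constants and the tick_height_to_slot mapping introduced above; the standalone helper takes ticks_per_slot directly instead of reading it from a Bank, purely for illustration.

// Illustrative sketch only; mirrors constants and helpers touched by this patch.
const NUM_TICKS_PER_SECOND: u64 = 10;
const DEFAULT_TICKS_PER_SLOT: u64 = 16;
const DEFAULT_SLOTS_PER_EPOCH: u64 = 16;

// Same formula as the SLOT_MILLIS constant added to cluster_tests.rs.
const SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / NUM_TICKS_PER_SECOND;

// Same mapping as leader_schedule_utils::tick_height_to_slot, with
// ticks_per_slot passed in directly instead of read from a Bank.
fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 {
    tick_height / ticks_per_slot
}

fn main() {
    // 16 ticks per slot at 10 ticks/s => 1600 ms per slot.
    assert_eq!(SLOT_MILLIS, 1600);
    // 16 slots per epoch => the "sleeping for an epoch" wait is 25.6 s.
    assert_eq!(SLOT_MILLIS * DEFAULT_SLOTS_PER_EPOCH, 25_600);
    // Tick heights 0..=15 map to slot 0, 16..=31 to slot 1, and so on;
    // start_leader applies this to poh_tick_height + 1 to pick the slot to lead.
    assert_eq!(tick_height_to_slot(DEFAULT_TICKS_PER_SLOT, 15), 0);
    assert_eq!(tick_height_to_slot(DEFAULT_TICKS_PER_SLOT, 16), 1);
}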