diff --git a/Cargo.lock b/Cargo.lock index 95abed21c5..70f1f87a46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1583,6 +1583,11 @@ dependencies = [ "unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "indexed" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "indexmap" version = "1.3.1" @@ -3779,6 +3784,7 @@ dependencies = [ "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-fs 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "trees 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "untrusted 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -5523,6 +5529,14 @@ name = "treeline" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "trees" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "indexed 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "try-lock" version = "0.2.2" @@ -6129,6 +6143,7 @@ dependencies = [ "checksum hyper-rustls 0.19.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b89109920197f2c90d75e82addbb96bf424570790d310cc2b18f0b33f4a9cc43" "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" "checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" +"checksum indexed 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d480125acf340d6a6e59dab69ae19d6fca3a906e1eade277671272cc8f73794b" "checksum indexmap 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0b54058f0a6ff80b6803da8faf8997cde53872b38f4023728f6830b06cd3c0dc" "checksum indicatif 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8572bccfb0665e70b7faf44ee28841b8e0823450cd4ad562a76b5a3c4bf48487" "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" @@ -6416,6 +6431,7 @@ dependencies = [ "checksum toml 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c04dffffeac90885436d23c692517bb5b8b3f8863e4afc15023628d067d667b7" "checksum tower-service 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" "checksum treeline 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41" +"checksum trees 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "afa1821e85be4f56cc5bd08bdbc32c0e26d105c90bed9c637992f6c7f747c180" "checksum try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" "checksum try_from 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "283d3b89e1368717881a9d51dad843cc435380d8109c9e47d38780a324698d8b" "checksum typed-arena 1.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9c0704a799d314795d3d847d519b284bae681ef9b1f3da99f7ebc7b47ba2e607" diff --git a/core/Cargo.toml b/core/Cargo.toml index 8c93f02d74..af226ca34d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -71,6 +71,7 @@ tokio-io = "0.1" untrusted = "0.7.0" solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.24.0" } reed-solomon-erasure = { package = "solana-reed-solomon-erasure", version = "4.0.1-3", features = ["simd-accel"] } +trees = "0.2.1" [dev-dependencies] matches = "0.1.6" diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 16b16f7e8d..4867fb0a68 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -63,6 +63,17 @@ impl Tower { tower } + pub fn new_with_key(node_pubkey: &Pubkey) -> Self { + Self { + node_pubkey: *node_pubkey, + threshold_depth: VOTE_THRESHOLD_DEPTH, + threshold_size: VOTE_THRESHOLD_SIZE, + lockouts: VoteState::default(), + last_vote: Vote::default(), + last_timestamp: BlockTimestamp::default(), + } + } + #[cfg(test)] pub fn new_for_tests(threshold_depth: usize, threshold_size: f64) -> Self { Self { @@ -284,7 +295,6 @@ impl Tower { assert!(ancestors.contains_key(&slot)); if !self.is_recent(slot) { - trace!("slot is not recent: {}", slot); return true; } @@ -463,7 +473,257 @@ impl Tower { #[cfg(test)] mod test { use super::*; + use crate::replay_stage::{ForkProgress, ReplayStage}; + use solana_ledger::bank_forks::BankForks; + use solana_runtime::{ + bank::Bank, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }; + use solana_sdk::{ + clock::Slot, + hash::Hash, + pubkey::Pubkey, + signature::{Keypair, KeypairUtil}, + transaction::Transaction, + }; + use solana_stake_program::stake_state; + use solana_vote_program::vote_state; + use solana_vote_program::{vote_instruction, vote_state::Vote}; + use std::collections::{HashMap, VecDeque}; + use std::sync::RwLock; use std::{thread::sleep, time::Duration}; + use trees::{tr, Node, Tree}; + + struct ValidatorKeypairs { + node_keypair: Keypair, + vote_keypair: Keypair, + } + + impl ValidatorKeypairs { + fn new(node_keypair: Keypair, vote_keypair: Keypair) -> Self { + Self { + node_keypair, + vote_keypair, + } + } + } + + struct VoteSimulator<'a> { + searchable_nodes: HashMap>, + } + + impl<'a> VoteSimulator<'a> { + pub fn new(forks: &'a Tree) -> Self { + let mut searchable_nodes = HashMap::new(); + let root = forks.root(); + searchable_nodes.insert(root.data, root); + Self { searchable_nodes } + } + + pub fn simulate_vote( + &mut self, + vote_slot: Slot, + bank_forks: &RwLock, + cluster_votes: &mut HashMap>, + validator_keypairs: &HashMap, + my_keypairs: &ValidatorKeypairs, + progress: &mut HashMap, + tower: &mut Tower, + ) -> VoteResult { + let node = self + .find_node_and_update_simulation(vote_slot) + .expect("Vote to simulate must be for a slot in the tree"); + + let mut missing_nodes = VecDeque::new(); + let mut current = node; + loop { + let current_slot = current.data; + if bank_forks.read().unwrap().get(current_slot).is_some() + || tower.root().map(|r| current_slot < r).unwrap_or(false) + { + break; + } else { + missing_nodes.push_front(current); + } + + if let Some(parent) = current.parent() { + current = parent; + } else { + break; + } + } + + // Create any missing banks along the path + for missing_node in missing_nodes { + let missing_slot = missing_node.data; + let parent = missing_node.parent().unwrap().data; + let parent_bank = bank_forks + .read() + .unwrap() + .get(parent) + .expect("parent bank must exist") + .clone(); + info!("parent of {} is {}", missing_slot, parent_bank.slot(),); + progress + .entry(missing_slot) + .or_insert_with(|| ForkProgress::new(parent_bank.last_blockhash())); + + // Create the missing bank + let new_bank = + Bank::new_from_parent(&parent_bank, &Pubkey::default(), missing_slot); + + // Simulate ingesting the cluster's votes for the parent into this bank + for (pubkey, vote) in cluster_votes.iter() { + if vote.contains(&parent_bank.slot()) { + let keypairs = validator_keypairs.get(pubkey).unwrap(); + let node_pubkey = keypairs.node_keypair.pubkey(); + let vote_pubkey = keypairs.vote_keypair.pubkey(); + let last_blockhash = parent_bank.last_blockhash(); + let votes = Vote::new(vec![parent_bank.slot()], parent_bank.hash()); + info!("voting {} {}", parent_bank.slot(), parent_bank.hash()); + let vote_ix = vote_instruction::vote(&vote_pubkey, &vote_pubkey, votes); + let mut vote_tx = + Transaction::new_with_payer(vec![vote_ix], Some(&node_pubkey)); + vote_tx.partial_sign(&[&keypairs.node_keypair], last_blockhash); + vote_tx.partial_sign(&[&keypairs.vote_keypair], last_blockhash); + new_bank.process_transaction(&vote_tx).unwrap(); + } + } + new_bank.freeze(); + bank_forks.write().unwrap().insert(new_bank); + } + + // Now try to simulate the vote + let my_pubkey = my_keypairs.node_keypair.pubkey(); + let my_vote_pubkey = my_keypairs.vote_keypair.pubkey(); + let ancestors = bank_forks.read().unwrap().ancestors(); + ReplayStage::select_fork(&my_pubkey, &ancestors, &bank_forks, tower, progress); + + let bank = bank_forks + .read() + .unwrap() + .get(vote_slot) + .expect("Bank must have been created before vote simulation") + .clone(); + // Make sure this slot isn't locked out or failing threshold + let fork_progress = progress + .get(&vote_slot) + .expect("Slot for vote must exist in progress map"); + info!("Checking vote: {}", vote_slot); + info!("lockouts: {:?}", fork_progress.fork_stats.stake_lockouts); + if fork_progress.fork_stats.is_locked_out && !fork_progress.fork_stats.vote_threshold { + return VoteResult::FailedAllChecks(vote_slot); + } else if fork_progress.fork_stats.is_locked_out { + return VoteResult::LockedOut(vote_slot); + } else if !fork_progress.fork_stats.vote_threshold { + return VoteResult::FailedThreshold(vote_slot); + } + let vote = tower.new_vote_from_bank(&bank, &my_vote_pubkey).0; + if let Some(new_root) = tower.record_bank_vote(vote) { + ReplayStage::handle_new_root(new_root, bank_forks, progress, &None); + } + + // Mark the vote for this bank under this node's pubkey so it will be + // integrated into any future child banks + cluster_votes.entry(my_pubkey).or_default().push(vote_slot); + VoteResult::Ok + } + + // Find a node representing the given slot + fn find_node_and_update_simulation(&mut self, slot: u64) -> Option<&'a Node> { + let mut successful_search_node: Option<&'a Node> = None; + let mut found_node = None; + for search_node in self.searchable_nodes.values() { + if let Some((target, new_searchable_nodes)) = Self::find_node(search_node, slot) { + successful_search_node = Some(search_node); + found_node = Some(target); + for node in new_searchable_nodes { + self.searchable_nodes.insert(node.data, node); + } + break; + } + } + successful_search_node.map(|node| { + self.searchable_nodes.remove(&node.data); + }); + found_node + } + + fn find_node( + node: &'a Node, + slot: u64, + ) -> Option<(&'a Node, Vec<&'a Node>)> { + if node.data == slot { + Some((node, node.iter().collect())) + } else { + let mut search_result: Option<(&'a Node, Vec<&'a Node>)> = None; + for child in node.iter() { + if let Some((_, ref mut new_searchable_nodes)) = search_result { + new_searchable_nodes.push(child); + continue; + } + search_result = Self::find_node(child, slot); + } + + search_result + } + } + } + + #[derive(PartialEq, Debug)] + enum VoteResult { + LockedOut(u64), + FailedThreshold(u64), + FailedAllChecks(u64), + Ok, + } + + // Setup BankForks with banks including all the votes per validator as + // specified in the input `validator_votes` + fn initialize_state( + validator_votes: &HashMap>, + validator_keypairs: &HashMap, + ) -> (BankForks, HashMap) { + assert!(validator_votes.len() < 1_000_000); + + let GenesisConfigInfo { + mut genesis_config, + mint_keypair, + voting_keypair: _, + } = create_genesis_config(1_000_000_000); + + // Initialize BankForks + for keypairs in validator_keypairs.values() { + let node_pubkey = keypairs.node_keypair.pubkey(); + let vote_pubkey = keypairs.vote_keypair.pubkey(); + + let stake_key = Pubkey::new_rand(); + let vote_account = vote_state::create_account(&vote_pubkey, &node_pubkey, 0, 100); + let stake_account = stake_state::create_account( + &Pubkey::new_rand(), + &vote_pubkey, + &vote_account, + &genesis_config.rent, + 100, + ); + + genesis_config.accounts.extend(vec![ + (vote_pubkey, vote_account.clone()), + (stake_key, stake_account), + ]); + } + + let bank0 = Bank::new(&genesis_config); + + for pubkey in validator_keypairs.keys() { + bank0.transfer(10_000, &mint_keypair, pubkey).unwrap(); + } + + bank0.freeze(); + let mut progress = HashMap::new(); + progress.insert(0, ForkProgress::new(bank0.last_blockhash())); + (BankForks::new(0, bank0), progress) + } fn gen_stakes(stake_votes: &[(u64, &[u64])]) -> Vec<(Pubkey, (u64, Account))> { let mut stakes = vec![]; @@ -483,6 +743,198 @@ mod test { stakes } + fn can_progress_on_fork( + my_pubkey: &Pubkey, + tower: &mut Tower, + start_slot: u64, + num_slots: u64, + bank_forks: &RwLock, + cluster_votes: &mut HashMap>, + keypairs: &HashMap, + progress: &mut HashMap, + ) -> bool { + // Check that within some reasonable time, validator can make a new + // root on this fork + let old_root = tower.root(); + let mut main_fork = tr(start_slot); + let mut tip = main_fork.root_mut(); + + for i in 1..num_slots { + tip.push_front(tr(start_slot + i)); + tip = tip.first_mut().unwrap(); + } + let mut voting_simulator = VoteSimulator::new(&main_fork); + for i in 1..num_slots { + voting_simulator.simulate_vote( + i + start_slot, + &bank_forks, + cluster_votes, + &keypairs, + keypairs.get(&my_pubkey).unwrap(), + progress, + tower, + ); + if old_root != tower.root() { + return true; + } + } + + false + } + + #[test] + fn test_simple_votes() { + let node_keypair = Keypair::new(); + let vote_keypair = Keypair::new(); + let node_pubkey = node_keypair.pubkey(); + + let mut keypairs = HashMap::new(); + keypairs.insert( + node_pubkey, + ValidatorKeypairs::new(node_keypair, vote_keypair), + ); + + // Initialize BankForks + let (bank_forks, mut progress) = initialize_state(&HashMap::new(), &keypairs); + let bank_forks = RwLock::new(bank_forks); + + // Create the tree of banks + let forks = tr(0) / (tr(1) / (tr(2) / (tr(3) / (tr(4) / tr(5))))); + + // Set the voting behavior + let mut voting_simulator = VoteSimulator::new(&forks); + let votes = vec![0, 1, 2, 3, 4, 5]; + + // Simulate the votes + let mut tower = Tower::new_with_key(&node_pubkey); + + let mut cluster_votes = HashMap::new(); + for vote in votes { + assert_eq!( + VoteResult::Ok, + voting_simulator.simulate_vote( + vote, + &bank_forks, + &mut cluster_votes, + &keypairs, + keypairs.get(&node_pubkey).unwrap(), + &mut progress, + &mut tower, + ) + ); + } + + for i in 0..5 { + assert_eq!(tower.lockouts.votes[i].slot as usize, i); + assert_eq!(tower.lockouts.votes[i].confirmation_count as usize, 6 - i); + } + } + + #[test] + fn test_double_partition() { + let node_keypair = Keypair::new(); + let vote_keypair = Keypair::new(); + let node_pubkey = node_keypair.pubkey(); + let vote_pubkey = vote_keypair.pubkey(); + + let mut keypairs = HashMap::new(); + info!("my_pubkey: {}", node_pubkey); + keypairs.insert( + node_pubkey, + ValidatorKeypairs::new(node_keypair, vote_keypair), + ); + + // Create the tree of banks in a BankForks object + let forks = tr(0) + / (tr(1) + / (tr(2) + / (tr(3) + / (tr(4) + / (tr(5) + / (tr(6) + / (tr(7) + / (tr(8) + / (tr(9) + // Minor fork 1 + / (tr(10) / (tr(11) / (tr(12) / (tr(13) / (tr(14)))))) + / (tr(43) + / (tr(44) + // Minor fork 2 + / (tr(45) / (tr(46) / (tr(47) / (tr(48) / (tr(49) / (tr(50))))))) + / (tr(110))))))))))))); + + // Set the voting behavior + let mut voting_simulator = VoteSimulator::new(&forks); + let mut votes: Vec = vec![]; + // Vote on the first minor fork + votes.extend((0..=14).into_iter()); + // Come back to the main fork + votes.extend((43..=44).into_iter()); + // Vote on the second minor fork + votes.extend((45..=50).into_iter()); + + let mut cluster_votes: HashMap> = HashMap::new(); + cluster_votes.insert(node_pubkey, votes.clone()); + let (bank_forks, mut progress) = initialize_state(&cluster_votes, &keypairs); + let bank_forks = RwLock::new(bank_forks); + + // Simulate the votes. Should fail on trying to come back to the main fork + // at 106 exclusively due to threshold failure + let mut tower = Tower::new_with_key(&node_pubkey); + for vote in &votes { + // All these votes should be ok + assert_eq!( + voting_simulator.simulate_vote( + *vote, + &bank_forks, + &mut cluster_votes, + &keypairs, + keypairs.get(&node_pubkey).unwrap(), + &mut progress, + &mut tower, + ), + VoteResult::Ok + ); + } + + // Try to come back to main fork + let next_unlocked_slot = 110; + assert_eq!( + voting_simulator.simulate_vote( + next_unlocked_slot, + &bank_forks, + &mut cluster_votes, + &keypairs, + keypairs.get(&node_pubkey).unwrap(), + &mut progress, + &mut tower, + ), + VoteResult::Ok + ); + + info!("local tower: {:#?}", tower.lockouts.votes); + let vote_accounts = bank_forks + .read() + .unwrap() + .get(next_unlocked_slot) + .unwrap() + .vote_accounts(); + let observed = vote_accounts.get(&vote_pubkey).unwrap(); + let state = VoteState::from(&observed.1).unwrap(); + info!("observed tower: {:#?}", state.votes); + + assert!(can_progress_on_fork( + &node_pubkey, + &mut tower, + next_unlocked_slot, + 200, + &bank_forks, + &mut cluster_votes, + &keypairs, + &mut progress + )); + } + #[test] fn test_collect_vote_lockouts_sums() { //two accounts voting for slot 0 with 1 token staked diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 188fef87fb..db718ae904 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -87,7 +87,7 @@ pub struct ReplayStage { } #[derive(Default)] -pub struct ReplaySlotStats(ConfirmationTiming); +pub(crate) struct ReplaySlotStats(ConfirmationTiming); impl std::ops::Deref for ReplaySlotStats { type Target = ConfirmationTiming; fn deref(&self) -> &Self::Target { @@ -101,7 +101,7 @@ impl std::ops::DerefMut for ReplaySlotStats { } #[derive(Debug, Clone, Default)] -struct ForkStats { +pub(crate) struct ForkStats { weight: u128, fork_weight: u128, total_staked: u64, @@ -110,9 +110,9 @@ struct ForkStats { has_voted: bool, is_recent: bool, is_empty: bool, - vote_threshold: bool, - is_locked_out: bool, - stake_lockouts: HashMap, + pub(crate) vote_threshold: bool, + pub(crate) is_locked_out: bool, + pub(crate) stake_lockouts: HashMap, computed: bool, confirmation_reported: bool, } @@ -141,9 +141,9 @@ impl ReplaySlotStats { } } -struct ForkProgress { +pub(crate) struct ForkProgress { is_dead: bool, - fork_stats: ForkStats, + pub(crate) fork_stats: ForkStats, replay_stats: ReplaySlotStats, replay_progress: ConfirmationProgress, } @@ -588,11 +588,7 @@ impl ReplayStage { blockstore .set_roots(&rooted_slots) .expect("Ledger set roots failed"); - bank_forks - .write() - .unwrap() - .set_root(new_root, snapshot_package_sender); - Self::handle_new_root(&bank_forks, progress); + Self::handle_new_root(new_root, &bank_forks, progress, snapshot_package_sender); latest_root_senders.iter().for_each(|s| { if let Err(e) = s.send(new_root) { trace!("latest root send failed: {:?}", e); @@ -742,10 +738,10 @@ impl ReplayStage { did_complete_bank } - fn select_fork( + pub(crate) fn select_fork( my_pubkey: &Pubkey, ancestors: &HashMap>, - bank_forks: &Arc>, + bank_forks: &RwLock, tower: &Tower, progress: &mut HashMap, ) -> VoteAndPoHBank { @@ -786,7 +782,13 @@ impl ReplayStage { bank.vote_accounts().into_iter(), &ancestors, ); - Self::confirm_forks(tower, &stake_lockouts, total_staked, progress, bank_forks); + Self::confirm_forks( + tower, + &stake_lockouts, + total_staked, + progress, + &bank_forks, + ); stats.total_staked = total_staked; stats.weight = bank_weight; stats.fork_weight = stats.weight @@ -880,7 +882,7 @@ impl ReplayStage { stake_lockouts: &HashMap, total_staked: u64, progress: &mut HashMap, - bank_forks: &Arc>, + bank_forks: &RwLock, ) { for (slot, prog) in progress.iter_mut() { if !prog.fork_stats.confirmation_reported { @@ -908,10 +910,16 @@ impl ReplayStage { } } - fn handle_new_root( - bank_forks: &Arc>, + pub(crate) fn handle_new_root( + new_root: u64, + bank_forks: &RwLock, progress: &mut HashMap, + snapshot_package_sender: &Option, ) { + bank_forks + .write() + .unwrap() + .set_root(new_root, snapshot_package_sender); let r_bank_forks = bank_forks.read().unwrap(); progress.retain(|k, _| r_bank_forks.get(*k).is_some()); } @@ -1354,10 +1362,21 @@ pub(crate) mod tests { let genesis_config = create_genesis_config(10_000).genesis_config; let bank0 = Bank::new(&genesis_config); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0))); + let root = 3; + let root_bank = Bank::new_from_parent( + bank_forks.read().unwrap().get(0).unwrap(), + &Pubkey::default(), + root, + ); + bank_forks.write().unwrap().insert(root_bank); let mut progress = HashMap::new(); - progress.insert(5, ForkProgress::new(Hash::default())); - ReplayStage::handle_new_root(&bank_forks, &mut progress); - assert!(progress.is_empty()); + for i in 0..=root { + progress.insert(i, ForkProgress::new(Hash::default())); + } + ReplayStage::handle_new_root(root, &bank_forks, &mut progress, &None); + assert_eq!(bank_forks.read().unwrap().root(), root); + assert_eq!(progress.len(), 1); + assert!(progress.get(&root).is_some()); } #[test]