diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index 353868dbcc..503f4138fd 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -32,39 +32,6 @@ pub struct BankForks { root: u64, snapshot_config: Option, slots_since_snapshot: Vec, - confidence: HashMap, -} - -#[derive(Debug, Default, PartialEq)] -pub struct Confidence { - fork_stakes: u64, - epoch_stakes: u64, - lockouts: u64, - stake_weighted_lockouts: u128, -} - -impl Confidence { - pub fn new(fork_stakes: u64, epoch_stakes: u64, lockouts: u64) -> Self { - Self { - fork_stakes, - epoch_stakes, - lockouts, - stake_weighted_lockouts: 0, - } - } - pub fn new_with_stake_weighted( - fork_stakes: u64, - epoch_stakes: u64, - lockouts: u64, - stake_weighted_lockouts: u128, - ) -> Self { - Self { - fork_stakes, - epoch_stakes, - lockouts, - stake_weighted_lockouts, - } - } } impl Index for BankForks { @@ -85,7 +52,6 @@ impl BankForks { root: 0, snapshot_config: None, slots_since_snapshot: vec![bank_slot], - confidence: HashMap::new(), } } @@ -161,7 +127,6 @@ impl BankForks { working_bank, snapshot_config: None, slots_since_snapshot: rooted_path, - confidence: HashMap::new(), } } @@ -309,43 +274,6 @@ impl BankForks { let descendants = self.descendants(); self.banks .retain(|slot, _| slot == &root || descendants[&root].contains(slot)); - self.confidence - .retain(|slot, _| slot == &root || descendants[&root].contains(slot)); - } - - pub fn cache_fork_confidence( - &mut self, - fork: u64, - fork_stakes: u64, - epoch_stakes: u64, - lockouts: u64, - ) { - self.confidence - .entry(fork) - .and_modify(|entry| { - entry.fork_stakes = fork_stakes; - entry.epoch_stakes = epoch_stakes; - entry.lockouts = lockouts; - }) - .or_insert_with(|| Confidence::new(fork_stakes, epoch_stakes, lockouts)); - } - - pub fn cache_stake_weighted_lockouts(&mut self, fork: u64, stake_weighted_lockouts: u128) { - self.confidence - .entry(fork) - .and_modify(|entry| { - entry.stake_weighted_lockouts = stake_weighted_lockouts; - }) - .or_insert(Confidence { - fork_stakes: 0, - epoch_stakes: 0, - lockouts: 0, - stake_weighted_lockouts, - }); - } - - pub fn get_fork_confidence(&self, fork: u64) -> Option<&Confidence> { - self.confidence.get(&fork) } pub fn set_snapshot_config(&mut self, snapshot_config: SnapshotConfig) { @@ -442,46 +370,6 @@ mod tests { assert_eq!(bank_forks.active_banks(), vec![1]); } - #[test] - fn test_bank_forks_confidence_cache() { - let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000); - let bank = Bank::new(&genesis_block); - let fork = bank.slot(); - let mut bank_forks = BankForks::new(0, bank); - assert!(bank_forks.confidence.get(&fork).is_none()); - bank_forks.cache_fork_confidence(fork, 11, 12, 13); - assert_eq!( - bank_forks.confidence.get(&fork).unwrap(), - &Confidence { - fork_stakes: 11, - epoch_stakes: 12, - lockouts: 13, - stake_weighted_lockouts: 0, - } - ); - // Ensure that {fork_stakes, epoch_stakes, lockouts} and stake_weighted_lockouts - // can be updated separately - bank_forks.cache_stake_weighted_lockouts(fork, 20); - assert_eq!( - bank_forks.confidence.get(&fork).unwrap(), - &Confidence { - fork_stakes: 11, - epoch_stakes: 12, - lockouts: 13, - stake_weighted_lockouts: 20, - } - ); - bank_forks.cache_fork_confidence(fork, 21, 22, 23); - assert_eq!( - bank_forks - .confidence - .get(&fork) - .unwrap() - .stake_weighted_lockouts, - 20, - ); - } - fn restore_from_snapshot(old_bank_forks: &BankForks, account_paths: String) { let (snapshot_path, snapshot_package_output_path) = old_bank_forks .snapshot_config diff --git a/core/src/confidence.rs b/core/src/confidence.rs new file mode 100644 index 0000000000..6737da4e87 --- /dev/null +++ b/core/src/confidence.rs @@ -0,0 +1,119 @@ +use std::collections::{HashMap, HashSet}; + +#[derive(Debug, Default, PartialEq)] +pub struct Confidence { + fork_stakes: u64, + total_stake: u64, + lockouts: u64, + stake_weighted_lockouts: u128, +} + +impl Confidence { + pub fn new(fork_stakes: u64, total_stake: u64, lockouts: u64) -> Self { + Self { + fork_stakes, + total_stake, + lockouts, + stake_weighted_lockouts: 0, + } + } + pub fn new_with_stake_weighted( + fork_stakes: u64, + total_stake: u64, + lockouts: u64, + stake_weighted_lockouts: u128, + ) -> Self { + Self { + fork_stakes, + total_stake, + lockouts, + stake_weighted_lockouts, + } + } +} + +#[derive(Default, PartialEq)] +pub struct ForkConfidenceCache { + confidence: HashMap, +} + +impl ForkConfidenceCache { + pub fn cache_fork_confidence( + &mut self, + fork: u64, + fork_stakes: u64, + total_stake: u64, + lockouts: u64, + ) { + self.confidence + .entry(fork) + .and_modify(|entry| { + entry.fork_stakes = fork_stakes; + entry.total_stake = total_stake; + entry.lockouts = lockouts; + }) + .or_insert_with(|| Confidence::new(fork_stakes, total_stake, lockouts)); + } + + pub fn cache_stake_weighted_lockouts(&mut self, fork: u64, stake_weighted_lockouts: u128) { + self.confidence + .entry(fork) + .and_modify(|entry| { + entry.stake_weighted_lockouts = stake_weighted_lockouts; + }) + .or_insert(Confidence { + fork_stakes: 0, + total_stake: 0, + lockouts: 0, + stake_weighted_lockouts, + }); + } + + pub fn get_fork_confidence(&self, fork: u64) -> Option<&Confidence> { + self.confidence.get(&fork) + } + + pub fn prune_confidence_cache(&mut self, ancestors: &HashMap>, root: u64) { + self.confidence + .retain(|slot, _| slot == &root || ancestors[&slot].contains(&root)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fork_confidence_cache() { + let mut cache = ForkConfidenceCache::default(); + let fork = 0; + assert!(cache.confidence.get(&fork).is_none()); + cache.cache_fork_confidence(fork, 11, 12, 13); + assert_eq!( + cache.confidence.get(&fork).unwrap(), + &Confidence { + fork_stakes: 11, + total_stake: 12, + lockouts: 13, + stake_weighted_lockouts: 0, + } + ); + // Ensure that {fork_stakes, total_stake, lockouts} and stake_weighted_lockouts + // can be updated separately + cache.cache_stake_weighted_lockouts(fork, 20); + assert_eq!( + cache.confidence.get(&fork).unwrap(), + &Confidence { + fork_stakes: 11, + total_stake: 12, + lockouts: 13, + stake_weighted_lockouts: 20, + } + ); + cache.cache_fork_confidence(fork, 21, 22, 23); + assert_eq!( + cache.confidence.get(&fork).unwrap().stake_weighted_lockouts, + 20, + ); + } +} diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 3891be2e1d..ecb5718880 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -304,7 +304,7 @@ impl Tower { pub fn aggregate_stake_lockouts( root: Option, ancestors: &HashMap>, - stake_lockouts: HashMap, + stake_lockouts: &HashMap, ) -> HashMap { let mut stake_weighted_lockouts: HashMap = HashMap::new(); for (fork, lockout) in stake_lockouts.iter() { @@ -562,7 +562,7 @@ mod test { .into_iter() .collect(); let stake_weighted_lockouts = - Tower::aggregate_stake_lockouts(tower.root(), &ancestors, stakes); + Tower::aggregate_stake_lockouts(tower.root(), &ancestors, &stakes); assert!(stake_weighted_lockouts.get(&0).is_none()); assert_eq!(*stake_weighted_lockouts.get(&1).unwrap(), 8 + 16 + 24); assert_eq!(*stake_weighted_lockouts.get(&2).unwrap(), 8 + 16); diff --git a/core/src/lib.rs b/core/src/lib.rs index 56af34c8a6..ad227ff545 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -13,6 +13,7 @@ pub mod chacha; #[cfg(cuda)] pub mod chacha_cuda; pub mod cluster_info_vote_listener; +pub mod confidence; pub mod recycler; #[macro_use] pub mod contact_info; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 4957b92dc4..db06c472a6 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -4,6 +4,7 @@ use crate::bank_forks::BankForks; use crate::blocktree::{Blocktree, BlocktreeError}; use crate::blocktree_processor; use crate::cluster_info::ClusterInfo; +use crate::confidence::ForkConfidenceCache; use crate::consensus::{StakeLockout, Tower}; use crate::entry::{Entry, EntrySlice}; use crate::leader_schedule_cache::LeaderScheduleCache; @@ -22,6 +23,7 @@ use solana_sdk::timing::{self, duration_as_ms}; use solana_sdk::transaction::Transaction; use solana_vote_api::vote_instruction; use std::collections::HashMap; +use std::collections::HashSet; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex, RwLock}; @@ -93,6 +95,7 @@ impl ReplayStage { leader_schedule_cache: &Arc, slot_full_senders: Vec>, snapshot_package_sender: Option, + fork_confidence_cache: Arc>, ) -> (Self, Receiver>>) where T: 'static + KeypairUtil + Send + Sync, @@ -110,7 +113,7 @@ impl ReplayStage { let vote_account = *vote_account; let voting_keypair = voting_keypair.cloned(); - let (lockouts_sender, t_lockouts) = aggregate_stake_lockouts(exit); + let (lockouts_sender, t_lockouts) = aggregate_stake_lockouts(exit, fork_confidence_cache); let t_replay = Builder::new() .name("solana-replay-stage".to_string()) @@ -142,7 +145,13 @@ impl ReplayStage { &slot_full_senders, ); - let votable = Self::generate_votable_banks(&bank_forks, &tower, &mut progress); + let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); + let votable = Self::generate_votable_banks( + &ancestors, + &bank_forks, + &tower, + &mut progress, + ); if let Some((_, bank, lockouts, total_staked)) = votable.into_iter().last() { subscriptions.notify_subscribers(bank.slot(), &bank_forks); @@ -172,6 +181,7 @@ impl ReplayStage { Self::handle_votable_bank( &bank, &bank_forks, + &ancestors, &mut tower, &mut progress, &vote_account, @@ -392,6 +402,7 @@ impl ReplayStage { fn handle_votable_bank( bank: &Arc, bank_forks: &Arc>, + ancestors: &Arc>>, tower: &mut Tower, progress: &mut HashMap, vote_account: &Pubkey, @@ -440,7 +451,7 @@ impl ReplayStage { Err(e)?; } } - Self::update_confidence_cache(bank_forks, tower, lockouts, total_staked, lockouts_sender); + Self::update_confidence_cache(ancestors, tower, lockouts, total_staked, lockouts_sender); if let Some(ref voting_keypair) = voting_keypair { let node_keypair = cluster_info.read().unwrap().keypair.clone(); @@ -461,30 +472,18 @@ impl ReplayStage { } fn update_confidence_cache( - bank_forks: &Arc>, + ancestors: &Arc>>, tower: &Tower, lockouts: HashMap, total_staked: u64, lockouts_sender: &Sender, ) { - { - let mut bank_forks = bank_forks.write().unwrap(); - for (fork, stake_lockout) in lockouts.iter() { - if tower.root().is_none() || *fork >= tower.root().unwrap() { - bank_forks.cache_fork_confidence( - *fork, - stake_lockout.stake(), - total_staked, - stake_lockout.lockout(), - ); - } - } - } - - let bank_forks_clone = bank_forks.clone(); - let root = tower.root(); - - if let Err(e) = lockouts_sender.send((lockouts, root, bank_forks_clone)) { + if let Err(e) = lockouts_sender.send(LockoutAggregationData { + lockouts, + root: tower.root(), + ancestors: ancestors.clone(), + total_staked, + }) { trace!("lockouts_sender failed: {:?}", e); } } @@ -567,15 +566,13 @@ impl ReplayStage { #[allow(clippy::type_complexity)] fn generate_votable_banks( + ancestors: &HashMap>, bank_forks: &Arc>, tower: &Tower, progress: &mut HashMap, ) -> Vec<(u128, Arc, HashMap, u64)> { let tower_start = Instant::now(); - // Tower voting - let ancestors = bank_forks.read().unwrap().ancestors(); let frozen_banks = bank_forks.read().unwrap().frozen_banks(); - trace!("frozen_banks {}", frozen_banks.len()); let mut votable: Vec<(u128, Arc, HashMap, u64)> = frozen_banks .values() @@ -796,14 +793,16 @@ impl Service for ReplayStage { } } -type LockoutAggregationData = ( - HashMap, // lockouts - Option, // root - Arc>, // bank_forks -); +struct LockoutAggregationData { + lockouts: HashMap, + root: Option, + ancestors: Arc>>, + total_staked: u64, +} fn aggregate_stake_lockouts( exit: &Arc, + fork_confidence_cache: Arc>, ) -> (Sender, JoinHandle<()>) { let (lockouts_sender, lockouts_receiver): ( Sender, @@ -818,18 +817,45 @@ fn aggregate_stake_lockouts( if exit_.load(Ordering::Relaxed) { break; } - if let Ok((lockouts, root, bank_forks)) = lockouts_receiver.try_recv() { - let ancestors = bank_forks.read().unwrap().ancestors(); - let stake_weighted_lockouts = - Tower::aggregate_stake_lockouts(root, &ancestors, lockouts); - let mut w_bank_forks = bank_forks.write().unwrap(); + if let Ok(aggregation_data) = lockouts_receiver.try_recv() { + let stake_weighted_lockouts = Tower::aggregate_stake_lockouts( + aggregation_data.root, + &aggregation_data.ancestors, + &aggregation_data.lockouts, + ); + + let mut w_fork_confidence_cache = fork_confidence_cache.write().unwrap(); + + // Cache the confidence values + for (fork, stake_lockout) in aggregation_data.lockouts.iter() { + if aggregation_data.root.is_none() + || *fork >= aggregation_data.root.unwrap() + { + w_fork_confidence_cache.cache_fork_confidence( + *fork, + stake_lockout.stake(), + aggregation_data.total_staked, + stake_lockout.lockout(), + ); + } + } + + // Cache the stake weighted lockouts for (fork, stake_weighted_lockout) in stake_weighted_lockouts.iter() { - if root.is_none() || *fork >= root.unwrap() { - w_bank_forks + if aggregation_data.root.is_none() + || *fork >= aggregation_data.root.unwrap() + { + w_fork_confidence_cache .cache_stake_weighted_lockouts(*fork, *stake_weighted_lockout) } } - drop(w_bank_forks); + + if let Some(root) = aggregation_data.root { + w_fork_confidence_cache + .prune_confidence_cache(&aggregation_data.ancestors, root); + } + + drop(w_fork_confidence_cache); } }) .unwrap(), @@ -839,9 +865,9 @@ fn aggregate_stake_lockouts( #[cfg(test)] mod test { use super::*; - use crate::bank_forks::Confidence; use crate::blocktree::tests::make_slot_entries; use crate::blocktree::{entries_to_test_shreds, get_tmp_ledger_path}; + use crate::confidence::Confidence; use crate::entry; use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader}; use crate::replay_stage::ReplayStage; @@ -1036,7 +1062,11 @@ mod test { bank.store_account(&pubkey, &leader_vote_account); } - let (lockouts_sender, _) = aggregate_stake_lockouts(&Arc::new(AtomicBool::new(false))); + let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default())); + let (lockouts_sender, _) = aggregate_stake_lockouts( + &Arc::new(AtomicBool::new(false)), + fork_confidence_cache.clone(), + ); let leader_pubkey = Pubkey::new_rand(); let leader_lamports = 3; @@ -1061,10 +1091,13 @@ mod test { let mut progress = HashMap::new(); leader_vote(&arc_bank0, &leader_voting_pubkey); - let votable = ReplayStage::generate_votable_banks(&bank_forks, &tower, &mut progress); + let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); + + let votable = + ReplayStage::generate_votable_banks(&ancestors, &bank_forks, &tower, &mut progress); if let Some((_, _, lockouts, total_staked)) = votable.into_iter().last() { ReplayStage::update_confidence_cache( - &bank_forks, + &ancestors, &tower, lockouts, total_staked, @@ -1072,11 +1105,21 @@ mod test { ); } + thread::sleep(Duration::from_millis(200)); + assert_eq!( - bank_forks.read().unwrap().get_fork_confidence(0).unwrap(), + fork_confidence_cache + .read() + .unwrap() + .get_fork_confidence(0) + .unwrap(), &Confidence::new(0, 3, 2) ); - assert!(bank_forks.read().unwrap().get_fork_confidence(1).is_none()); + assert!(fork_confidence_cache + .read() + .unwrap() + .get_fork_confidence(1) + .is_none()); tower.record_vote(arc_bank0.slot(), arc_bank0.hash()); @@ -1089,10 +1132,12 @@ mod test { bank_forks.write().unwrap().insert(bank1); let arc_bank1 = bank_forks.read().unwrap().get(1).unwrap().clone(); leader_vote(&arc_bank1, &leader_voting_pubkey); - let votable = ReplayStage::generate_votable_banks(&bank_forks, &tower, &mut progress); + let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); + let votable = + ReplayStage::generate_votable_banks(&ancestors, &bank_forks, &tower, &mut progress); if let Some((_, _, lockouts, total_staked)) = votable.into_iter().last() { ReplayStage::update_confidence_cache( - &bank_forks, + &ancestors, &tower, lockouts, total_staked, @@ -1111,10 +1156,12 @@ mod test { bank_forks.write().unwrap().insert(bank2); let arc_bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); leader_vote(&arc_bank2, &leader_voting_pubkey); - let votable = ReplayStage::generate_votable_banks(&bank_forks, &tower, &mut progress); + let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); + let votable = + ReplayStage::generate_votable_banks(&ancestors, &bank_forks, &tower, &mut progress); if let Some((_, _, lockouts, total_staked)) = votable.into_iter().last() { ReplayStage::update_confidence_cache( - &bank_forks, + &ancestors, &tower, lockouts, total_staked, @@ -1124,15 +1171,27 @@ mod test { thread::sleep(Duration::from_millis(200)); assert_eq!( - bank_forks.read().unwrap().get_fork_confidence(0).unwrap(), + fork_confidence_cache + .read() + .unwrap() + .get_fork_confidence(0) + .unwrap(), &Confidence::new_with_stake_weighted(3, 3, 14, 60) ); assert_eq!( - bank_forks.read().unwrap().get_fork_confidence(1).unwrap(), + fork_confidence_cache + .read() + .unwrap() + .get_fork_confidence(1) + .unwrap(), &Confidence::new_with_stake_weighted(3, 3, 6, 18) ); assert_eq!( - bank_forks.read().unwrap().get_fork_confidence(2).unwrap(), + fork_confidence_cache + .read() + .unwrap() + .get_fork_confidence(2) + .unwrap(), &Confidence::new_with_stake_weighted(0, 3, 2, 0) ); } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index c66adb15ff..48b711229f 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -17,6 +17,7 @@ use crate::blob_fetch_stage::BlobFetchStage; use crate::blockstream_service::BlockstreamService; use crate::blocktree::{Blocktree, CompletedSlotsReceiver}; use crate::cluster_info::ClusterInfo; +use crate::confidence::ForkConfidenceCache; use crate::leader_schedule_cache::LeaderScheduleCache; use crate::ledger_cleanup_service::LedgerCleanupService; use crate::poh_recorder::PohRecorder; @@ -77,6 +78,7 @@ impl Tvu { leader_schedule_cache: &Arc, exit: &Arc, completed_slots_receiver: CompletedSlotsReceiver, + fork_confidence_cache: Arc>, ) -> Self where T: 'static + KeypairUtil + Sync + Send, @@ -153,6 +155,7 @@ impl Tvu { leader_schedule_cache, vec![blockstream_slot_sender, ledger_cleanup_slot_sender], snapshot_package_sender, + fork_confidence_cache, ); let blockstream_service = if blockstream_unix_socket.is_some() { @@ -258,6 +261,7 @@ pub mod tests { let voting_keypair = Keypair::new(); let storage_keypair = Arc::new(Keypair::new()); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default())); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(&Arc::new(voting_keypair)), @@ -282,6 +286,7 @@ pub mod tests { &leader_schedule_cache, &exit, completed_slots_receiver, + fork_confidence_cache, ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 5bc5e3408e..a1ec0501cc 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -5,6 +5,7 @@ use crate::blocktree::{Blocktree, CompletedSlotsReceiver}; use crate::blocktree_processor::{self, BankForksInfo}; use crate::broadcast_stage::BroadcastStageType; use crate::cluster_info::{ClusterInfo, Node}; +use crate::confidence::ForkConfidenceCache; use crate::contact_info::ContactInfo; use crate::erasure::ErasureConfig; use crate::gossip_service::{discover_cluster, GossipService}; @@ -301,6 +302,7 @@ impl Validator { Some(voting_keypair) }; + let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default())); let tvu = Tvu::new( vote_account, voting_keypair, @@ -318,6 +320,7 @@ impl Validator { &leader_schedule_cache, &exit, completed_slots_receiver, + fork_confidence_cache, ); if config.dev_sigverify_disabled { diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index d56f05f8d6..19f3563b76 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -5,6 +5,7 @@ use log::*; use solana_core::banking_stage::create_test_recorder; use solana_core::blocktree::{create_new_tmp_ledger, Blocktree}; use solana_core::cluster_info::{ClusterInfo, Node}; +use solana_core::confidence::ForkConfidenceCache; use solana_core::entry::next_entry_mut; use solana_core::entry::EntrySlice; use solana_core::genesis_utils::{create_genesis_block_with_leader, GenesisBlockInfo}; @@ -120,6 +121,7 @@ fn test_replay() { { let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&working_bank, &blocktree); + let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default())); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(&Arc::new(voting_keypair)), @@ -144,6 +146,7 @@ fn test_replay() { &leader_schedule_cache, &exit, completed_slots_receiver, + fork_confidence_cache, ); let mut mint_ref_balance = mint_balance;