diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 20277dc49c..ef020b530d 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1,4 +1,4 @@ -use crate::replay_stage::ProgressMap; +use crate::progress_map::ProgressMap; use chrono::prelude::*; use solana_ledger::bank_forks::BankForks; use solana_runtime::bank::Bank; @@ -480,7 +480,11 @@ impl Tower { #[cfg(test)] pub mod test { use super::*; - use crate::replay_stage::{ForkProgress, HeaviestForkFailures, ReplayStage}; + use crate::{ + cluster_info_vote_listener::VoteTracker, + progress_map::ForkProgress, + replay_stage::{HeaviestForkFailures, ReplayStage}, + }; use solana_ledger::bank_forks::BankForks; use solana_runtime::{ bank::Bank, @@ -523,7 +527,7 @@ pub mod test { cluster_votes: &mut HashMap>, validator_keypairs: &HashMap, my_keypairs: &ValidatorVoteKeypairs, - progress: &mut HashMap, + progress: &mut ProgressMap, tower: &mut Tower, ) -> Vec { let node = self @@ -562,7 +566,7 @@ pub mod test { info!("parent of {} is {}", missing_slot, parent_bank.slot(),); progress .entry(missing_slot) - .or_insert_with(|| ForkProgress::new(parent_bank.last_blockhash())); + .or_insert_with(|| ForkProgress::new(parent_bank.last_blockhash(), None, None)); // Create the missing bank let new_bank = @@ -607,6 +611,9 @@ pub mod test { &mut frozen_banks, tower, progress, + &VoteTracker::default(), + bank_forks, + &mut HashSet::new(), ); let bank = bank_forks @@ -633,7 +640,14 @@ pub mod test { } let vote = tower.new_vote_from_bank(&bank, &my_vote_pubkey).0; if let Some(new_root) = tower.record_bank_vote(vote) { - ReplayStage::handle_new_root(new_root, bank_forks, progress, &None, &mut 0); + ReplayStage::handle_new_root( + new_root, + bank_forks, + progress, + &None, + &mut 0, + &mut HashSet::new(), + ); } // Mark the vote for this bank under this node's pubkey so it will be @@ -687,7 +701,7 @@ pub mod test { pub(crate) fn initialize_state( validator_keypairs_map: &HashMap, stake: u64, - ) -> (BankForks, HashMap) { + ) -> (BankForks, ProgressMap) { let validator_keypairs: Vec<_> = validator_keypairs_map.values().collect(); let GenesisConfigInfo { genesis_config, @@ -702,8 +716,8 @@ pub mod test { } bank0.freeze(); - let mut progress = HashMap::new(); - progress.insert(0, ForkProgress::new(bank0.last_blockhash())); + let mut progress = ProgressMap::default(); + progress.insert(0, ForkProgress::new(bank0.last_blockhash(), None, None)); (BankForks::new(0, bank0), progress) } @@ -735,7 +749,7 @@ pub mod test { bank_forks: &RwLock, cluster_votes: &mut HashMap>, keypairs: &HashMap, - progress: &mut HashMap, + progress: &mut ProgressMap, ) -> bool { // Check that within some reasonable time, validator can make a new // root on this fork diff --git a/core/src/lib.rs b/core/src/lib.rs index 0648f007de..3bf1d06a6f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -32,6 +32,7 @@ pub mod ledger_cleanup_service; pub mod local_vote_signer_service; pub mod poh_recorder; pub mod poh_service; +pub mod progress_map; pub mod repair_service; pub mod replay_stage; mod result; diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs new file mode 100644 index 0000000000..2d15881539 --- /dev/null +++ b/core/src/progress_map.rs @@ -0,0 +1,387 @@ +use crate::{ + cluster_info_vote_listener::SlotVoteTracker, consensus::StakeLockout, + replay_stage::SUPERMINORITY_THRESHOLD, +}; +use solana_ledger::{ + bank_forks::BankForks, + blockstore_processor::{ConfirmationProgress, ConfirmationTiming}, +}; +use solana_runtime::bank::Bank; +use 
solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; +use std::{ + collections::{HashMap, HashSet}, + rc::Rc, + sync::{Arc, RwLock}, +}; + +#[derive(Default)] +pub(crate) struct ReplaySlotStats(ConfirmationTiming); +impl std::ops::Deref for ReplaySlotStats { + type Target = ConfirmationTiming; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl std::ops::DerefMut for ReplaySlotStats { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl ReplaySlotStats { + pub fn report_stats(&self, slot: Slot, num_entries: usize, num_shreds: u64) { + datapoint_info!( + "replay-slot-stats", + ("slot", slot as i64, i64), + ("fetch_entries_time", self.fetch_elapsed as i64, i64), + ( + "fetch_entries_fail_time", + self.fetch_fail_elapsed as i64, + i64 + ), + ("entry_verification_time", self.verify_elapsed as i64, i64), + ("replay_time", self.replay_elapsed as i64, i64), + ( + "replay_total_elapsed", + self.started.elapsed().as_micros() as i64, + i64 + ), + ("total_entries", num_entries as i64, i64), + ("total_shreds", num_shreds as i64, i64), + ); + } +} + +#[derive(Debug)] +pub(crate) struct ValidatorStakeInfo { + pub validator_vote_pubkey: Pubkey, + pub stake: u64, + pub total_epoch_stake: u64, +} + +impl Default for ValidatorStakeInfo { + fn default() -> Self { + Self { + stake: 0, + validator_vote_pubkey: Pubkey::default(), + total_epoch_stake: 1, + } + } +} + +impl ValidatorStakeInfo { + pub fn new(validator_vote_pubkey: Pubkey, stake: u64, total_epoch_stake: u64) -> Self { + Self { + validator_vote_pubkey, + stake, + total_epoch_stake, + } + } +} + +pub(crate) struct ForkProgress { + pub(crate) is_dead: bool, + pub(crate) fork_stats: ForkStats, + pub(crate) propagated_stats: PropagatedStats, + pub(crate) replay_stats: ReplaySlotStats, + pub(crate) replay_progress: ConfirmationProgress, +} + +impl ForkProgress { + pub fn new( + last_entry: Hash, + prev_leader_slot: Option, + validator_stake_info: Option, + ) -> Self { + let ( + is_leader_slot, + propagated_validators_stake, + propagated_validators, + is_propagated, + total_epoch_stake, + ) = validator_stake_info + .map(|info| { + ( + true, + info.stake, + vec![Rc::new(info.validator_vote_pubkey)] + .into_iter() + .collect(), + { + if info.total_epoch_stake == 0 { + true + } else { + info.stake as f64 / info.total_epoch_stake as f64 + > SUPERMINORITY_THRESHOLD + } + }, + info.total_epoch_stake, + ) + }) + .unwrap_or((false, 0, HashSet::new(), false, 0)); + Self { + is_dead: false, + fork_stats: ForkStats::default(), + replay_stats: ReplaySlotStats::default(), + replay_progress: ConfirmationProgress::new(last_entry), + propagated_stats: PropagatedStats { + prev_leader_slot, + is_leader_slot, + propagated_validators_stake, + propagated_validators, + is_propagated, + total_epoch_stake, + ..PropagatedStats::default() + }, + } + } + + pub fn new_from_bank( + bank: &Bank, + my_pubkey: &Pubkey, + voting_pubkey: &Pubkey, + prev_leader_slot: Option, + ) -> Self { + let validator_fork_info = { + if bank.collector_id() == my_pubkey { + let stake = bank.epoch_vote_account_stake(voting_pubkey); + Some(ValidatorStakeInfo::new( + *voting_pubkey, + stake, + bank.total_epoch_stake(), + )) + } else { + None + } + }; + + Self::new(bank.last_blockhash(), prev_leader_slot, validator_fork_info) + } +} + +#[derive(Debug, Clone, Default)] +pub(crate) struct ForkStats { + pub(crate) weight: u128, + pub(crate) fork_weight: u128, + pub(crate) total_staked: u64, + pub(crate) slot: Slot, + pub(crate) block_height: u64, + pub(crate) has_voted: bool, 
+ pub(crate) is_recent: bool, + pub(crate) is_empty: bool, + pub(crate) vote_threshold: bool, + pub(crate) is_locked_out: bool, + pub(crate) stake_lockouts: HashMap, + pub(crate) confirmation_reported: bool, + pub(crate) computed: bool, +} + +#[derive(Clone, Default)] +pub(crate) struct PropagatedStats { + pub(crate) propagated_validators: HashSet>, + pub(crate) propagated_validators_stake: u64, + pub(crate) is_propagated: bool, + pub(crate) is_leader_slot: bool, + pub(crate) prev_leader_slot: Option, + pub(crate) slot_vote_tracker: Option>>, + pub(crate) total_epoch_stake: u64, +} + +#[derive(Default)] +pub(crate) struct ProgressMap { + progress_map: HashMap, +} + +impl std::ops::Deref for ProgressMap { + type Target = HashMap; + fn deref(&self) -> &Self::Target { + &self.progress_map + } +} + +impl std::ops::DerefMut for ProgressMap { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.progress_map + } +} + +impl ProgressMap { + pub fn insert(&mut self, slot: Slot, fork_progress: ForkProgress) { + self.progress_map.insert(slot, fork_progress); + } + + pub fn get_propagated_stats(&self, slot: Slot) -> Option<&PropagatedStats> { + self.progress_map + .get(&slot) + .map(|fork_progress| &fork_progress.propagated_stats) + } + + pub fn get_propagated_stats_mut(&mut self, slot: Slot) -> Option<&mut PropagatedStats> { + self.progress_map + .get_mut(&slot) + .map(|fork_progress| &mut fork_progress.propagated_stats) + } + + pub fn get_fork_stats(&self, slot: Slot) -> Option<&ForkStats> { + self.progress_map + .get(&slot) + .map(|fork_progress| &fork_progress.fork_stats) + } + + pub fn get_fork_stats_mut(&mut self, slot: Slot) -> Option<&mut ForkStats> { + self.progress_map + .get_mut(&slot) + .map(|fork_progress| &mut fork_progress.fork_stats) + } + + pub fn is_propagated(&self, slot: Slot) -> bool { + let leader_slot_to_check = self.get_latest_leader_slot(slot); + + // prev_leader_slot doesn't exist because already rooted + // or this validator hasn't been scheduled as a leader + // yet. 
In both cases the latest leader is vacuously + // confirmed + leader_slot_to_check + .map(|leader_slot_to_check| { + // If the leader's stats are None (isn't in the + // progress map), this means that prev_leader slot is + // rooted, so return true + self.get_propagated_stats(leader_slot_to_check) + .map(|stats| stats.is_propagated) + .unwrap_or(true) + }) + .unwrap_or(true) + } + + pub fn get_latest_leader_slot(&self, slot: Slot) -> Option { + let propagated_stats = self + .get_propagated_stats(slot) + .expect("All frozen banks must exist in the Progress map"); + + if propagated_stats.is_leader_slot { + Some(slot) + } else { + propagated_stats.prev_leader_slot + } + } + + pub fn get_bank_prev_leader_slot(&self, bank: &Bank) -> Option { + let parent_slot = bank.parent_slot(); + self.get_propagated_stats(parent_slot) + .map(|stats| { + if stats.is_leader_slot { + Some(parent_slot) + } else { + stats.prev_leader_slot + } + }) + .unwrap_or(None) + } + + pub fn handle_new_root(&mut self, bank_forks: &BankForks) { + self.progress_map + .retain(|k, _| bank_forks.get(*k).is_some()); + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_is_propagated_status_on_construction() { + // If the given ValidatorStakeInfo == None, then this is not + // a leader slot and is_propagated == false + let progress = ForkProgress::new(Hash::default(), Some(9), None); + assert!(!progress.propagated_stats.is_propagated); + + // If the stake is zero, then threshold is always achieved + let progress = ForkProgress::new( + Hash::default(), + Some(9), + Some(ValidatorStakeInfo { + total_epoch_stake: 0, + ..ValidatorStakeInfo::default() + }), + ); + assert!(progress.propagated_stats.is_propagated); + + // If the stake is non zero, then threshold is not achieved unless + // validator has enough stake by itself to pass threshold + let progress = ForkProgress::new( + Hash::default(), + Some(9), + Some(ValidatorStakeInfo { + total_epoch_stake: 2, + ..ValidatorStakeInfo::default() + }), + ); + assert!(!progress.propagated_stats.is_propagated); + + // Give the validator enough stake by itself to pass threshold + let progress = ForkProgress::new( + Hash::default(), + Some(9), + Some(ValidatorStakeInfo { + stake: 1, + total_epoch_stake: 2, + ..ValidatorStakeInfo::default() + }), + ); + assert!(progress.propagated_stats.is_propagated); + + // Check that the default ValidatorStakeInfo::default() constructs a ForkProgress + // with is_propagated == false, otherwise propagation tests will fail to run + // the proper checks (most will auto-pass without checking anything) + let progress = ForkProgress::new( + Hash::default(), + Some(9), + Some(ValidatorStakeInfo::default()), + ); + assert!(!progress.propagated_stats.is_propagated); + } + + #[test] + fn test_is_propagated() { + let mut progress_map = ProgressMap::default(); + + // Insert new ForkProgress for slot 10 (not a leader slot) and its + // previous leader slot 9 (leader slot) + progress_map.insert(10, ForkProgress::new(Hash::default(), Some(9), None)); + progress_map.insert( + 9, + ForkProgress::new(Hash::default(), None, Some(ValidatorStakeInfo::default())), + ); + + // None of these slot have parents which are confirmed + assert!(!progress_map.is_propagated(9)); + assert!(!progress_map.is_propagated(10)); + + // Insert new ForkProgress for slot 8 with no previous leader. 
+ // The previous leader before 8, slot 7, does not exist in + // progress map, so is_propagated(8) should return true as + // this implies the parent is rooted + progress_map.insert(8, ForkProgress::new(Hash::default(), Some(7), None)); + assert!(progress_map.is_propagated(8)); + + // If we set the is_propagated = true, is_propagated should return true + progress_map + .get_propagated_stats_mut(9) + .unwrap() + .is_propagated = true; + assert!(progress_map.is_propagated(9)); + assert!(progress_map.get(&9).unwrap().propagated_stats.is_propagated); + + // Because slot 9 is now confirmed, then slot 10 is also confirmed b/c 9 + // is the last leader slot before 10 + assert!(progress_map.is_propagated(10)); + + // If we make slot 10 a leader slot though, even though its previous + // leader slot 9 has been confirmed, slot 10 itself is not confirmed + progress_map + .get_propagated_stats_mut(10) + .unwrap() + .is_leader_slot = true; + assert!(!progress_map.is_propagated(10)); + } +} diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 79de13d2c7..6e042dc251 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -7,6 +7,7 @@ use crate::{ commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData}, consensus::{StakeLockout, Tower}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, + progress_map::{ForkProgress, ForkStats, ProgressMap, PropagatedStats}, result::Result, rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, @@ -15,10 +16,7 @@ use solana_ledger::{ bank_forks::BankForks, block_error::BlockError, blockstore::Blockstore, - blockstore_processor::{ - self, BlockstoreProcessorError, ConfirmationProgress, ConfirmationTiming, - TransactionStatusSender, - }, + blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender}, entry::VerifyRecyclers, leader_schedule_cache::LeaderScheduleCache, snapshot_package::SnapshotPackageSender, @@ -27,7 +25,7 @@ use solana_measure::thread_mem_usage; use solana_metrics::inc_new_counter_info; use solana_runtime::bank::Bank; use solana_sdk::{ - clock::Slot, + clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, hash::Hash, pubkey::Pubkey, signature::{Keypair, Signer}, @@ -37,6 +35,8 @@ use solana_sdk::{ use solana_vote_program::vote_instruction; use std::{ collections::{HashMap, HashSet}, + ops::Deref, + rc::Rc, result, sync::{ atomic::{AtomicBool, Ordering}, @@ -48,14 +48,15 @@ use std::{ }; pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; - -pub(crate) type ProgressMap = HashMap; +pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64; +pub const MAX_UNCONFIRMED_SLOTS: usize = 5; #[derive(PartialEq, Debug)] pub(crate) enum HeaviestForkFailures { LockedOut(u64), FailedThreshold(u64), FailedSwitchThreshold(u64), + NoPropagatedConfirmation(u64), } // Implement a destructor for the ReplayStage thread to signal it exited @@ -77,6 +78,12 @@ impl Drop for Finalizer { } } +#[derive(Default)] +struct SkippedSlotsInfo { + last_retransmit_slot: u64, + last_skipped_slot: u64, +} + #[derive(Default)] pub struct ReplayStageConfig { pub my_pubkey: Pubkey, @@ -97,79 +104,6 @@ pub struct ReplayStage { commitment_service: AggregateCommitmentService, } -#[derive(Default)] -pub(crate) struct ReplaySlotStats(ConfirmationTiming); -impl std::ops::Deref for ReplaySlotStats { - type Target = ConfirmationTiming; - fn deref(&self) -> &Self::Target { - &self.0 - } -} -impl std::ops::DerefMut for ReplaySlotStats { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut 
self.0 - } -} - -#[derive(Debug, Clone, Default)] -pub(crate) struct ForkStats { - weight: u128, - fork_weight: u128, - total_staked: u64, - slot: Slot, - block_height: u64, - has_voted: bool, - is_recent: bool, - is_empty: bool, - pub(crate) vote_threshold: bool, - pub(crate) is_locked_out: bool, - pub(crate) stake_lockouts: HashMap, - computed: bool, - confirmation_reported: bool, -} - -impl ReplaySlotStats { - pub fn report_stats(&self, slot: Slot, num_entries: usize, num_shreds: u64) { - datapoint_info!( - "replay-slot-stats", - ("slot", slot as i64, i64), - ("fetch_entries_time", self.fetch_elapsed as i64, i64), - ( - "fetch_entries_fail_time", - self.fetch_fail_elapsed as i64, - i64 - ), - ("entry_verification_time", self.verify_elapsed as i64, i64), - ("replay_time", self.replay_elapsed as i64, i64), - ( - "replay_total_elapsed", - self.started.elapsed().as_micros() as i64, - i64 - ), - ("total_entries", num_entries as i64, i64), - ("total_shreds", num_shreds as i64, i64), - ); - } -} - -pub(crate) struct ForkProgress { - is_dead: bool, - pub(crate) fork_stats: ForkStats, - replay_stats: ReplaySlotStats, - replay_progress: ConfirmationProgress, -} - -impl ForkProgress { - pub fn new(last_entry: Hash) -> Self { - Self { - is_dead: false, - fork_stats: ForkStats::default(), - replay_stats: ReplaySlotStats::default(), - replay_progress: ConfirmationProgress::new(last_entry), - } - } -} - impl ReplayStage { #[allow(clippy::new_ret_no_self)] pub fn new( @@ -179,7 +113,7 @@ impl ReplayStage { cluster_info: Arc>, ledger_signal_receiver: Receiver, poh_recorder: Arc>, - _vote_tracker: Arc, + vote_tracker: Arc, retransmit_slots_sender: RetransmitSlotsSender, ) -> (Self, Receiver>>) { let ReplayStageConfig { @@ -201,7 +135,6 @@ impl ReplayStage { let mut tower = Tower::new(&my_pubkey, &vote_account, &bank_forks.read().unwrap()); // Start the replay stage loop - let (lockouts_sender, commitment_service) = AggregateCommitmentService::new(&exit, block_commitment_cache); @@ -209,13 +142,32 @@ impl ReplayStage { let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { - let _retransmit_slots_sender = retransmit_slots_sender; + let mut all_pubkeys: HashSet> = HashSet::new(); let verify_recyclers = VerifyRecyclers::default(); let _exit = Finalizer::new(exit.clone()); - let mut progress = HashMap::new(); + let mut progress = ProgressMap::default(); + let mut frozen_banks: Vec<_> = bank_forks + .read() + .unwrap() + .frozen_banks() + .values() + .cloned() + .collect(); + + frozen_banks.sort_by_key(|bank| bank.slot()); + // Initialize progress map with any root banks - for bank in bank_forks.read().unwrap().frozen_banks().values() { - progress.insert(bank.slot(), ForkProgress::new(bank.last_blockhash())); + for bank in frozen_banks { + let prev_leader_slot = progress.get_bank_prev_leader_slot(&bank); + progress.insert( + bank.slot(), + ForkProgress::new_from_bank( + &bank, + &my_pubkey, + &vote_account, + prev_leader_slot, + ), + ); } let mut current_leader = None; let mut last_reset = Hash::default(); @@ -225,6 +177,7 @@ impl ReplayStage { slots.last().cloned().unwrap_or(0) }; let mut switch_threshold = false; + let mut skipped_slots_info = SkippedSlotsInfo::default(); loop { let allocated = thread_mem_usage::Allocatedp::default(); @@ -242,6 +195,8 @@ impl ReplayStage { &leader_schedule_cache, &subscriptions, rewards_recorder_sender.clone(), + &mut progress, + &mut all_pubkeys, ); Self::report_memory(&allocated, "generate_new_bank_forks", start); @@ -252,6 +207,7 @@ impl 
ReplayStage { &blockstore, &bank_forks, &my_pubkey, + &vote_account, &mut progress, transaction_status_sender.clone(), &verify_recyclers, @@ -274,9 +230,12 @@ impl ReplayStage { &mut frozen_banks, &tower, &mut progress, + &vote_tracker, + &bank_forks, + &mut all_pubkeys, ); for slot in newly_computed_slot_stats { - let fork_stats = &progress.get(&slot).unwrap().fork_stats; + let fork_stats = progress.get_fork_stats(slot).unwrap(); let confirmed_forks = Self::confirm_forks( &tower, &fork_stats.stake_lockouts, @@ -320,6 +279,26 @@ impl ReplayStage { heaviest_bank.as_ref().map(|b| b.slot()), failure_reasons ); + + for r in failure_reasons { + if let HeaviestForkFailures::NoPropagatedConfirmation(slot) = r { + if let Some(latest_leader_slot) = progress.get_latest_leader_slot(slot) + { + if let Some(stats) = + progress.get_propagated_stats(latest_leader_slot) + { + info!( + "total staked: {}, observed staked: {}, pubkeys: {:?}, latest_leader_slot: {}, epoch: {:?}", + stats.total_epoch_stake, + stats.propagated_validators_stake, + stats.propagated_validators, + latest_leader_slot, + bank_forks.read().unwrap().get(latest_leader_slot).map(|x| x.epoch()), + ); + } + } + } + } } let start = allocated.get(); @@ -354,6 +333,7 @@ impl ReplayStage { &accounts_hash_sender, &latest_root_senders, &mut earliest_vote_on_fork, + &mut all_pubkeys, )?; ancestors @@ -445,6 +425,9 @@ impl ReplayStage { &leader_schedule_cache, &subscriptions, rewards_recorder_sender.clone(), + &progress, + &retransmit_slots_sender, + &mut skipped_slots_info, ); let poh_bank = poh_recorder.lock().unwrap().bank(); @@ -521,6 +504,26 @@ impl ReplayStage { current_leader.replace(new_leader.to_owned()); } + fn check_propagation_for_start_leader( + poh_slot: Slot, + parent_slot: Slot, + progress_map: &ProgressMap, + ) -> bool { + // Check if the next leader slot is part of a consecutive block, in + // which case ignore the propagation check + let is_consecutive_leader = progress_map + .get_propagated_stats(parent_slot) + .unwrap() + .is_leader_slot + && parent_slot == poh_slot - 1; + + if is_consecutive_leader { + return true; + } + + progress_map.is_propagated(parent_slot) + } + fn maybe_start_leader( my_pubkey: &Pubkey, bank_forks: &Arc>, @@ -528,6 +531,9 @@ impl ReplayStage { leader_schedule_cache: &Arc, subscriptions: &Arc, rewards_recorder_sender: Option, + progress_map: &ProgressMap, + retransmit_slots_sender: &RetransmitSlotsSender, + skipped_slots_info: &mut SkippedSlotsInfo, ) { // all the individual calls to poh_recorder.lock() are designed to // increase granularity, decrease contention @@ -582,6 +588,36 @@ impl ReplayStage { ("leader", next_leader.to_string(), String), ); + if !Self::check_propagation_for_start_leader(poh_slot, parent_slot, progress_map) { + let latest_leader_slot = progress_map.get_latest_leader_slot(parent_slot).expect("In order for propagated check to fail, latest leader must exist in progress map"); + if poh_slot != skipped_slots_info.last_skipped_slot { + datapoint_info!( + "replay_stage-skip_leader_slot", + ("slot", poh_slot, i64), + ("parent_slot", parent_slot, i64), + ("latest_unconfirmed_leader", latest_leader_slot, i64) + ); + skipped_slots_info.last_skipped_slot = poh_slot; + } + let bank = bank_forks.read().unwrap().get(latest_leader_slot) + .expect("In order for propagated check to fail, latest leader must exist in progress map, and thus also in BankForks").clone(); + + // Signal retransmit + if poh_slot < skipped_slots_info.last_retransmit_slot + || poh_slot + >= skipped_slots_info + 
.last_retransmit_slot + .saturating_sub(NUM_CONSECUTIVE_LEADER_SLOTS) + { + datapoint_info!("replay_stage-retransmit", ("slot", bank.slot(), i64),); + retransmit_slots_sender + .send(vec![(bank.slot(), bank.clone())].into_iter().collect()) + .unwrap(); + skipped_slots_info.last_retransmit_slot = poh_slot; + } + return; + } + let root_slot = bank_forks.read().unwrap().root(); info!( "new fork:{} parent:{} (leader) root:{}", @@ -670,6 +706,7 @@ impl ReplayStage { accounts_hash_sender: &Option, latest_root_senders: &[Sender], earliest_vote_on_fork: &mut Slot, + all_pubkeys: &mut HashSet>, ) -> Result<()> { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -701,6 +738,7 @@ impl ReplayStage { progress, accounts_hash_sender, earliest_vote_on_fork, + all_pubkeys, ); latest_root_senders.iter().for_each(|s| { if let Err(e) = s.send(new_root) { @@ -716,7 +754,7 @@ impl ReplayStage { Self::update_commitment_cache( bank.clone(), - progress.get(&bank.slot()).unwrap().fork_stats.total_staked, + progress.get_fork_stats(bank.slot()).unwrap().total_staked, lockouts_sender, ); @@ -792,6 +830,7 @@ impl ReplayStage { blockstore: &Arc, bank_forks: &Arc>, my_pubkey: &Pubkey, + vote_account: &Pubkey, progress: &mut ProgressMap, transaction_status_sender: Option, verify_recyclers: &VerifyRecyclers, @@ -809,13 +848,14 @@ impl ReplayStage { } let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); + let prev_leader_slot = progress.get_bank_prev_leader_slot(&bank); // Insert a progress entry even for slots this node is the leader for, so that // 1) confirm_forks can report confirmation, 2) we can cache computations about // this bank in `select_forks()` - let bank_progress = &mut progress - .entry(bank.slot()) - .or_insert_with(|| ForkProgress::new(bank.last_blockhash())); + let bank_progress = &mut progress.entry(bank.slot()).or_insert_with(|| { + ForkProgress::new_from_bank(&bank, &my_pubkey, vote_account, prev_leader_slot) + }); if bank.collector_id() != my_pubkey { let replay_result = Self::replay_blockstore_into_bank( &bank, @@ -863,10 +903,15 @@ impl ReplayStage { frozen_banks: &mut Vec>, tower: &Tower, progress: &mut ProgressMap, + vote_tracker: &VoteTracker, + bank_forks: &RwLock, + all_pubkeys: &mut HashSet>, ) -> Vec { frozen_banks.sort_by_key(|bank| bank.slot()); let mut new_stats = vec![]; for bank in frozen_banks { + let bank_slot = bank.slot(); + // Only time progress map should be missing a bank slot // is if this node was the leader for this slot as those banks // are not replayed in replay_active_banks() @@ -875,53 +920,109 @@ impl ReplayStage { .and_then(|b| progress.get(&b.slot())) .map(|x| x.fork_stats.fork_weight) .unwrap_or(0); - let stats = &mut progress - .get_mut(&bank.slot()) - .expect("All frozen banks must exist in the Progress map") - .fork_stats; - if !stats.computed { - stats.slot = bank.slot(); - let (stake_lockouts, total_staked, bank_weight) = tower.collect_vote_lockouts( - bank.slot(), - bank.vote_accounts().into_iter(), - &ancestors, - ); - stats.total_staked = total_staked; - stats.weight = bank_weight; - stats.fork_weight = stats.weight + parent_weight; + { + let stats = progress + .get_fork_stats_mut(bank_slot) + .expect("All frozen banks must exist in the Progress map"); - datapoint_info!( - "bank_weight", - ("slot", bank.slot(), i64), - // u128 too large for influx, convert to hex - ("weight", format!("{:X}", stats.weight), String), - ); - info!( - "{} slot_weight: {} {} {} {}", - my_pubkey, - stats.slot, - stats.weight, - 
stats.fork_weight, - bank.parent().map(|b| b.slot()).unwrap_or(0) - ); - stats.stake_lockouts = stake_lockouts; - stats.block_height = bank.block_height(); - stats.computed = true; - new_stats.push(stats.slot); + if !stats.computed { + stats.slot = bank_slot; + let (stake_lockouts, total_staked, bank_weight) = tower.collect_vote_lockouts( + bank_slot, + bank.vote_accounts().into_iter(), + &ancestors, + ); + stats.total_staked = total_staked; + stats.weight = bank_weight; + stats.fork_weight = stats.weight + parent_weight; + + datapoint_info!( + "bank_weight", + ("slot", bank_slot, i64), + // u128 too large for influx, convert to hex + ("weight", format!("{:X}", stats.weight), String), + ); + info!( + "{} slot_weight: {} {} {} {}", + my_pubkey, + stats.slot, + stats.weight, + stats.fork_weight, + bank.parent().map(|b| b.slot()).unwrap_or(0) + ); + stats.stake_lockouts = stake_lockouts; + stats.block_height = bank.block_height(); + stats.computed = true; + new_stats.push(stats.slot); + } } + + Self::update_propagation_status( + progress, + bank_slot, + all_pubkeys, + bank_forks, + vote_tracker, + ); + + let stats = progress + .get_fork_stats_mut(bank_slot) + .expect("All frozen banks must exist in the Progress map"); + stats.vote_threshold = tower.check_vote_stake_threshold( - bank.slot(), + bank_slot, &stats.stake_lockouts, stats.total_staked, ); - stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors); - stats.has_voted = tower.has_voted(bank.slot()); - stats.is_recent = tower.is_recent(bank.slot()); + stats.is_locked_out = tower.is_locked_out(bank_slot, &ancestors); + stats.has_voted = tower.has_voted(bank_slot); + stats.is_recent = tower.is_recent(bank_slot); } new_stats } + fn update_propagation_status( + progress: &mut ProgressMap, + slot: Slot, + all_pubkeys: &mut HashSet>, + bank_forks: &RwLock, + vote_tracker: &VoteTracker, + ) { + // If propagation has already been confirmed, return + if progress.is_propagated(slot) { + return; + } + + // Otherwise we have to check the votes for confirmation + let mut slot_vote_tracker = progress + .get_propagated_stats(slot) + .expect("All frozen banks must exist in the Progress map") + .slot_vote_tracker + .clone(); + + if slot_vote_tracker.is_none() { + slot_vote_tracker = vote_tracker.get_slot_vote_tracker(slot); + progress + .get_propagated_stats_mut(slot) + .expect("All frozen banks must exist in the Progress map") + .slot_vote_tracker = slot_vote_tracker.clone(); + } + + let newly_voted_pubkeys = slot_vote_tracker + .as_ref() + .and_then(|slot_vote_tracker| slot_vote_tracker.write().unwrap().get_updates()) + .unwrap_or_else(|| vec![]); + + Self::update_fork_propagated_threshold_from_votes( + progress, + newly_voted_pubkeys, + slot, + bank_forks, + all_pubkeys, + ); + } + // Returns: // 1) The heaviest bank // 2) The latest votable bank on the same fork as the last vote @@ -948,10 +1049,9 @@ impl ReplayStage { // Only time progress map should be missing a bank slot // is if this node was the leader for this slot as those banks // are not replayed in replay_active_banks() - let stats = &progress - .get(&bank.slot()) - .expect("All frozen banks must exist in the Progress map") - .fork_stats; + let stats = progress + .get_fork_stats(bank.slot()) + .expect("All frozen banks must exist in the Progress map"); if let Some(last_vote) = last_vote { if ancestors @@ -1066,7 +1166,7 @@ impl ReplayStage { } else { if !*switch_threshold { let total_staked = - progress.get(&bank.slot()).unwrap().fork_stats.total_staked; + 
progress.get_fork_stats(bank.slot()).unwrap().total_staked; *switch_threshold = tower.check_switch_threshold( earliest_vote_on_fork, &ancestors, @@ -1099,22 +1199,30 @@ impl ReplayStage { }; if let Some(bank) = selected_fork { - let (is_locked_out, vote_threshold, fork_weight) = { - let fork_stats = &progress.get(&bank.slot()).unwrap().fork_stats; + let (is_locked_out, vote_threshold, is_leader_slot, fork_weight) = { + let fork_stats = progress.get_fork_stats(bank.slot()).unwrap(); + let propagated_stats = &progress.get_propagated_stats(bank.slot()).unwrap(); ( fork_stats.is_locked_out, fork_stats.vote_threshold, + propagated_stats.is_leader_slot, fork_stats.weight, ) }; + + let propagation_confirmed = is_leader_slot || progress.is_propagated(bank.slot()); + if is_locked_out { failure_reasons.push(HeaviestForkFailures::LockedOut(bank.slot())); } if !vote_threshold { failure_reasons.push(HeaviestForkFailures::FailedThreshold(bank.slot())); } + if !propagation_confirmed { + failure_reasons.push(HeaviestForkFailures::NoPropagatedConfirmation(bank.slot())); + } - if !is_locked_out && vote_threshold { + if !is_locked_out && vote_threshold && propagation_confirmed { info!("voting: {} {}", bank.slot(), fork_weight); ( selected_fork.clone(), @@ -1129,6 +1237,134 @@ impl ReplayStage { } } + fn update_fork_propagated_threshold_from_votes( + progress: &mut ProgressMap, + mut newly_voted_pubkeys: Vec>, + fork_tip: Slot, + bank_forks: &RwLock, + all_pubkeys: &mut HashSet>, + ) { + let mut current_leader_slot = progress.get_latest_leader_slot(fork_tip); + let mut did_newly_reach_threshold = false; + let root = bank_forks.read().unwrap().root(); + loop { + // These cases mean confirmation of propagation on any earlier + // leader blocks must have been reached + if current_leader_slot == None || current_leader_slot.unwrap() <= root { + break; + } + + let leader_propagated_stats = progress + .get_propagated_stats_mut(current_leader_slot.unwrap()) + .expect("current_leader_slot > root, so must exist in the progress map"); + + // If a descendant has reached propagation threshold, then + // all its ancestor banks have alsso reached propagation + // threshold as well (Validators can't have voted for a + // descendant without also getting the ancestor block) + if leader_propagated_stats.is_propagated || + // If there's no new validators to record, and there's no + // newly achieved threshold, then there's no further + // information to propagate backwards to past leader blocks + (newly_voted_pubkeys.is_empty() && !did_newly_reach_threshold) + { + break; + } + + // We only iterate through the list of leader slots by traversing + // the linked list of 'prev_leader_slot`'s outlined in the + // `progress` map + assert!(leader_propagated_stats.is_leader_slot); + let leader_bank = bank_forks + .read() + .unwrap() + .get(current_leader_slot.unwrap()) + .expect("Entry in progress map must exist in BankForks") + .clone(); + + did_newly_reach_threshold = Self::update_slot_propagated_threshold_from_votes( + &mut newly_voted_pubkeys, + &leader_bank, + leader_propagated_stats, + all_pubkeys, + did_newly_reach_threshold, + ) || did_newly_reach_threshold; + + // Now jump to process the previous leader slot + current_leader_slot = leader_propagated_stats.prev_leader_slot; + } + } + + fn update_slot_propagated_threshold_from_votes( + newly_voted_pubkeys: &mut Vec>, + leader_bank: &Bank, + leader_propagated_stats: &mut PropagatedStats, + all_pubkeys: &mut HashSet>, + did_child_reach_threshold: bool, + ) -> bool { + // Track 
whether this slot newly confirm propagation + // throughout the network (switched from is_propagated == false + // to is_propagated == true) + let mut did_newly_reach_threshold = false; + + // If a child of this slot confirmed propagation, then + // we can return early as this implies this slot must also + // be propagated + if did_child_reach_threshold { + if !leader_propagated_stats.is_propagated { + leader_propagated_stats.is_propagated = true; + return true; + } else { + return false; + } + } + + if leader_propagated_stats.is_propagated { + return false; + } + + // Remove the valdators that we already know voted for this slot + // Those validators are safe to drop because they don't to be ported back any + // further because parents must have: + // 1) Also recorded this validator already, or + // 2) Already reached the propagation threshold, in which case + // they no longer need to track the set of propagated validators + newly_voted_pubkeys.retain(|voting_pubkey| { + if !leader_propagated_stats + .propagated_validators + .contains(&**voting_pubkey) + { + let mut cached_pubkey: Option> = + all_pubkeys.get(&**voting_pubkey).cloned(); + if cached_pubkey.is_none() { + let new_pubkey = Rc::new(**voting_pubkey); + all_pubkeys.insert(new_pubkey.clone()); + cached_pubkey = Some(new_pubkey); + } + let voting_pubkey = cached_pubkey.unwrap(); + leader_propagated_stats + .propagated_validators + .insert(voting_pubkey.clone()); + leader_propagated_stats.propagated_validators_stake += + leader_bank.epoch_vote_account_stake(&voting_pubkey); + + if leader_propagated_stats.total_epoch_stake == 0 + || leader_propagated_stats.propagated_validators_stake as f64 + / leader_propagated_stats.total_epoch_stake as f64 + > SUPERMINORITY_THRESHOLD + { + leader_propagated_stats.is_propagated = true; + did_newly_reach_threshold = true + } + true + } else { + false + } + }); + + did_newly_reach_threshold + } + fn confirm_forks( tower: &Tower, stake_lockouts: &HashMap, @@ -1170,25 +1406,33 @@ impl ReplayStage { progress: &mut ProgressMap, accounts_hash_sender: &Option, earliest_vote_on_fork: &mut u64, + all_pubkeys: &mut HashSet>, ) { + let old_epoch = bank_forks.read().unwrap().root_bank().epoch(); bank_forks .write() .unwrap() .set_root(new_root, accounts_hash_sender); let r_bank_forks = bank_forks.read().unwrap(); + let new_epoch = bank_forks.read().unwrap().root_bank().epoch(); + if old_epoch != new_epoch { + all_pubkeys.retain(|x| Rc::strong_count(x) > 1); + } *earliest_vote_on_fork = std::cmp::max(new_root, *earliest_vote_on_fork); - progress.retain(|k, _| r_bank_forks.get(*k).is_some()); + progress.handle_new_root(&r_bank_forks); } fn generate_new_bank_forks( blockstore: &Blockstore, - forks_lock: &RwLock, + bank_forks: &RwLock, leader_schedule_cache: &Arc, subscriptions: &Arc, rewards_recorder_sender: Option, + progress: &mut ProgressMap, + all_pubkeys: &mut HashSet>, ) { // Find the next slot that chains to the old slot - let forks = forks_lock.read().unwrap(); + let forks = bank_forks.read().unwrap(); let frozen_banks = forks.frozen_banks(); let frozen_bank_slots: Vec = frozen_banks.keys().cloned().collect(); let next_slots = blockstore @@ -1228,12 +1472,26 @@ impl ReplayStage { &rewards_recorder_sender, subscriptions, ); + if let Some(leader_vote_accounts) = + child_bank.epoch_vote_accounts_for_node_id(&leader) + { + Self::update_fork_propagated_threshold_from_votes( + progress, + leader_vote_accounts + .vote_accounts + .iter() + .collect::>(), + parent_bank.slot(), + bank_forks, + all_pubkeys, + ); + } 
new_banks.insert(child_slot, child_bank); } } drop(forks); - let mut forks = forks_lock.write().unwrap(); + let mut forks = bank_forks.write().unwrap(); for (_, bank) in new_banks { forks.insert(bank); } @@ -1277,6 +1535,7 @@ pub(crate) mod tests { commitment::BlockCommitment, consensus::test::{initialize_state, VoteSimulator}, consensus::Tower, + progress_map::ValidatorStakeInfo, replay_stage::ReplayStage, transaction_status_service::TransactionStatusService, }; @@ -1293,9 +1552,11 @@ pub(crate) mod tests { SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD, }, }; - use solana_runtime::genesis_utils::{GenesisConfigInfo, ValidatorVoteKeypairs}; + use solana_runtime::genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}; use solana_sdk::{ account::Account, + clock::NUM_CONSECUTIVE_LEADER_SLOTS, + genesis_config, hash::{hash, Hash}, instruction::InstructionError, packet::PACKET_DATA_SIZE, @@ -1349,6 +1610,7 @@ pub(crate) mod tests { bank.store_account(&pubkey, &vote_account); } + let vote_tracker = VoteTracker::default(); let mut towers: Vec = iter::repeat_with(|| Tower::new_for_tests(8, 0.67)) .take(validators.len()) .collect(); @@ -1417,14 +1679,15 @@ pub(crate) mod tests { let mut bank_forks = BankForks::new(neutral_fork.fork[0], Bank::new(&genesis_config)); - let mut fork_progresses: Vec> = iter::repeat_with(HashMap::new) + let mut fork_progresses: Vec = iter::repeat_with(ProgressMap::default) .take(validators.len()) .collect(); for fork_progress in fork_progresses.iter_mut() { + let bank = &bank_forks.banks[&0]; fork_progress .entry(neutral_fork.fork[0]) - .or_insert_with(|| ForkProgress::new(bank_forks.banks[&0].last_blockhash())); + .or_insert_with(|| ForkProgress::new(bank.last_blockhash(), None, None)); } for index in 1..neutral_fork.fork.len() { @@ -1447,13 +1710,10 @@ pub(crate) mod tests { bank_forks.banks[&neutral_fork.fork[index]].freeze(); for fork_progress in fork_progresses.iter_mut() { + let bank = &bank_forks.banks[&neutral_fork.fork[index]]; fork_progress .entry(bank_forks.banks[&neutral_fork.fork[index]].slot()) - .or_insert_with(|| { - ForkProgress::new( - bank_forks.banks[&neutral_fork.fork[index]].last_blockhash(), - ) - }); + .or_insert_with(|| ForkProgress::new(bank.last_blockhash(), None, None)); } } @@ -1490,20 +1750,17 @@ pub(crate) mod tests { bank_forks.banks[&fork_info.fork[index]].freeze(); for fork_progress in fork_progresses.iter_mut() { + let bank = &bank_forks.banks[&fork_info.fork[index]]; fork_progress .entry(bank_forks.banks[&fork_info.fork[index]].slot()) - .or_insert_with(|| { - ForkProgress::new( - bank_forks.banks[&fork_info.fork[index]].last_blockhash(), - ) - }); + .or_insert_with(|| ForkProgress::new(bank.last_blockhash(), None, None)); } } } let bank_fork_ancestors = bank_forks.ancestors(); let wrapped_bank_fork = Arc::new(RwLock::new(bank_forks)); - + let mut all_pubkeys = HashSet::new(); (0..validators.len()) .map(|i| { let mut frozen_banks: Vec<_> = wrapped_bank_fork @@ -1519,6 +1776,9 @@ pub(crate) mod tests { &mut frozen_banks, &towers[i], &mut fork_progresses[i], + &vote_tracker, + &wrapped_bank_fork, + &mut all_pubkeys, ); let (heaviest_bank, _) = ReplayStage::select_forks( &frozen_banks, @@ -1531,7 +1791,7 @@ pub(crate) mod tests { None } else { let bank = heaviest_bank.unwrap(); - let stats = &fork_progresses[i].get(&bank.slot()).unwrap().fork_stats; + let stats = &fork_progresses[i].get_fork_stats(bank.slot()).unwrap(); Some(ForkSelectionResponse { slot: stats.slot, is_locked_out: 
stats.is_locked_out, @@ -1605,49 +1865,128 @@ pub(crate) mod tests { fn test_child_slots_of_same_parent() { let ledger_path = get_tmp_ledger_path!(); { + // Setup let blockstore = Arc::new( Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"), ); + let validator_voting_keypairs: Vec<_> = (0..20) + .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) + .collect(); - let genesis_config = create_genesis_config(10_000).genesis_config; + let validator_voting_keys: HashMap<_, _> = validator_voting_keypairs + .iter() + .map(|v| (v.node_keypair.pubkey(), v.vote_keypair.pubkey())) + .collect(); + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + 100, + ); let bank0 = Bank::new(&genesis_config); + let mut progress = ProgressMap::default(); + progress.insert( + 0, + ForkProgress::new_from_bank(&bank0, bank0.collector_id(), &Pubkey::default(), None), + ); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0)); let exit = Arc::new(AtomicBool::new(false)); let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); - let bank_forks = BankForks::new(0, bank0); - bank_forks.working_bank().freeze(); + let mut bank_forks = BankForks::new(0, bank0); - // Insert shred for slot 1, generate new forks, check result - let (shreds, _) = make_slot_entries(1, 0, 8); - blockstore.insert_shreds(shreds, None, false).unwrap(); - assert!(bank_forks.get(1).is_none()); + // Insert a non-root bank so that the propagation logic will update this + // bank + let bank1 = Bank::new_from_parent( + bank_forks.get(0).unwrap(), + &leader_schedule_cache.slot_leader_at(1, None).unwrap(), + 1, + ); + progress.insert( + 1, + ForkProgress::new_from_bank( + &bank1, + bank1.collector_id(), + &validator_voting_keys.get(&bank1.collector_id()).unwrap(), + Some(0), + ), + ); + assert!(progress.get_propagated_stats(1).unwrap().is_leader_slot); + bank1.freeze(); + bank_forks.insert(bank1); let bank_forks = RwLock::new(bank_forks); - ReplayStage::generate_new_bank_forks( - &blockstore, - &bank_forks, - &leader_schedule_cache, - &subscriptions, - None, - ); - assert!(bank_forks.read().unwrap().get(1).is_some()); - // Insert shred for slot 3, generate new forks, check result - let (shreds, _) = make_slot_entries(2, 0, 8); + // Insert shreds for slot NUM_CONSECUTIVE_LEADER_SLOTS, + // chaining to slot 1 + let (shreds, _) = make_slot_entries(NUM_CONSECUTIVE_LEADER_SLOTS, 1, 8); blockstore.insert_shreds(shreds, None, false).unwrap(); - assert!(bank_forks.read().unwrap().get(2).is_none()); + assert!(bank_forks + .read() + .unwrap() + .get(NUM_CONSECUTIVE_LEADER_SLOTS) + .is_none()); ReplayStage::generate_new_bank_forks( &blockstore, &bank_forks, &leader_schedule_cache, &subscriptions, None, + &mut progress, + &mut HashSet::new(), ); - assert!(bank_forks.read().unwrap().get(1).is_some()); - assert!(bank_forks.read().unwrap().get(2).is_some()); - } + assert!(bank_forks + .read() + .unwrap() + .get(NUM_CONSECUTIVE_LEADER_SLOTS) + .is_some()); - let _ignored = remove_dir_all(&ledger_path); + // Insert shreds for slot 2 * NUM_CONSECUTIVE_LEADER_SLOTS, + // chaining to slot 1 + let (shreds, _) = make_slot_entries(2 * NUM_CONSECUTIVE_LEADER_SLOTS, 1, 8); + blockstore.insert_shreds(shreds, None, false).unwrap(); + assert!(bank_forks + .read() + .unwrap() + .get(2 * NUM_CONSECUTIVE_LEADER_SLOTS) + .is_none()); + ReplayStage::generate_new_bank_forks( + &blockstore, + 
&bank_forks, + &leader_schedule_cache, + &subscriptions, + None, + &mut progress, + &mut HashSet::new(), + ); + assert!(bank_forks + .read() + .unwrap() + .get(NUM_CONSECUTIVE_LEADER_SLOTS) + .is_some()); + assert!(bank_forks + .read() + .unwrap() + .get(2 * NUM_CONSECUTIVE_LEADER_SLOTS) + .is_some()); + + // // There are 20 equally staked acccounts, of which 3 have built + // banks above or at bank 1. Because 3/20 < SUPERMINORITY_THRESHOLD, + // we should see 3 validators in bank 1's propagated_validator set. + let expected_leader_slots = vec![ + 1, + NUM_CONSECUTIVE_LEADER_SLOTS, + 2 * NUM_CONSECUTIVE_LEADER_SLOTS, + ]; + for slot in expected_leader_slots { + let leader = leader_schedule_cache.slot_leader_at(slot, None).unwrap(); + let vote_key = validator_voting_keys.get(&leader).unwrap(); + assert!(progress + .get_propagated_stats(1) + .unwrap() + .propagated_validators + .contains(vote_key)); + } + } } #[test] @@ -1662,9 +2001,9 @@ pub(crate) mod tests { root, ); bank_forks.write().unwrap().insert(root_bank); - let mut progress = HashMap::new(); + let mut progress = ProgressMap::default(); for i in 0..=root { - progress.insert(i, ForkProgress::new(Hash::default())); + progress.insert(i, ForkProgress::new(Hash::default(), None, None)); } let mut earliest_vote_on_fork = root - 1; ReplayStage::handle_new_root( @@ -1673,6 +2012,7 @@ pub(crate) mod tests { &mut progress, &None, &mut earliest_vote_on_fork, + &mut HashSet::new(), ); assert_eq!(bank_forks.read().unwrap().root(), root); assert_eq!(progress.len(), 1); @@ -1686,6 +2026,7 @@ pub(crate) mod tests { &mut progress, &None, &mut earliest_vote_on_fork, + &mut HashSet::new(), ); assert_eq!(earliest_vote_on_fork, root + 1); } @@ -1916,11 +2257,11 @@ pub(crate) mod tests { } = create_genesis_config(1000); genesis_config.poh_config.hashes_per_tick = Some(2); let bank0 = Arc::new(Bank::new(&genesis_config)); - let mut progress = HashMap::new(); + let mut progress = ProgressMap::default(); let last_blockhash = bank0.last_blockhash(); let mut bank0_progress = progress .entry(bank0.slot()) - .or_insert_with(|| ForkProgress::new(last_blockhash)); + .or_insert_with(|| ForkProgress::new(last_blockhash, None, None)); let shreds = shred_to_insert(&mint_keypair, bank0.clone()); blockstore.insert_shreds(shreds, None, false).unwrap(); let res = ReplayStage::replay_blockstore_into_bank( @@ -2205,6 +2546,9 @@ pub(crate) mod tests { &mut frozen_banks, &tower, &mut progress, + &VoteTracker::default(), + &bank_forks, + &mut HashSet::new(), ); assert_eq!(newly_computed, vec![0]); // The only vote is in bank 1, and bank_forks does not currently contain @@ -2224,7 +2568,7 @@ pub(crate) mod tests { // Insert the bank that contains a vote for slot 0, which confirms slot 0 bank_forks.write().unwrap().insert(bank1); - progress.insert(1, ForkProgress::new(bank0.last_blockhash())); + progress.insert(1, ForkProgress::new(bank0.last_blockhash(), None, None)); let ancestors = bank_forks.read().unwrap().ancestors(); let mut frozen_banks: Vec<_> = bank_forks .read() @@ -2239,6 +2583,9 @@ pub(crate) mod tests { &mut frozen_banks, &tower, &mut progress, + &VoteTracker::default(), + &bank_forks, + &mut HashSet::new(), ); assert_eq!(newly_computed, vec![1]); @@ -2268,6 +2615,9 @@ pub(crate) mod tests { &mut frozen_banks, &tower, &mut progress, + &VoteTracker::default(), + &bank_forks, + &mut HashSet::new(), ); // No new stats should have been computed assert!(newly_computed.is_empty()); @@ -2323,21 +2673,511 @@ pub(crate) mod tests { &mut frozen_banks, &tower, &mut 
progress, + &VoteTracker::default(), + &bank_forks, + &mut HashSet::new(), ); frozen_banks.sort_by_key(|bank| bank.slot()); for pair in frozen_banks.windows(2) { - let first = progress - .get(&pair[0].slot()) - .unwrap() - .fork_stats - .fork_weight; - let second = progress - .get(&pair[1].slot()) - .unwrap() - .fork_stats - .fork_weight; + let first = progress.get_fork_stats(pair[0].slot()).unwrap().fork_weight; + let second = progress.get_fork_stats(pair[1].slot()).unwrap().fork_weight; assert!(second >= first); } } + + #[test] + fn test_update_slot_propagated_threshold_from_votes() { + let keypairs: HashMap<_, _> = iter::repeat_with(|| { + let node_keypair = Keypair::new(); + let vote_keypair = Keypair::new(); + let stake_keypair = Keypair::new(); + let node_pubkey = node_keypair.pubkey(); + ( + node_pubkey, + ValidatorVoteKeypairs::new(node_keypair, vote_keypair, stake_keypair), + ) + }) + .take(10) + .collect(); + + let vote_pubkeys: Vec<_> = keypairs + .values() + .map(|keys| keys.vote_keypair.pubkey()) + .collect(); + + let stake = 10_000; + let (bank_forks, _) = initialize_state(&keypairs, stake); + let root_bank = bank_forks.root_bank().clone(); + let mut propagated_stats = PropagatedStats { + total_epoch_stake: stake * 10, + ..PropagatedStats::default() + }; + + let mut all_pubkeys = HashSet::new(); + let mut child_reached_threshold = false; + for i in 0..10 { + propagated_stats.is_propagated = false; + let mut newly_voted_pubkeys = vote_pubkeys[..i].iter().cloned().map(Arc::new).collect(); + let did_newly_reach_threshold = + ReplayStage::update_slot_propagated_threshold_from_votes( + &mut newly_voted_pubkeys, + &root_bank, + &mut propagated_stats, + &mut all_pubkeys, + child_reached_threshold, + ); + + // Only the i'th voted pubkey should be new (everything else was + // inserted in previous iteration of the loop), so those redundant + // pubkeys should be filtered out + let added_pubkeys = { + if i == 0 { + vec![] + } else { + vec![Arc::new(vote_pubkeys[i - 1])] + } + }; + assert_eq!(newly_voted_pubkeys, added_pubkeys); + + // If we crossed the superminority threshold, then + // `did_newly_reach_threshold == true`, otherwise the + // threshold has not been reached + if i >= 4 { + assert!(propagated_stats.is_propagated); + assert!(did_newly_reach_threshold); + } else { + assert!(!propagated_stats.is_propagated); + assert!(!did_newly_reach_threshold); + } + } + + // Simulate a child slot seeing threshold (`child_reached_threshold` = true), + // then the parent should also be marked as having reached threshold, + // even if there are no new pubkeys to add (`newly_voted_pubkeys.is_empty()`) + propagated_stats = PropagatedStats { + total_epoch_stake: stake * 10, + ..PropagatedStats::default() + }; + propagated_stats.total_epoch_stake = stake * 10; + all_pubkeys = HashSet::new(); + child_reached_threshold = true; + let mut newly_voted_pubkeys: Vec> = vec![]; + + assert!(ReplayStage::update_slot_propagated_threshold_from_votes( + &mut newly_voted_pubkeys, + &root_bank, + &mut propagated_stats, + &mut all_pubkeys, + child_reached_threshold, + )); + + // If propagation already happened (propagated_stats.is_propagated = true), + // always returns false + propagated_stats = PropagatedStats { + total_epoch_stake: stake * 10, + ..PropagatedStats::default() + }; + propagated_stats.is_propagated = true; + all_pubkeys = HashSet::new(); + child_reached_threshold = true; + newly_voted_pubkeys = vec![]; + assert!(!ReplayStage::update_slot_propagated_threshold_from_votes( + &mut 
newly_voted_pubkeys, + &root_bank, + &mut propagated_stats, + &mut all_pubkeys, + child_reached_threshold, + )); + + child_reached_threshold = false; + assert!(!ReplayStage::update_slot_propagated_threshold_from_votes( + &mut newly_voted_pubkeys, + &root_bank, + &mut propagated_stats, + &mut all_pubkeys, + child_reached_threshold, + )); + } + + #[test] + fn test_update_propagation_status() { + // Create genesis stakers + let node_keypair = Keypair::new(); + let vote_keypair = Keypair::new(); + let stake_keypair = Keypair::new(); + let vote_pubkey = Arc::new(vote_keypair.pubkey()); + let mut keypairs = HashMap::new(); + + keypairs.insert( + node_keypair.pubkey(), + ValidatorVoteKeypairs::new(node_keypair, vote_keypair, stake_keypair), + ); + + let stake = 10_000; + let (mut bank_forks, mut progress_map) = initialize_state(&keypairs, stake); + + let bank0 = bank_forks.get(0).unwrap().clone(); + bank_forks.insert(Bank::new_from_parent(&bank0, &Pubkey::default(), 9)); + let bank9 = bank_forks.get(9).unwrap().clone(); + bank_forks.insert(Bank::new_from_parent(&bank9, &Pubkey::default(), 10)); + bank_forks.set_root(9, &None); + let total_epoch_stake = bank0.total_epoch_stake(); + + // Insert new ForkProgress for slot 10 and its + // previous leader slot 9 + progress_map.insert( + 10, + ForkProgress::new( + Hash::default(), + Some(9), + Some(ValidatorStakeInfo { + total_epoch_stake, + ..ValidatorStakeInfo::default() + }), + ), + ); + progress_map.insert( + 9, + ForkProgress::new( + Hash::default(), + Some(8), + Some(ValidatorStakeInfo { + total_epoch_stake, + ..ValidatorStakeInfo::default() + }), + ), + ); + + // Make sure is_propagated == false so that the propagation logic + // runs in `update_propagation_status` + assert!(!progress_map.is_propagated(10)); + + let vote_tracker = VoteTracker::new(&bank_forks.root_bank()); + vote_tracker.insert_vote(10, vote_pubkey.clone()); + ReplayStage::update_propagation_status( + &mut progress_map, + 10, + &mut HashSet::new(), + &RwLock::new(bank_forks), + &vote_tracker, + ); + + let propagated_stats = &progress_map.get(&10).unwrap().propagated_stats; + + // There should now be a cached reference to the VoteTracker for + // slot 10 + assert!(propagated_stats.slot_vote_tracker.is_some()); + + // Updates should have been consumed + assert!(propagated_stats + .slot_vote_tracker + .as_ref() + .unwrap() + .write() + .unwrap() + .get_updates() + .is_none()); + + // The voter should be recorded + assert!(propagated_stats + .propagated_validators + .contains(&*vote_pubkey)); + + assert_eq!(propagated_stats.propagated_validators_stake, stake); + } + + #[test] + fn test_chain_update_propagation_status() { + let keypairs: HashMap<_, _> = iter::repeat_with(|| { + let node_keypair = Keypair::new(); + let vote_keypair = Keypair::new(); + let stake_keypair = Keypair::new(); + let node_pubkey = node_keypair.pubkey(); + ( + node_pubkey, + ValidatorVoteKeypairs::new(node_keypair, vote_keypair, stake_keypair), + ) + }) + .take(10) + .collect(); + + let vote_pubkeys: Vec<_> = keypairs + .values() + .map(|keys| keys.vote_keypair.pubkey()) + .collect(); + + let stake_per_validator = 10_000; + let (mut bank_forks, mut progress_map) = initialize_state(&keypairs, stake_per_validator); + bank_forks.set_root(0, &None); + let total_epoch_stake = bank_forks.root_bank().total_epoch_stake(); + + // Insert new ForkProgress representing a slot for all slots 1..=num_banks. 
Only + // make even numbered ones leader slots + for i in 1..=10 { + let parent_bank = bank_forks.get(i - 1).unwrap().clone(); + let prev_leader_slot = ((i - 1) / 2) * 2; + bank_forks.insert(Bank::new_from_parent(&parent_bank, &Pubkey::default(), i)); + progress_map.insert( + i, + ForkProgress::new(Hash::default(), Some(prev_leader_slot), { + if i % 2 == 0 { + Some(ValidatorStakeInfo { + total_epoch_stake, + ..ValidatorStakeInfo::default() + }) + } else { + None + } + }), + ); + } + + let vote_tracker = VoteTracker::new(&bank_forks.root_bank()); + for vote_pubkey in &vote_pubkeys { + // Insert a vote for the last bank for each voter + vote_tracker.insert_vote(10, Arc::new(vote_pubkey.clone())); + } + + // The last bank should reach propagation threshold, and propagate it all + // the way back through earlier leader banks + ReplayStage::update_propagation_status( + &mut progress_map, + 10, + &mut HashSet::new(), + &RwLock::new(bank_forks), + &vote_tracker, + ); + + for i in 1..=10 { + let propagated_stats = &progress_map.get(&i).unwrap().propagated_stats; + // Only the even numbered ones were leader banks, so only + // those should have been updated + if i % 2 == 0 { + assert!(propagated_stats.is_propagated); + } else { + assert!(!propagated_stats.is_propagated); + } + } + } + + #[test] + fn test_chain_update_propagation_status2() { + let num_validators = 6; + let keypairs: HashMap<_, _> = iter::repeat_with(|| { + let node_keypair = Keypair::new(); + let vote_keypair = Keypair::new(); + let stake_keypair = Keypair::new(); + let node_pubkey = node_keypair.pubkey(); + ( + node_pubkey, + ValidatorVoteKeypairs::new(node_keypair, vote_keypair, stake_keypair), + ) + }) + .take(num_validators) + .collect(); + + let vote_pubkeys: Vec<_> = keypairs + .values() + .map(|keys| keys.vote_keypair.pubkey()) + .collect(); + + let stake_per_validator = 10_000; + let (mut bank_forks, mut progress_map) = initialize_state(&keypairs, stake_per_validator); + bank_forks.set_root(0, &None); + + let total_epoch_stake = num_validators as u64 * stake_per_validator; + + // Insert new ForkProgress representing a slot for all slots 1..=num_banks. 
Only + // make even numbered ones leader slots + for i in 1..=10 { + let parent_bank = bank_forks.get(i - 1).unwrap().clone(); + let prev_leader_slot = i - 1; + bank_forks.insert(Bank::new_from_parent(&parent_bank, &Pubkey::default(), i)); + let mut fork_progress = ForkProgress::new( + Hash::default(), + Some(prev_leader_slot), + Some(ValidatorStakeInfo { + total_epoch_stake, + ..ValidatorStakeInfo::default() + }), + ); + + let end_range = { + // The earlier slots are one pubkey away from reaching confirmation + if i < 5 { + 2 + } else { + // The later slots are two pubkeys away from reaching confirmation + 1 + } + }; + fork_progress.propagated_stats.propagated_validators = vote_pubkeys[0..end_range] + .iter() + .cloned() + .map(Rc::new) + .collect(); + fork_progress.propagated_stats.propagated_validators_stake = + end_range as u64 * stake_per_validator; + progress_map.insert(i, fork_progress); + } + + let vote_tracker = VoteTracker::new(&bank_forks.root_bank()); + // Insert a new vote + vote_tracker.insert_vote(10, Arc::new(vote_pubkeys[2].clone())); + + // The last bank should reach propagation threshold, and propagate it all + // the way back through earlier leader banks + ReplayStage::update_propagation_status( + &mut progress_map, + 10, + &mut HashSet::new(), + &RwLock::new(bank_forks), + &vote_tracker, + ); + + // Only the first 5 banks should have reached the threshold + for i in 1..=10 { + let propagated_stats = &progress_map.get(&i).unwrap().propagated_stats; + if i < 5 { + assert!(propagated_stats.is_propagated); + } else { + assert!(!propagated_stats.is_propagated); + } + } + } + + #[test] + fn test_check_propagation_for_start_leader() { + let mut progress_map = ProgressMap::default(); + let poh_slot = 5; + let parent_slot = 3; + + // If there is no previous leader slot (previous leader slot is None), + // should succeed + progress_map.insert(3, ForkProgress::new(Hash::default(), None, None)); + assert!(ReplayStage::check_propagation_for_start_leader( + poh_slot, + parent_slot, + &progress_map, + )); + + // If the parent was itself the leader, then requires propagation confirmation + progress_map.insert( + 3, + ForkProgress::new(Hash::default(), None, Some(ValidatorStakeInfo::default())), + ); + assert!(!ReplayStage::check_propagation_for_start_leader( + poh_slot, + parent_slot, + &progress_map, + )); + progress_map + .get_mut(&3) + .unwrap() + .propagated_stats + .is_propagated = true; + assert!(ReplayStage::check_propagation_for_start_leader( + poh_slot, + parent_slot, + &progress_map, + )); + // Now, set up the progress map to show that the previous leader slot of 5 is + // 2 (even though the parent is 3), so 2 needs to see propagation confirmation + // before we can start a leader for block 5 + progress_map.insert(3, ForkProgress::new(Hash::default(), Some(2), None)); + progress_map.insert( + 2, + ForkProgress::new(Hash::default(), None, Some(ValidatorStakeInfo::default())), + ); + + // Last leader slot has not seen propagation threshold, so should fail + assert!(!ReplayStage::check_propagation_for_start_leader( + poh_slot, + parent_slot, + &progress_map, + )); + + // If we set the is_propagated = true for the last leader slot, should + // allow the block to be generated + progress_map + .get_mut(&2) + .unwrap() + .propagated_stats + .is_propagated = true; + assert!(ReplayStage::check_propagation_for_start_leader( + poh_slot, + parent_slot, + &progress_map, + )); + + // If the root is 3, this filters out slot 2 from the progress map, + // which implies confirmation + let mut 
bank_forks = BankForks::new( + 3, + Bank::new(&genesis_config::create_genesis_config(10000).0), + ); + let bank5 = Bank::new_from_parent(bank_forks.get(3).unwrap(), &Pubkey::default(), 5); + bank_forks.insert(bank5); + + // Should purge only slot 2 from the progress map + progress_map.handle_new_root(&bank_forks); + + // Should succeed + assert!(ReplayStage::check_propagation_for_start_leader( + poh_slot, + parent_slot, + &progress_map, + )); + } + + #[test] + fn test_check_propagation_for_consecutive_start_leader() { + let mut progress_map = ProgressMap::default(); + let poh_slot = 4; + let mut parent_slot = 3; + + // Set up the progress map to show that the last leader slot of 4 is 3, + // which means 3 and 4 are consecutive leader slots + progress_map.insert( + 3, + ForkProgress::new(Hash::default(), None, Some(ValidatorStakeInfo::default())), + ); + progress_map.insert( + 2, + ForkProgress::new(Hash::default(), None, Some(ValidatorStakeInfo::default())), + ); + + // If the last leader slot has not seen propagation threshold, but + // was the direct parent (implying consecutive leader slots), create + // the block regardless + assert!(ReplayStage::check_propagation_for_start_leader( + poh_slot, + parent_slot, + &progress_map, + )); + + // If propagation threshold was achieved on parent, block should + // also be created + progress_map + .get_mut(&3) + .unwrap() + .propagated_stats + .is_propagated = true; + assert!(ReplayStage::check_propagation_for_start_leader( + poh_slot, + parent_slot, + &progress_map, + )); + + parent_slot = 2; + + // Even though 2 is also a leader slot, because it's not consecutive + // we still have to respect the propagation threshold check + assert!(!ReplayStage::check_propagation_for_start_leader( + poh_slot, + parent_slot, + &progress_map, + )); + } }
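
The propagation check introduced in this diff hinges on the superminority stake ratio (SUPERMINORITY_THRESHOLD = 1/3): a leader slot is considered propagated once strictly more than one third of the epoch stake has voted on it or a descendant. Below is a minimal, standalone sketch of that calculation. The type and method names (SimplePropagatedStats, add_vote_stake) are illustrative stand-ins, not the actual PropagatedStats/Bank API; only the threshold arithmetic mirrors the diff.

const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;

#[derive(Default)]
struct SimplePropagatedStats {
    propagated_validators_stake: u64,
    total_epoch_stake: u64,
    is_propagated: bool,
}

impl SimplePropagatedStats {
    // Add a newly observed voter's stake and flip `is_propagated` once more
    // than 1/3 of the epoch stake has been seen voting on this leader block
    // or one of its descendants. Returns true only on the transition.
    fn add_vote_stake(&mut self, stake: u64) -> bool {
        if self.is_propagated {
            return false;
        }
        self.propagated_validators_stake += stake;
        if self.total_epoch_stake == 0
            || self.propagated_validators_stake as f64 / self.total_epoch_stake as f64
                > SUPERMINORITY_THRESHOLD
        {
            self.is_propagated = true;
            return true;
        }
        false
    }
}

fn main() {
    let mut stats = SimplePropagatedStats {
        total_epoch_stake: 9,
        ..SimplePropagatedStats::default()
    };
    // 1/9, 2/9 and 3/9 of the stake: none strictly exceeds 1/3 ...
    assert!(!stats.add_vote_stake(1));
    assert!(!stats.add_vote_stake(1));
    assert!(!stats.add_vote_stake(1));
    // ... but 4/9 does, so the slot is now considered propagated.
    assert!(stats.add_vote_stake(1));
    assert!(stats.is_propagated);
}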
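
The ProgressMap entries effectively form a singly linked list of leader slots through prev_leader_slot, and both is_propagated and update_fork_propagated_threshold_from_votes walk that list backwards until they hit the root or an already-confirmed leader slot. A rough sketch of just the traversal, over a plain HashMap with simplified stand-in types (SlotStats and propagate_confirmation_backwards are hypothetical names; the sketch assumes the stake threshold is already known to be met by a descendant, so each visited slot is marked propagated unconditionally):

use std::collections::HashMap;

struct SlotStats {
    is_leader_slot: bool,
    is_propagated: bool,
    prev_leader_slot: Option<u64>,
}

fn propagate_confirmation_backwards(
    progress: &mut HashMap<u64, SlotStats>,
    fork_tip: u64,
    root: u64,
) {
    // Find the latest leader slot at or before the fork tip.
    let mut current = {
        let tip = progress.get(&fork_tip).expect("fork tip must be in the progress map");
        if tip.is_leader_slot {
            Some(fork_tip)
        } else {
            tip.prev_leader_slot
        }
    };
    // Walk the prev_leader_slot chain until the root or an already
    // confirmed leader slot is reached; confirmation of a descendant
    // implies confirmation of every earlier leader slot on the fork.
    while let Some(slot) = current {
        if slot <= root {
            break;
        }
        let stats = progress.get_mut(&slot).expect("leader slot must be in the progress map");
        if stats.is_propagated {
            break;
        }
        stats.is_propagated = true;
        current = stats.prev_leader_slot;
    }
}

fn main() {
    let mut progress = HashMap::new();
    // Leader slots 2 and 4; slot 5 is not a leader slot and chains back to 4.
    progress.insert(2, SlotStats { is_leader_slot: true, is_propagated: false, prev_leader_slot: None });
    progress.insert(4, SlotStats { is_leader_slot: true, is_propagated: false, prev_leader_slot: Some(2) });
    progress.insert(5, SlotStats { is_leader_slot: false, is_propagated: false, prev_leader_slot: Some(4) });

    propagate_confirmation_backwards(&mut progress, 5, 0);
    assert!(progress[&4].is_propagated);
    assert!(progress[&2].is_propagated);
    assert!(!progress[&5].is_propagated);
}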
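
Finally, check_propagation_for_start_leader bypasses the propagation check only when the parent slot was this node's own leader slot and is the immediately preceding PoH slot (a consecutive leader block). A tiny sketch of that decision with the state flattened into plain booleans; should_start_leader is a hypothetical helper, whereas the real function reads these flags out of the ProgressMap:

// Decide whether to start producing a leader block: if our previous block
// is the direct parent (consecutive leader slots), build regardless;
// otherwise require the latest leader slot to have reached the
// propagation threshold.
fn should_start_leader(
    poh_slot: u64,
    parent_slot: u64,
    parent_is_my_leader_slot: bool,
    latest_leader_slot_propagated: bool,
) -> bool {
    let is_consecutive_leader = parent_is_my_leader_slot && parent_slot == poh_slot - 1;
    is_consecutive_leader || latest_leader_slot_propagated
}

fn main() {
    // Consecutive leader slots 3 -> 4: no propagation confirmation needed.
    assert!(should_start_leader(4, 3, true, false));
    // Non-consecutive (parent 3, PoH slot 5): the propagation check applies.
    assert!(!should_start_leader(5, 3, true, false));
    assert!(should_start_leader(5, 3, true, true));
}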