From e27d6d098898fc1b4063cea91181cde7a4992599 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 27 Mar 2019 04:30:26 -0700 Subject: [PATCH] validator confirmation --- core/src/locktower.rs | 46 ++++++++++++++++++++++ core/src/replay_stage.rs | 82 ++++++++++++++++++++++++++++++---------- 2 files changed, 108 insertions(+), 20 deletions(-) diff --git a/core/src/locktower.rs b/core/src/locktower.rs index 8c229addcd..74313129ca 100644 --- a/core/src/locktower.rs +++ b/core/src/locktower.rs @@ -173,6 +173,15 @@ impl Locktower { stake_lockouts } + pub fn is_slot_confirmed(&self, slot: u64, lockouts: &HashMap) -> bool { + lockouts + .get(&slot) + .map(|lockout| { + (lockout.stake as f64 / self.epoch_stakes.total_staked as f64) > self.threshold_size + }) + .unwrap_or(false) + } + pub fn is_recent_epoch(&self, bank: &Bank) -> bool { let bank_epoch = bank.get_epoch_and_slot_index(bank.slot()).0; bank_epoch >= self.epoch_stakes.slot @@ -491,6 +500,43 @@ mod test { assert!(locktower.check_vote_stake_threshold(0, &stakes)); } + #[test] + fn test_is_slot_confirmed_not_enough_stake_failure() { + let locktower = Locktower::new(EpochStakes::new_for_tests(2), 1, 0.67); + let stakes = vec![( + 0, + StakeLockout { + stake: 1, + lockout: 8, + }, + )] + .into_iter() + .collect(); + assert!(!locktower.is_slot_confirmed(0, &stakes)); + } + + #[test] + fn test_is_slot_confirmed_unknown_slot() { + let locktower = Locktower::new(EpochStakes::new_for_tests(2), 1, 0.67); + let stakes = HashMap::new(); + assert!(!locktower.is_slot_confirmed(0, &stakes)); + } + + #[test] + fn test_is_slot_confirmed_pass() { + let locktower = Locktower::new(EpochStakes::new_for_tests(2), 1, 0.67); + let stakes = vec![( + 0, + StakeLockout { + stake: 2, + lockout: 8, + }, + )] + .into_iter() + .collect(); + assert!(locktower.is_slot_confirmed(0, &stakes)); + } + #[test] fn test_is_locked_out_empty() { let locktower = Locktower::new(EpochStakes::new_for_tests(2), 0, 0.67); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 213d16aa79..5a2a1833af 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -6,12 +6,13 @@ use crate::blocktree_processor; use crate::cluster_info::ClusterInfo; use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; use crate::leader_schedule_utils; -use crate::locktower::Locktower; +use crate::locktower::{Locktower, StakeLockout}; use crate::packet::BlobError; use crate::poh_recorder::PohRecorder; use crate::result; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; +use hashbrown::HashMap; use solana_metrics::counter::Counter; use solana_metrics::influxdb; use solana_runtime::bank::Bank; @@ -21,7 +22,6 @@ use solana_sdk::signature::KeypairUtil; use solana_sdk::timing::{self, duration_as_ms}; use solana_sdk::transaction::Transaction; use solana_vote_api::vote_instruction::{Vote, VoteInstruction}; -use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex, RwLock}; @@ -53,6 +53,24 @@ pub struct ReplayStage { t_replay: JoinHandle>, } +#[derive(Default)] +struct ForkProgress { + last_entry: Hash, + num_blobs: usize, + started_ms: u64, + supermajority_confirmed_ms: u64, +} +impl ForkProgress { + pub fn new(last_entry: Hash) -> Self { + Self { + last_entry, + num_blobs: 0, + started_ms: timing::timestamp(), + supermajority_confirmed_ms: 0, + } + } +} + impl ReplayStage { #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] pub fn new( @@ -115,7 +133,8 @@ impl ReplayStage { ticks_per_slot = bank.ticks_per_slot(); } - let votable = Self::generate_votable_banks(&bank_forks, &locktower); + let votable = + Self::generate_votable_banks(&bank_forks, &locktower, &mut progress); if let Some((_, bank)) = votable.last() { subscriptions.notify_subscribers(&bank); @@ -265,10 +284,10 @@ impl ReplayStage { }); } } - pub fn replay_blocktree_into_bank( + fn replay_blocktree_into_bank( bank: &Bank, blocktree: &Blocktree, - progress: &mut HashMap, + progress: &mut HashMap, forward_entry_sender: &EntrySender, ) -> result::Result<()> { let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?; @@ -290,7 +309,7 @@ impl ReplayStage { bank: &Arc, bank_forks: &Arc>, locktower: &mut Locktower, - progress: &mut HashMap, + progress: &mut HashMap, voting_keypair: &Option>, vote_account: &Pubkey, cluster_info: &Arc>, @@ -341,7 +360,7 @@ impl ReplayStage { bank_forks: &Arc>, my_id: &Pubkey, ticks_per_slot: &mut u64, - progress: &mut HashMap, + progress: &mut HashMap, forward_entry_sender: &EntrySender, slot_full_sender: &Sender<(u64, Pubkey)>, ) -> result::Result<()> { @@ -370,6 +389,7 @@ impl ReplayStage { fn generate_votable_banks( bank_forks: &Arc>, locktower: &Locktower, + progress: &mut HashMap, ) -> Vec<(u128, Arc)> { let locktower_start = Instant::now(); // Locktower voting @@ -409,6 +429,7 @@ impl ReplayStage { .filter(|(b, stake_lockouts)| { let vote_threshold = locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts); + Self::confirm_forks(locktower, stake_lockouts, progress); debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold); vote_threshold }) @@ -434,32 +455,53 @@ impl ReplayStage { votable } - pub fn load_blocktree_entries( + fn confirm_forks( + locktower: &Locktower, + stake_lockouts: &HashMap, + progress: &mut HashMap, + ) { + for (slot, prog) in progress.iter_mut() { + if prog.supermajority_confirmed_ms == 0 + && locktower.is_slot_confirmed(*slot, stake_lockouts) + { + prog.supermajority_confirmed_ms = timing::timestamp(); + let duration = prog.supermajority_confirmed_ms - prog.started_ms; + info!("fork confirmed {} {}", *slot, duration); + solana_metrics::submit( + influxdb::Point::new(&"validator-confirmation") + .add_field("duration_ms", influxdb::Value::Integer(duration as i64)) + .to_owned(), + ); + } + } + } + + fn load_blocktree_entries( bank: &Bank, blocktree: &Blocktree, - progress: &mut HashMap, + progress: &mut HashMap, ) -> result::Result<(Vec, usize)> { let bank_slot = bank.slot(); let bank_progress = &mut progress .entry(bank_slot) - .or_insert((bank.last_blockhash(), 0)); - blocktree.get_slot_entries_with_blob_count(bank_slot, bank_progress.1 as u64, None) + .or_insert(ForkProgress::new(bank.last_blockhash())); + blocktree.get_slot_entries_with_blob_count(bank_slot, bank_progress.num_blobs as u64, None) } - pub fn replay_entries_into_bank( + fn replay_entries_into_bank( bank: &Bank, entries: Vec, - progress: &mut HashMap, + progress: &mut HashMap, forward_entry_sender: &EntrySender, num: usize, ) -> result::Result<()> { let bank_progress = &mut progress .entry(bank.slot()) - .or_insert((bank.last_blockhash(), 0)); - let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.0); - bank_progress.1 += num; + .or_insert(ForkProgress::new(bank.last_blockhash())); + let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.last_entry); + bank_progress.num_blobs += num; if let Some(last_entry) = entries.last() { - bank_progress.0 = last_entry.hash; + bank_progress.last_entry = last_entry.hash; } if result.is_ok() { forward_entry_sender.send(entries)?; @@ -489,7 +531,7 @@ impl ReplayStage { fn handle_new_root( bank_forks: &Arc>, - progress: &mut HashMap, + progress: &mut HashMap, ) { let r_bank_forks = bank_forks.read().unwrap(); progress.retain(|k, _| r_bank_forks.get(*k).is_some()); @@ -498,7 +540,7 @@ impl ReplayStage { fn process_completed_bank( my_id: &Pubkey, bank: Arc, - progress: &mut HashMap, + progress: &mut HashMap, slot_full_sender: &Sender<(u64, Pubkey)>, ) { bank.freeze(); @@ -736,7 +778,7 @@ mod test { let bank0 = Bank::new(&genesis_block); let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0))); let mut progress = HashMap::new(); - progress.insert(5, (Hash::default(), 0)); + progress.insert(5, ForkProgress::new(Hash::default())); ReplayStage::handle_new_root(&bank_forks, &mut progress); assert!(progress.is_empty()); }