Fix progress map losing banks and recomputing stats (bp #7026) (#7033)

automerge
This commit is contained in:
mergify[bot]
2019-11-19 15:00:53 -08:00
committed by Grimes
parent 8545ca2cc5
commit e8db4997de

View File

@ -17,7 +17,7 @@ use solana_ledger::entry::{Entry, EntrySlice};
use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::snapshot_package::SnapshotPackageSender; use solana_ledger::snapshot_package::SnapshotPackageSender;
use solana_measure::measure::Measure; use solana_measure::measure::Measure;
use solana_metrics::{datapoint_warn, inc_new_counter_info}; use solana_metrics::inc_new_counter_info;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::clock::Slot; use solana_sdk::clock::Slot;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
@ -85,6 +85,7 @@ struct ForkStats {
is_locked_out: bool, is_locked_out: bool,
stake_lockouts: HashMap<u64, StakeLockout>, stake_lockouts: HashMap<u64, StakeLockout>,
computed: bool, computed: bool,
confirmation_reported: bool,
} }
impl ReplaySlotStats { impl ReplaySlotStats {
@ -196,6 +197,10 @@ impl ReplayStage {
.spawn(move || { .spawn(move || {
let _exit = Finalizer::new(exit_.clone()); let _exit = Finalizer::new(exit_.clone());
let mut progress = HashMap::new(); let mut progress = HashMap::new();
// Initialize progress map with any root banks
for bank in bank_forks.read().unwrap().frozen_banks().values() {
progress.insert(bank.slot(), ForkProgress::new(bank.slot(), bank.last_blockhash()));
}
let mut current_leader = None; let mut current_leader = None;
let mut last_reset = Hash::default(); let mut last_reset = Hash::default();
let mut partition = false; let mut partition = false;
@ -447,12 +452,9 @@ impl ReplayStage {
fn replay_blocktree_into_bank( fn replay_blocktree_into_bank(
bank: &Arc<Bank>, bank: &Arc<Bank>,
blocktree: &Blocktree, blocktree: &Blocktree,
progress: &mut HashMap<u64, ForkProgress>, bank_progress: &mut ForkProgress,
) -> (Result<()>, usize) { ) -> (Result<()>, usize) {
let mut tx_count = 0; let mut tx_count = 0;
let bank_progress = &mut progress
.entry(bank.slot())
.or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash()));
let now = Instant::now(); let now = Instant::now();
let load_result = let load_result =
Self::load_blocktree_entries_with_shred_count(bank, blocktree, bank_progress); Self::load_blocktree_entries_with_shred_count(bank, blocktree, bank_progress);
@ -485,18 +487,14 @@ impl ReplayStage {
("error", format!("error: {:?}", replay_result), String), ("error", format!("error: {:?}", replay_result), String),
("slot", bank.slot(), i64) ("slot", bank.slot(), i64)
); );
Self::mark_dead_slot(bank.slot(), blocktree, progress); Self::mark_dead_slot(bank.slot(), blocktree, bank_progress);
} }
(replay_result, tx_count) (replay_result, tx_count)
} }
fn mark_dead_slot(slot: u64, blocktree: &Blocktree, progress: &mut HashMap<u64, ForkProgress>) { fn mark_dead_slot(slot: Slot, blocktree: &Blocktree, bank_progress: &mut ForkProgress) {
// Remove from progress map so we no longer try to replay this bank bank_progress.is_dead = true;
let mut progress_entry = progress
.get_mut(&slot)
.expect("Progress entry must exist after call to replay_entries_into_bank()");
progress_entry.is_dead = true;
blocktree blocktree
.set_dead_slot(slot) .set_dead_slot(slot)
.expect("Failed to mark slot as dead in blocktree"); .expect("Failed to mark slot as dead in blocktree");
@ -638,9 +636,16 @@ impl ReplayStage {
} }
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
// Insert a progress entry even for slots this node is the leader for, so that
// 1) confirm_forks can report confirmation, 2) we can cache computations about
// this bank in `select_fork()`
let bank_progress = &mut progress
.entry(bank.slot())
.or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash()));
if bank.collector_id() != my_pubkey { if bank.collector_id() != my_pubkey {
let (replay_result, replay_tx_count) = let (replay_result, replay_tx_count) =
Self::replay_blocktree_into_bank(&bank, &blocktree, progress); Self::replay_blocktree_into_bank(&bank, &blocktree, bank_progress);
tx_count += replay_tx_count; tx_count += replay_tx_count;
if Self::is_replay_result_fatal(&replay_result) { if Self::is_replay_result_fatal(&replay_result) {
trace!("replay_result_fatal slot {}", bank_slot); trace!("replay_result_fatal slot {}", bank_slot);
@ -691,10 +696,15 @@ impl ReplayStage {
let stats: Vec<ForkStats> = frozen_banks let stats: Vec<ForkStats> = frozen_banks
.iter() .iter()
.map(|bank| { .map(|bank| {
// Only time progress map should be missing a bank slot
// is if this node was the leader for this slot as those banks
// are not replayed in replay_active_banks()
let mut stats = progress let mut stats = progress
.get(&bank.slot()) .get(&bank.slot())
.map(|s| s.fork_stats.clone()) .expect("All frozen banks must exist in the Progress map")
.unwrap_or_default(); .fork_stats
.clone();
if !stats.computed { if !stats.computed {
stats.slot = bank.slot(); stats.slot = bank.slot();
let (stake_lockouts, total_staked) = tower.collect_vote_lockouts( let (stake_lockouts, total_staked) = tower.collect_vote_lockouts(
@ -722,9 +732,10 @@ impl ReplayStage {
stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors); stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors);
stats.has_voted = tower.has_voted(bank.slot()); stats.has_voted = tower.has_voted(bank.slot());
stats.is_recent = tower.is_recent(bank.slot()); stats.is_recent = tower.is_recent(bank.slot());
if let Some(fp) = progress.get_mut(&bank.slot()) { progress
fp.fork_stats = stats.clone(); .get_mut(&bank.slot())
} .expect("All frozen banks must exist in the Progress map")
.fork_stats = stats.clone();
stats stats
}) })
.collect(); .collect();
@ -823,29 +834,30 @@ impl ReplayStage {
progress: &mut HashMap<u64, ForkProgress>, progress: &mut HashMap<u64, ForkProgress>,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
) { ) {
progress.retain(|slot, prog| { for (slot, prog) in progress.iter_mut() {
let duration = timing::timestamp() - prog.started_ms; if !prog.fork_stats.confirmation_reported {
if tower.is_slot_confirmed(*slot, stake_lockouts, total_staked) let duration = timing::timestamp() - prog.started_ms;
&& bank_forks if tower.is_slot_confirmed(*slot, stake_lockouts, total_staked)
.read() && bank_forks
.unwrap() .read()
.get(*slot) .unwrap()
.map(|s| s.is_frozen()) .get(*slot)
.unwrap_or(true) .map(|s| s.is_frozen())
{ .unwrap_or(true)
info!("validator fork confirmed {} {}ms", *slot, duration); {
datapoint_warn!("validator-confirmation", ("duration_ms", duration, i64)); info!("validator fork confirmed {} {}ms", *slot, duration);
false datapoint_warn!("validatorconfirmation", ("duration_ms", duration, i64));
} else { prog.fork_stats.confirmation_reported = true;
debug!( } else {
"validator fork not confirmed {} {}ms {:?}", debug!(
*slot, "validator fork not confirmed {} {}ms {:?}",
duration, *slot,
stake_lockouts.get(slot) duration,
); stake_lockouts.get(slot)
true );
}
} }
}); }
} }
fn load_blocktree_entries_with_shred_count( fn load_blocktree_entries_with_shred_count(
@ -1163,11 +1175,13 @@ mod test {
let bank0 = Arc::new(Bank::new(&genesis_block)); let bank0 = Arc::new(Bank::new(&genesis_block));
let mut progress = HashMap::new(); let mut progress = HashMap::new();
let last_blockhash = bank0.last_blockhash(); let last_blockhash = bank0.last_blockhash();
progress.insert(bank0.slot(), ForkProgress::new(0, last_blockhash)); let mut bank0_progress = progress
.entry(bank0.slot())
.or_insert_with(|| ForkProgress::new(0, last_blockhash));
let shreds = shred_to_insert(&mint_keypair, &last_blockhash, bank0.slot()); let shreds = shred_to_insert(&mint_keypair, &last_blockhash, bank0.slot());
blocktree.insert_shreds(shreds, None, false).unwrap(); blocktree.insert_shreds(shreds, None, false).unwrap();
let (res, _tx_count) = let (res, _tx_count) =
ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress); ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut bank0_progress);
// Check that the erroring bank was marked as dead in the progress map // Check that the erroring bank was marked as dead in the progress map
assert!(progress assert!(progress