Persistent tower (#10718)

* Save/restore Tower (see the sketch after the commit metadata below)

* Avoid unwrap()

* Rebase cleanups

* Forcibly pass test

* Correct reconciliation of votes after validator resume

* d b g

* Add more tests

* fsync and fix test

* Add test

* Fix fmt

* Debug

* Fix tests...

* save

* Clarify error message and clean up the code around it

* Move most of code out of tower save hot codepath

* Proper comment for the lack of fsync on tower

* Clean up

* Clean up

* Simpler type alias

* Manage tower-restored ancestor slots without banks

* Add comment

* Extract long code blocks...

* Add comment

* Simplify returned tuple...

* Tweak too aggressive log

* Fix typo...

* Add test

* Update comment

* Improve test to require non-empty stray restored slots

* Measure tower save and dump all tower contents

* Log adjust and add threshold related assertions

* cleanup adjust

* Properly lower stray restored slots priority...

* Rust fmt

* Fix test....

* Clarify comments a bit and add TowerError::TooNew

* Further clean-up around TowerError

* Truly create ancestors by excluding last vote slot

* Add comment for stray_restored_slots

* Add comment for stray_restored_slots

* Use BTreeSet

* Consider root_slot in post-replay adjustment

* Tweak logging

* Add test for stray_restored_ancestors

* Reorder some code

* Better names for unit tests

* Add frozen_abi to SavedTower

* Fold long lines

* Tweak stray ancestors and too old slot history

* Re-adjust error condition of too old slot history

* Test that normal ancestors are checked before stray ones

* Fix conflict, update tests, adjust behavior a bit

* Fix test

* Address review comments

* Last touch!

* Immediately after creating cleaning pr

* Revert stray slots

* Revert comment...

* Report error as metrics

* Revert to not panic! and ignore unfixable test...

* Normalize lockouts.root_slot more strictly

* Add comments for panic! and more assertions

* Properly initialize root without vote account

* Clarify code and comments based on review feedback

* Fix rebase

* Further simplify based on assured tower root

* Reorder code for more readability

Co-authored-by: Michael Vines <mvines@gmail.com>
Authored by Ryo Onodera on 2020-09-19 14:03:54 +09:00, committed by GitHub
parent 28f2c15597, commit cb8661bd49
15 changed files with 1712 additions and 106 deletions
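For orientation before the diffs: the save/restore cycle this PR introduces works roughly as sketched below. The sketch only assumes what the diffs themselves show (Tower::save is called with the node keypair on every vote, Tower::restore reads the tower back from the ledger path at startup, and SavedTower is the frozen_abi-tagged serialized form). The file name, bincode serialization, and temp-file-plus-rename details here are illustrative assumptions, not the exact implementation, and signing with the node keypair is omitted.

    use std::{fs, io::Write, path::{Path, PathBuf}};

    // Illustrative stand-in for the PR's SavedTower (the real type is signed with the
    // node keypair and has a frozen ABI; the fields here are assumptions).
    #[derive(serde::Serialize, serde::Deserialize)]
    struct SavedTower {
        data: Vec<u8>, // bincode-serialized tower state
    }

    fn tower_path(ledger_path: &Path, node_pubkey: &str) -> PathBuf {
        ledger_path.join(format!("tower-{}.bin", node_pubkey))
    }

    // Hot path, run on every vote: write to a temp file, then rename over the previous
    // tower so a crash never leaves a torn file behind.
    fn save_tower(ledger_path: &Path, node_pubkey: &str, saved: &SavedTower) -> std::io::Result<()> {
        let final_path = tower_path(ledger_path, node_pubkey);
        let tmp_path = final_path.with_extension("bin.new");
        {
            let mut file = fs::File::create(&tmp_path)?;
            let bytes = bincode::serialize(saved)
                .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?;
            file.write_all(&bytes)?;
            // Whether to fsync on this hot path is one of the trade-offs the commit
            // messages above mention; this sketch simply syncs for illustration.
            file.sync_all()?;
        }
        fs::rename(tmp_path, final_path)
    }

    // Startup path: a missing file surfaces as io::ErrorKind::NotFound, which
    // post_process_restored_tower() in the validator diff below tolerates on an
    // initial start (no votes in the vote account yet).
    fn restore_tower(ledger_path: &Path, node_pubkey: &str) -> std::io::Result<SavedTower> {
        let bytes = fs::read(tower_path(ledger_path, node_pubkey))?;
        bincode::deserialize(&bytes)
            .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))
    }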

File diff suppressed because it is too large.


@ -491,6 +491,44 @@ impl HeaviestSubtreeForkChoice {
);
}
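// Returns the heaviest (best) descendant slot on the same fork as the tower's last
// voted slot; None if that slot is the heaviest one itself, or if a stray restored
// last vote has no entry in fork_infos.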
fn heaviest_slot_on_same_voted_fork(&self, tower: &Tower) -> Option<Slot> {
tower
.last_voted_slot()
.map(|last_voted_slot| {
let heaviest_slot_on_same_voted_fork = self.best_slot(last_voted_slot);
if heaviest_slot_on_same_voted_fork.is_none() {
if !tower.is_stray_last_vote() {
// Unless the last vote is stray, self.best_slot(last_voted_slot) must return
// Some(_), which justifies the panic! here.
// Also, adjust_lockouts_after_replay() correctly sets last_voted_slot to None
// if all saved votes are ancestors of replayed_root_slot, so this code isn't
// reached in that case either.
// In other words, unless it is stray, every slot in the tower was voted on while
// this validator was running, so best_slot() must be available for all of them.
panic!(
"a bank at last_voted_slot({}) is a frozen bank so must have been\
added to heaviest_subtree_fork_choice at time of freezing",
last_voted_slot,
)
} else {
// fork_infos doesn't have corresponding data for the stray restored last vote,
// meaning there is some inconsistency between the saved tower and the ledger
// (newer snapshot, or only a saved tower was moved over to a new setup?).
return None;
}
}
let heaviest_slot_on_same_voted_fork = heaviest_slot_on_same_voted_fork.unwrap();
if heaviest_slot_on_same_voted_fork == last_voted_slot {
None
} else {
Some(heaviest_slot_on_same_voted_fork)
}
})
.unwrap_or(None)
}
#[cfg(test)]
fn set_stake_voted_at(&mut self, slot: Slot, stake_voted_at: u64) {
self.fork_infos.get_mut(&slot).unwrap().stake_voted_at = stake_voted_at;
@ -550,26 +588,17 @@ impl ForkChoice for HeaviestSubtreeForkChoice {
_ancestors: &HashMap<u64, HashSet<u64>>,
bank_forks: &RwLock<BankForks>,
) -> (Arc<Bank>, Option<Arc<Bank>>) {
let last_voted_slot = tower.last_voted_slot();
let heaviest_slot_on_same_voted_fork = last_voted_slot.map(|last_voted_slot| {
let heaviest_slot_on_same_voted_fork =
self.best_slot(last_voted_slot).expect("a bank at last_voted_slot is a frozen bank so must have been added to heaviest_subtree_fork_choice at time of freezing");
if heaviest_slot_on_same_voted_fork == last_voted_slot {
None
} else {
Some(heaviest_slot_on_same_voted_fork)
}
}).unwrap_or(None);
let heaviest_slot = self.best_overall_slot();
let r_bank_forks = bank_forks.read().unwrap();
(
r_bank_forks.get(heaviest_slot).unwrap().clone(),
heaviest_slot_on_same_voted_fork.map(|heaviest_slot_on_same_voted_fork| {
r_bank_forks
.get(heaviest_slot_on_same_voted_fork)
.unwrap()
.clone()
}),
r_bank_forks.get(self.best_overall_slot()).unwrap().clone(),
self.heaviest_slot_on_same_voted_fork(tower)
.map(|heaviest_slot_on_same_voted_fork| {
r_bank_forks
.get(heaviest_slot_on_same_voted_fork)
.unwrap()
.clone()
}),
)
}
}
@ -611,6 +640,7 @@ mod test {
use super::*;
use crate::consensus::test::VoteSimulator;
use solana_runtime::{bank::Bank, bank_utils};
use solana_sdk::{hash::Hash, slot_history::SlotHistory};
use std::{collections::HashSet, ops::Range};
use trees::tr;
@ -1490,6 +1520,48 @@ mod test {
assert!(heaviest_subtree_fork_choice.subtree_diff(0, 6).is_empty());
}
#[test]
fn test_stray_restored_slot() {
let forks = tr(0) / (tr(1) / tr(2));
let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_tree(forks);
let mut tower = Tower::new_for_tests(10, 0.9);
tower.record_vote(1, Hash::default());
assert_eq!(tower.is_stray_last_vote(), false);
assert_eq!(
heaviest_subtree_fork_choice.heaviest_slot_on_same_voted_fork(&tower),
Some(2)
);
// Make slot 1 (existing in bank_forks) a restored stray slot
let mut slot_history = SlotHistory::default();
slot_history.add(0);
// Work around TooOldSlotHistory
slot_history.add(999);
tower = tower
.adjust_lockouts_after_replay(0, &slot_history)
.unwrap();
assert_eq!(tower.is_stray_last_vote(), true);
assert_eq!(
heaviest_subtree_fork_choice.heaviest_slot_on_same_voted_fork(&tower),
Some(2)
);
// Make slot 3 (NOT existing in bank_forks) a restored stray slot
tower.record_vote(3, Hash::default());
tower = tower
.adjust_lockouts_after_replay(0, &slot_history)
.unwrap();
assert_eq!(tower.is_stray_last_vote(), true);
assert_eq!(
heaviest_subtree_fork_choice.heaviest_slot_on_same_voted_fork(&tower),
None
);
}
fn setup_forks() -> HeaviestSubtreeForkChoice {
/*
Build fork structure:

View File

@ -219,6 +219,7 @@ impl ReplayStage {
cluster_info: Arc<ClusterInfo>,
ledger_signal_receiver: Receiver<bool>,
poh_recorder: Arc<Mutex<PohRecorder>>,
mut tower: Tower,
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: RetransmitSlotsSender,
@ -255,53 +256,16 @@ impl ReplayStage {
let mut all_pubkeys = PubkeyReferences::default();
let verify_recyclers = VerifyRecyclers::default();
let _exit = Finalizer::new(exit.clone());
let mut progress = ProgressMap::default();
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
.frozen_banks()
.values()
.cloned()
.collect();
frozen_banks.sort_by_key(|bank| bank.slot());
// Initialize progress map with any root banks
for bank in &frozen_banks {
let prev_leader_slot = progress.get_bank_prev_leader_slot(bank);
progress.insert(
bank.slot(),
ForkProgress::new_from_bank(
bank,
&my_pubkey,
&vote_account,
prev_leader_slot,
0,
0,
),
);
}
let root_bank = bank_forks.read().unwrap().root_bank().clone();
let root = root_bank.slot();
let unlock_heaviest_subtree_fork_choice_slot =
Self::get_unlock_heaviest_subtree_fork_choice(root_bank.cluster_type());
let mut heaviest_subtree_fork_choice =
HeaviestSubtreeForkChoice::new_from_frozen_banks(root, &frozen_banks);
let (
mut progress,
mut heaviest_subtree_fork_choice,
unlock_heaviest_subtree_fork_choice_slot,
) = Self::initialize_progress_and_fork_choice_with_locked_bank_forks(
&bank_forks,
&my_pubkey,
&vote_account,
);
let mut bank_weight_fork_choice = BankWeightForkChoice::default();
let heaviest_bank = if root > unlock_heaviest_subtree_fork_choice_slot {
bank_forks
.read()
.unwrap()
.get(heaviest_subtree_fork_choice.best_overall_slot())
.expect(
"The best overall slot must be one of `frozen_banks` which all
exist in bank_forks",
)
.clone()
} else {
Tower::find_heaviest_bank(&bank_forks, &my_pubkey).unwrap_or(root_bank)
};
let mut tower = Tower::new(&my_pubkey, &vote_account, root, &heaviest_bank);
let mut current_leader = None;
let mut last_reset = Hash::default();
let mut partition_exists = false;
@ -652,6 +616,65 @@ impl ReplayStage {
.unwrap_or(true)
}
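// Snapshots the root bank and frozen banks while holding the bank_forks read lock,
// then builds the initial ProgressMap and HeaviestSubtreeForkChoice outside the lock.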
fn initialize_progress_and_fork_choice_with_locked_bank_forks(
bank_forks: &RwLock<BankForks>,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
) -> (ProgressMap, HeaviestSubtreeForkChoice, Slot) {
let (root_bank, frozen_banks) = {
let bank_forks = bank_forks.read().unwrap();
(
bank_forks.root_bank().clone(),
bank_forks.frozen_banks().values().cloned().collect(),
)
};
Self::initialize_progress_and_fork_choice(
&root_bank,
frozen_banks,
&my_pubkey,
&vote_account,
)
}
pub(crate) fn initialize_progress_and_fork_choice(
root_bank: &Arc<Bank>,
mut frozen_banks: Vec<Arc<Bank>>,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
) -> (ProgressMap, HeaviestSubtreeForkChoice, Slot) {
let mut progress = ProgressMap::default();
frozen_banks.sort_by_key(|bank| bank.slot());
// Initialize progress map with any root banks
for bank in &frozen_banks {
let prev_leader_slot = progress.get_bank_prev_leader_slot(bank);
progress.insert(
bank.slot(),
ForkProgress::new_from_bank(
bank,
&my_pubkey,
&vote_account,
prev_leader_slot,
0,
0,
),
);
}
let root = root_bank.slot();
let unlock_heaviest_subtree_fork_choice_slot =
Self::get_unlock_heaviest_subtree_fork_choice(root_bank.cluster_type());
let heaviest_subtree_fork_choice =
HeaviestSubtreeForkChoice::new_from_frozen_banks(root, &frozen_banks);
(
progress,
heaviest_subtree_fork_choice,
unlock_heaviest_subtree_fork_choice_slot,
)
}
fn report_memory(
allocated: &solana_measure::thread_mem_usage::Allocatedp,
name: &'static str,
@ -1015,7 +1038,15 @@ impl ReplayStage {
}
trace!("handle votable bank {}", bank.slot());
let (vote, tower_index) = tower.new_vote_from_bank(bank, vote_account_pubkey);
if let Some(new_root) = tower.record_bank_vote(vote) {
let new_root = tower.record_bank_vote(vote);
let last_vote = tower.last_vote_and_timestamp();
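// Persist the tower before the vote is actually submitted, so that a restart can
// never observe a vote the on-disk tower doesn't know about; failure to save is fatal.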
if let Err(err) = tower.save(&cluster_info.keypair) {
error!("Unable to save tower: {:?}", err);
std::process::exit(1);
}
if let Some(new_root) = new_root {
// get the root bank before squash
let root_bank = bank_forks
.read()
@ -1075,7 +1106,7 @@ impl ReplayStage {
bank,
vote_account_pubkey,
authorized_voter_keypairs,
tower.last_vote_and_timestamp(),
last_vote,
tower_index,
switch_fork_decision,
);


@ -10,6 +10,7 @@ use crate::{
cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker},
cluster_slots::ClusterSlots,
completed_data_sets_service::CompletedDataSetsSender,
consensus::Tower,
ledger_cleanup_service::LedgerCleanupService,
poh_recorder::PohRecorder,
replay_stage::{ReplayStage, ReplayStageConfig},
@ -90,6 +91,7 @@ impl Tvu {
ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
tower: Tower,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver,
@ -203,6 +205,7 @@ impl Tvu {
cluster_info.clone(),
ledger_signal_receiver,
poh_recorder.clone(),
tower,
vote_tracker,
cluster_slots,
retransmit_slots_sender,
@ -301,6 +304,7 @@ pub mod tests {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let tower = Tower::new_with_key(&target1_keypair.pubkey());
let tvu = Tvu::new(
&vote_keypair.pubkey(),
vec![Arc::new(vote_keypair)],
@ -322,6 +326,7 @@ pub mod tests {
block_commitment_cache.clone(),
)),
&poh_recorder,
tower,
&leader_schedule_cache,
&exit,
completed_slots_receiver,


@ -6,6 +6,7 @@ use crate::{
cluster_info::{ClusterInfo, Node},
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
consensus::{reconcile_blockstore_roots_with_tower, Tower, TowerError},
contact_info::ContactInfo,
gossip_service::{discover_cluster, GossipService},
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
@ -95,6 +96,7 @@ pub struct ValidatorConfig {
pub accounts_hash_interval_slots: u64,
pub max_genesis_archive_unpacked_size: u64,
pub wal_recovery_mode: Option<BlockstoreRecoveryMode>,
pub require_tower: bool,
}
impl Default for ValidatorConfig {
@ -125,6 +127,7 @@ impl Default for ValidatorConfig {
accounts_hash_interval_slots: std::u64::MAX,
max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
wal_recovery_mode: None,
require_tower: false,
}
}
}
@ -253,7 +256,8 @@ impl Validator {
cache_block_time_sender,
cache_block_time_service,
},
) = new_banks_from_ledger(config, ledger_path, poh_verify, &exit);
tower,
) = new_banks_from_ledger(&id, vote_account, config, ledger_path, poh_verify, &exit);
let leader_schedule_cache = Arc::new(leader_schedule_cache);
let bank = bank_forks.working_bank();
@ -475,6 +479,7 @@ impl Validator {
ledger_signal_receiver,
&subscriptions,
&poh_recorder,
tower,
&leader_schedule_cache,
&exit,
completed_slots_receiver,
@ -613,8 +618,81 @@ impl Validator {
}
}
fn active_vote_account_exists_in_bank(bank: &Arc<Bank>, vote_account: &Pubkey) -> bool {
if let Some(account) = &bank.get_account(vote_account) {
if let Some(vote_state) = VoteState::from(&account) {
return !vote_state.votes.is_empty();
}
}
false
}
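// Adjusts the restored tower against the replayed root's slot history. On failure it
// either aborts (when config.require_tower is set and the vote account already holds
// votes) or falls back to rebuilding a fresh tower from bank_forks.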
fn post_process_restored_tower(
restored_tower: crate::consensus::Result<Tower>,
validator_identity: &Pubkey,
vote_account: &Pubkey,
config: &ValidatorConfig,
ledger_path: &Path,
bank_forks: &BankForks,
) -> Tower {
restored_tower
.and_then(|tower| {
let root_bank = bank_forks.root_bank();
let slot_history = root_bank.get_slot_history();
tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history)
})
.unwrap_or_else(|err| {
let voting_has_been_active =
active_vote_account_exists_in_bank(&bank_forks.working_bank(), &vote_account);
let saved_tower_is_missing = if let TowerError::IOError(io_err) = &err {
io_err.kind() == std::io::ErrorKind::NotFound
} else {
false
};
if !saved_tower_is_missing {
datapoint_error!(
"tower_error",
(
"error",
format!("Unable to restore tower: {}", err),
String
),
);
}
if config.require_tower && voting_has_been_active {
error!("Requested mandatory tower restore failed: {}", err);
error!(
"And there is an existing vote_account containing actual votes. \
Aborting due to possible conflicting duplicate votes"
);
process::exit(1);
}
if saved_tower_is_missing && !voting_has_been_active {
// Currently, don't protect against spoofed snapshots with no tower at all
info!(
"Ignoring expected failed tower restore because this is the initial \
validator start with the vote account..."
);
} else {
error!(
"Rebuilding a new tower from the latest vote account due to failed tower restore: {}",
err
);
}
Tower::new_from_bankforks(
&bank_forks,
&ledger_path,
&validator_identity,
&vote_account,
)
})
}
#[allow(clippy::type_complexity)]
fn new_banks_from_ledger(
validator_identity: &Pubkey,
vote_account: &Pubkey,
config: &ValidatorConfig,
ledger_path: &Path,
poh_verify: bool,
@ -628,6 +706,7 @@ fn new_banks_from_ledger(
LeaderScheduleCache,
Option<(Slot, Hash)>,
TransactionHistoryServices,
Tower,
) {
info!("loading ledger from {:?}...", ledger_path);
let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size);
@ -659,6 +738,14 @@ fn new_banks_from_ledger(
.expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction);
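// Restore the persisted tower (if any) and reconcile blockstore roots with it before
// replay begins; a reconciliation failure is fatal.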
let restored_tower = Tower::restore(ledger_path, &validator_identity);
if let Ok(tower) = &restored_tower {
reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap_or_else(|err| {
error!("Failed to reconcile blockstore with tower: {:?}", err);
std::process::exit(1);
});
}
let process_options = blockstore_processor::ProcessOptions {
poh_verify,
dev_halt_at_slot: config.dev_halt_at_slot,
@ -690,6 +777,17 @@ fn new_banks_from_ledger(
process::exit(1);
});
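// Now that the ledger has been replayed, adjust the restored tower to the new root
// (or rebuild it from the latest vote account if restore/adjustment failed).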
let tower = post_process_restored_tower(
restored_tower,
&validator_identity,
&vote_account,
&config,
&ledger_path,
&bank_forks,
);
info!("Tower state: {:?}", tower);
leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
bank_forks.set_snapshot_config(config.snapshot_config.clone());
@ -704,6 +802,7 @@ fn new_banks_from_ledger(
leader_schedule_cache,
snapshot_hash,
transaction_history_services,
tower,
)
}