Create leader schedule before processing blockstore

Michael Vines
2022-03-14 09:16:12 -07:00
parent 543d5d4a5d
commit 390dc24608
4 changed files with 82 additions and 71 deletions

View File

@@ -550,9 +550,9 @@ mod tests {
             full_leader_cache: true,
             ..ProcessOptions::default()
         };
-        let (bank_forks, cached_leader_schedule, _) =
+        let (bank_forks, leader_schedule_cache) =
             test_process_blockstore(&genesis_config, &blockstore, opts);
-        let leader_schedule_cache = Arc::new(cached_leader_schedule);
+        let leader_schedule_cache = Arc::new(leader_schedule_cache);
         let bank_forks = Arc::new(RwLock::new(bank_forks));
         let mut me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);

View File

@@ -488,6 +488,19 @@ impl Validator {
             transaction_notifier,
         );
 
+        let tower = {
+            let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id);
+            if let Ok(tower) = &restored_tower {
+                reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| {
+                    error!("Failed to reconcile blockstore with tower: {:?}", err);
+                    abort()
+                });
+            }
+
+            post_process_restored_tower(restored_tower, &id, &vote_account, config, &bank_forks)
+        };
+        info!("Tower state: {:?}", tower);
+
         *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
 
         if !config.no_os_network_stats_reporting {
@@ -1319,33 +1332,38 @@ fn new_banks_from_ledger(
         .cache_block_meta_sender
         .as_ref();
 
-    let (bank_forks, starting_snapshot_hashes) = bank_forks_utils::load_bank_forks(
-        &genesis_config,
-        &blockstore,
-        config.account_paths.clone(),
-        config.account_shrink_paths.clone(),
-        config.snapshot_config.as_ref(),
-        &process_options,
-        cache_block_meta_sender,
-        accounts_update_notifier,
-    );
-
-    let (mut bank_forks, mut leader_schedule_cache, last_full_snapshot_slot) =
-        blockstore_processor::process_blockstore_from_root(
-            &blockstore,
-            bank_forks,
-            &process_options,
-            transaction_history_services
-                .transaction_status_sender
-                .as_ref(),
-            cache_block_meta_sender,
-            config.snapshot_config.as_ref(),
-            accounts_package_sender,
-        )
-        .unwrap_or_else(|err| {
-            error!("Failed to load ledger: {:?}", err);
-            abort()
-        });
+    let (mut bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
+        bank_forks_utils::load_bank_forks(
+            &genesis_config,
+            &blockstore,
+            config.account_paths.clone(),
+            config.account_shrink_paths.clone(),
+            config.snapshot_config.as_ref(),
+            &process_options,
+            cache_block_meta_sender,
+            accounts_update_notifier,
+        );
+
+    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
+    bank_forks.set_snapshot_config(config.snapshot_config.clone());
+    bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);
+
+    let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root(
+        &blockstore,
+        &mut bank_forks,
+        &leader_schedule_cache,
+        &process_options,
+        transaction_history_services
+            .transaction_status_sender
+            .as_ref(),
+        cache_block_meta_sender,
+        config.snapshot_config.as_ref(),
+        accounts_package_sender,
+    )
+    .unwrap_or_else(|err| {
+        error!("Failed to load ledger: {:?}", err);
+        abort()
+    });
 
     let last_full_snapshot_slot =
         last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));
@@ -1399,21 +1417,6 @@ fn new_banks_from_ledger(
         );
     }
 
-    let tower = post_process_restored_tower(
-        restored_tower,
-        validator_identity,
-        vote_account,
-        config,
-        &bank_forks,
-    );
-    info!("Tower state: {:?}", tower);
-
-    leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
-    bank_forks.set_snapshot_config(config.snapshot_config.clone());
-    bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);
-
     if let Some(blockstore_root_scan) = blockstore_root_scan {
         if let Err(err) = blockstore_root_scan.join() {
             warn!("blockstore_root_scan failed to join {:?}", err);
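Taken together, the changes to this file reorder ledger loading: bank_forks_utils::load_bank_forks() now also returns the LeaderScheduleCache, the cache and BankForks are configured immediately, and process_blockstore_from_root() only reports the last full snapshot slot. Below is a minimal caller-side sketch of that flow; the crate paths (solana_ledger, solana_sdk, crossbeam_channel) and the all-None/empty arguments are illustrative assumptions, not part of this diff.

// Sketch of the post-commit flow; argument plumbing is illustrative only.
use crossbeam_channel::unbounded;
use solana_ledger::{
    bank_forks_utils,
    blockstore::Blockstore,
    blockstore_processor::{self, BlockstoreProcessorError, ProcessOptions},
};
use solana_sdk::genesis_config::GenesisConfig;

fn load_then_replay(
    genesis_config: &GenesisConfig,
    blockstore: &Blockstore,
    opts: &ProcessOptions,
) -> Result<(), BlockstoreProcessorError> {
    // 1. Bank forks and the leader schedule cache are both created here,
    //    before any blockstore replay happens.
    let (mut bank_forks, leader_schedule_cache, _starting_snapshot_hashes) =
        bank_forks_utils::load_bank_forks(
            genesis_config,
            blockstore,
            Vec::new(), // account_paths
            None,       // account_shrink_paths
            None,       // snapshot_config
            opts,
            None, // cache_block_meta_sender
            None, // accounts_update_notifier
        );

    // 2. Replay slots from the root; the cache is borrowed and only the last
    //    full snapshot slot comes back now.
    let (accounts_package_sender, _accounts_package_receiver) = unbounded();
    let _last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root(
        blockstore,
        &mut bank_forks,
        &leader_schedule_cache,
        opts,
        None, // transaction_status_sender
        None, // cache_block_meta_sender
        None, // snapshot_config
        accounts_package_sender,
    )?;
    Ok(())
}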

View File

@@ -47,7 +47,7 @@ pub fn load(
     accounts_package_sender: AccountsPackageSender,
     accounts_update_notifier: Option<AccountsUpdateNotifier>,
 ) -> LoadResult {
-    let (bank_forks, starting_snapshot_hashes) = load_bank_forks(
+    let (mut bank_forks, leader_schedule_cache, starting_snapshot_hashes) = load_bank_forks(
         genesis_config,
         blockstore,
         account_paths,
@@ -60,16 +60,15 @@
     blockstore_processor::process_blockstore_from_root(
         blockstore,
-        bank_forks,
+        &mut bank_forks,
+        &leader_schedule_cache,
         &process_options,
         transaction_status_sender,
         cache_block_meta_sender,
         snapshot_config,
         accounts_package_sender,
     )
-    .map(|(bank_forks, leader_schedule_cache, ..)| {
-        (bank_forks, leader_schedule_cache, starting_snapshot_hashes)
-    })
+    .map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes))
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -82,7 +81,11 @@ pub fn load_bank_forks(
     process_options: &ProcessOptions,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
     accounts_update_notifier: Option<AccountsUpdateNotifier>,
-) -> (BankForks, Option<StartingSnapshotHashes>) {
+) -> (
+    BankForks,
+    LeaderScheduleCache,
+    Option<StartingSnapshotHashes>,
+) {
     let snapshot_present = if let Some(snapshot_config) = snapshot_config {
         info!(
             "Initializing bank snapshot path: {}",
@@ -107,7 +110,7 @@ pub fn load_bank_forks(
         false
     };
 
-    if snapshot_present {
+    let (bank_forks, starting_snapshot_hashes) = if snapshot_present {
         bank_forks_from_snapshot(
             genesis_config,
             account_paths,
@@ -139,7 +142,13 @@
             ),
             None,
         )
+    };
+
+    let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank_forks.root_bank());
+    if process_options.full_leader_cache {
+        leader_schedule_cache.set_max_schedules(std::usize::MAX);
     }
+
+    (bank_forks, leader_schedule_cache, starting_snapshot_hashes)
 }
 
 #[allow(clippy::too_many_arguments)]
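Since load_bank_forks() now builds the LeaderScheduleCache from the root bank (honoring full_leader_cache), callers can query slot leaders before any blockstore replay. A rough sketch follows; slot_leader_at(), root_bank(), and the crate paths are assumed from the codebase of this era rather than shown in this diff.

use solana_ledger::{
    bank_forks_utils, blockstore::Blockstore, blockstore_processor::ProcessOptions,
};
use solana_sdk::genesis_config::GenesisConfig;

fn peek_next_leader(genesis_config: &GenesisConfig, blockstore: &Blockstore) {
    let opts = ProcessOptions::default();
    // The leader schedule cache now exists as soon as the root bank does.
    let (bank_forks, leader_schedule_cache, _starting_snapshot_hashes) =
        bank_forks_utils::load_bank_forks(
            genesis_config,
            blockstore,
            Vec::new(), // account_paths
            None,       // account_shrink_paths
            None,       // snapshot_config
            &opts,
            None, // cache_block_meta_sender
            None, // accounts_update_notifier
        );

    let root_bank = bank_forks.root_bank();
    let next_slot = root_bank.slot() + 1;
    // Assumed signature: slot_leader_at(slot, Option<&Bank>) -> Option<Pubkey>.
    if let Some(leader) = leader_schedule_cache.slot_leader_at(next_slot, Some(root_bank.as_ref())) {
        println!("leader for slot {}: {}", next_slot, leader);
    }
}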

View File

@@ -95,11 +95,6 @@ impl BlockCostCapacityMeter {
     }
 }
 
-pub type BlockstoreProcessorInner = (BankForks, LeaderScheduleCache, Option<Slot>);
-
-pub type BlockstoreProcessorResult =
-    result::Result<BlockstoreProcessorInner, BlockstoreProcessorError>;
-
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
                     .num_threads(get_thread_count())
                     .thread_name(|ix| format!("blockstore_processor_{}", ix))
@@ -570,8 +565,8 @@ pub fn test_process_blockstore(
     genesis_config: &GenesisConfig,
     blockstore: &Blockstore,
     opts: ProcessOptions,
-) -> BlockstoreProcessorInner {
-    let (bank_forks, ..) = crate::bank_forks_utils::load_bank_forks(
+) -> (BankForks, LeaderScheduleCache) {
+    let (mut bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks(
         genesis_config,
         blockstore,
         Vec::new(),
@@ -584,14 +579,16 @@
     let (accounts_package_sender, _) = unbounded();
     process_blockstore_from_root(
         blockstore,
-        bank_forks,
+        &mut bank_forks,
+        &leader_schedule_cache,
         &opts,
         None,
         None,
         None,
         accounts_package_sender,
     )
-    .unwrap()
+    .unwrap();
+    (bank_forks, leader_schedule_cache)
 }
 
 pub(crate) fn process_blockstore_for_bank_0(
@@ -632,13 +629,14 @@
 #[allow(clippy::too_many_arguments)]
 pub fn process_blockstore_from_root(
     blockstore: &Blockstore,
-    mut bank_forks: BankForks,
+    bank_forks: &mut BankForks,
+    leader_schedule_cache: &LeaderScheduleCache,
     opts: &ProcessOptions,
     transaction_status_sender: Option<&TransactionStatusSender>,
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
     snapshot_config: Option<&SnapshotConfig>,
     accounts_package_sender: AccountsPackageSender,
-) -> BlockstoreProcessorResult {
+) -> result::Result<Option<Slot>, BlockstoreProcessorError> {
     if let Some(num_threads) = opts.override_num_threads {
         PAR_THREAD_POOL.with(|pool| {
             *pool.borrow_mut() = rayon::ThreadPoolBuilder::new()
@@ -692,10 +690,6 @@ pub fn process_blockstore_from_root(
     let mut timing = ExecuteTimings::default();
 
     // Iterate and replay slots from blockstore starting from `start_slot`
-    let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
-    if opts.full_leader_cache {
-        leader_schedule_cache.set_max_schedules(std::usize::MAX);
-    }
     let mut last_full_snapshot_slot = None;
@@ -704,11 +698,11 @@ pub fn process_blockstore_from_root(
         .unwrap_or_else(|_| panic!("Failed to get meta for slot {}", start_slot))
     {
         load_frozen_forks(
-            &mut bank_forks,
+            bank_forks,
             start_slot,
             &start_slot_meta,
             blockstore,
-            &leader_schedule_cache,
+            leader_schedule_cache,
             opts,
             transaction_status_sender,
             cache_block_meta_sender,
@@ -764,7 +758,7 @@
     );
 
     assert!(bank_forks.active_banks().is_empty());
-    Ok((bank_forks, leader_schedule_cache, last_full_snapshot_slot))
+    Ok(last_full_snapshot_slot)
 }
 
 /// Verify that a segment of entries has the correct number of ticks and hashes
@@ -2374,7 +2368,7 @@ pub mod tests {
             accounts_db_test_hash_calculation: true,
             ..ProcessOptions::default()
         };
-        let (_bank_forks, leader_schedule, _) =
+        let (_bank_forks, leader_schedule) =
             test_process_blockstore(&genesis_config, &blockstore, opts);
         assert_eq!(leader_schedule.max_schedules(), std::usize::MAX);
     }
@@ -3160,11 +3154,14 @@ pub mod tests {
             None,
         );
 
+        let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank1);
+
         // Test process_blockstore_from_root() from slot 1 onwards
         let (accounts_package_sender, _) = unbounded();
-        let (bank_forks, ..) = process_blockstore_from_root(
+        process_blockstore_from_root(
             &blockstore,
-            bank_forks,
+            &mut bank_forks,
+            &leader_schedule_cache,
             &opts,
             None,
             None,
@@ -3268,10 +3265,12 @@ pub mod tests {
         };
 
         let (accounts_package_sender, accounts_package_receiver) = unbounded();
+        let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
         process_blockstore_from_root(
             &blockstore,
-            bank_forks,
+            &mut bank_forks,
+            &leader_schedule_cache,
             &opts,
             None,
             None,