Make startup aware of Incremental Snapshots (#19550)

This commit is contained in:
Brooks Prumo
2021-09-02 19:05:15 -05:00
committed by GitHub
parent 57f51352f6
commit d45ced0a5d
8 changed files with 337 additions and 92 deletions

View File

@ -8,12 +8,20 @@ use crate::{
};
use log::*;
use solana_entry::entry::VerifyRecyclers;
use solana_runtime::{bank_forks::BankForks, snapshot_config::SnapshotConfig, snapshot_utils};
use solana_runtime::{
bank_forks::BankForks, snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig, snapshot_package::AccountsPackageSender, snapshot_utils,
};
use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash};
use std::{fs, path::PathBuf, process, result};
pub type LoadResult = result::Result<
(BankForks, LeaderScheduleCache, Option<(Slot, Hash)>),
(
BankForks,
LeaderScheduleCache,
Option<Slot>,
Option<(Slot, Hash)>,
),
BlockstoreProcessorError,
>;
@ -21,9 +29,16 @@ fn to_loadresult(
bpr: BlockstoreProcessorResult,
snapshot_slot_and_hash: Option<(Slot, Hash)>,
) -> LoadResult {
bpr.map(|(bank_forks, leader_schedule_cache)| {
(bank_forks, leader_schedule_cache, snapshot_slot_and_hash)
})
bpr.map(
|(bank_forks, leader_schedule_cache, last_full_snapshot_slot)| {
(
bank_forks,
leader_schedule_cache,
last_full_snapshot_slot,
snapshot_slot_and_hash,
)
},
)
}
/// Load the banks and accounts
@ -39,6 +54,7 @@ pub fn load(
process_options: ProcessOptions,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
accounts_package_sender: AccountsPackageSender,
) -> LoadResult {
if let Some(snapshot_config) = snapshot_config {
info!(
@ -63,6 +79,7 @@ pub fn load(
process_options,
transaction_status_sender,
cache_block_meta_sender,
accounts_package_sender,
);
} else {
info!("No snapshot package available; will load from genesis");
@ -77,6 +94,8 @@ pub fn load(
account_paths,
process_options,
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
)
}
@ -86,6 +105,8 @@ fn load_from_genesis(
account_paths: Vec<PathBuf>,
process_options: ProcessOptions,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
) -> LoadResult {
info!("Processing ledger from genesis");
to_loadresult(
@ -95,6 +116,8 @@ fn load_from_genesis(
account_paths,
process_options,
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
),
None,
)
@ -110,6 +133,7 @@ fn load_from_snapshot(
process_options: ProcessOptions,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
accounts_package_sender: AccountsPackageSender,
) -> LoadResult {
// Fail hard here if snapshot fails to load, don't silently continue
if account_paths.is_empty() {
@ -117,24 +141,25 @@ fn load_from_snapshot(
process::exit(1);
}
let (deserialized_bank, timings) = snapshot_utils::bank_from_latest_snapshot_archives(
&snapshot_config.bank_snapshots_dir,
&snapshot_config.snapshot_archives_dir,
&account_paths,
&process_options.frozen_accounts,
genesis_config,
process_options.debug_keys.clone(),
Some(&crate::builtins::get(process_options.bpf_jit)),
process_options.account_indexes.clone(),
process_options.accounts_db_caching_enabled,
process_options.limit_load_slot_count_from_snapshot,
process_options.shrink_ratio,
process_options.accounts_db_test_hash_calculation,
process_options.accounts_db_skip_shrink,
process_options.verify_index,
process_options.accounts_index_config,
)
.expect("Load from snapshot failed");
let (deserialized_bank, timings, full_snapshot_archive_info, _) =
snapshot_utils::bank_from_latest_snapshot_archives(
&snapshot_config.bank_snapshots_dir,
&snapshot_config.snapshot_archives_dir,
&account_paths,
&process_options.frozen_accounts,
genesis_config,
process_options.debug_keys.clone(),
Some(&crate::builtins::get(process_options.bpf_jit)),
process_options.account_indexes.clone(),
process_options.accounts_db_caching_enabled,
process_options.limit_load_slot_count_from_snapshot,
process_options.shrink_ratio,
process_options.accounts_db_test_hash_calculation,
process_options.accounts_db_skip_shrink,
process_options.verify_index,
process_options.accounts_index_config,
)
.expect("Load from snapshot failed");
let deserialized_bank_slot_and_hash = (
deserialized_bank.slot(),
@ -153,7 +178,10 @@ fn load_from_snapshot(
&VerifyRecyclers::default(),
transaction_status_sender,
cache_block_meta_sender,
Some(snapshot_config),
accounts_package_sender,
timings,
full_snapshot_archive_info.slot(),
),
Some(deserialized_bank_slot_and_hash),
)

View File

@ -25,7 +25,9 @@ use solana_runtime::{
bank_forks::BankForks,
bank_utils,
commitment::VOTE_THRESHOLD_SIZE,
snapshot_utils::BankFromArchiveTimings,
snapshot_config::SnapshotConfig,
snapshot_package::{AccountsPackageSender, SnapshotType},
snapshot_utils::{self, BankFromArchiveTimings},
transaction_batch::TransactionBatch,
vote_account::VoteAccount,
vote_sender_types::ReplayVoteSender,
@ -43,7 +45,6 @@ use solana_sdk::{
use solana_transaction_status::token_balances::{
collect_token_balances, TransactionTokenBalancesSet,
};
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
@ -83,7 +84,7 @@ impl BlockCostCapacityMeter {
}
}
pub type BlockstoreProcessorInner = (BankForks, LeaderScheduleCache);
pub type BlockstoreProcessorInner = (BankForks, LeaderScheduleCache, Option<Slot>);
pub type BlockstoreProcessorResult =
result::Result<BlockstoreProcessorInner, BlockstoreProcessorError>;
@ -480,6 +481,8 @@ pub fn process_blockstore(
account_paths: Vec<PathBuf>,
opts: ProcessOptions,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
) -> BlockstoreProcessorResult {
if let Some(num_threads) = opts.override_num_threads {
PAR_THREAD_POOL.with(|pool| {
@ -520,11 +523,15 @@ pub fn process_blockstore(
&recyclers,
None,
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
BankFromArchiveTimings::default(),
None,
)
}
// Process blockstore from a known root bank
/// Process blockstore from a known root bank
#[allow(clippy::too_many_arguments)]
pub(crate) fn process_blockstore_from_root(
blockstore: &Blockstore,
bank: Bank,
@ -532,7 +539,10 @@ pub(crate) fn process_blockstore_from_root(
recyclers: &VerifyRecyclers,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
timings: BankFromArchiveTimings,
last_full_snapshot_slot: Slot,
) -> BlockstoreProcessorResult {
do_process_blockstore_from_root(
blockstore,
@ -541,10 +551,14 @@ pub(crate) fn process_blockstore_from_root(
recyclers,
transaction_status_sender,
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
timings,
Some(last_full_snapshot_slot),
)
}
#[allow(clippy::too_many_arguments)]
fn do_process_blockstore_from_root(
blockstore: &Blockstore,
bank: Arc<Bank>,
@ -552,7 +566,10 @@ fn do_process_blockstore_from_root(
recyclers: &VerifyRecyclers,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
timings: BankFromArchiveTimings,
mut last_full_snapshot_slot: Option<Slot>,
) -> BlockstoreProcessorResult {
info!("processing ledger from slot {}...", bank.slot());
@ -614,7 +631,10 @@ fn do_process_blockstore_from_root(
recyclers,
transaction_status_sender,
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
&mut timing,
&mut last_full_snapshot_slot,
)?;
initial_forks.sort_by_key(|bank| bank.slot());
@ -630,6 +650,7 @@ fn do_process_blockstore_from_root(
if initial_forks.is_empty() {
return Err(BlockstoreProcessorError::NoValidForksFound);
}
let bank_forks = BankForks::new_from_banks(&initial_forks, root);
let processing_time = now.elapsed();
@ -697,7 +718,7 @@ fn do_process_blockstore_from_root(
);
assert!(bank_forks.active_banks().is_empty());
Ok((bank_forks, leader_schedule_cache))
Ok((bank_forks, leader_schedule_cache, last_full_snapshot_slot))
}
/// Verify that a segment of entries has the correct number of ticks and hashes
@ -1038,7 +1059,10 @@ fn load_frozen_forks(
recyclers: &VerifyRecyclers,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
timing: &mut ExecuteTimings,
last_full_snapshot_slot: &mut Option<Slot>,
) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
let mut initial_forks = HashMap::new();
let mut all_banks = HashMap::new();
@ -1054,6 +1078,7 @@ fn load_frozen_forks(
"load_frozen_forks() latest root from blockstore: {}, max_root: {}",
blockstore_max_root, max_root,
);
process_next_slots(
root_bank,
root_meta,
@ -1161,11 +1186,35 @@ fn load_frozen_forks(
leader_schedule_cache.set_root(new_root_bank);
new_root_bank.squash();
if let Some(snapshot_config) = snapshot_config {
let block_height = new_root_bank.block_height();
if block_height % snapshot_config.full_snapshot_archive_interval_slots == 0 {
snapshot_utils::snapshot_bank(
new_root_bank,
new_root_bank.src.slot_deltas(&new_root_bank.src.roots()),
&accounts_package_sender,
&snapshot_config.bank_snapshots_dir,
&snapshot_config.snapshot_archives_dir,
snapshot_config.snapshot_version,
snapshot_config.archive_format,
None,
Some(SnapshotType::FullSnapshot),
)
.expect("Failed to snapshot bank while loading frozen banks");
trace!(
"took bank snapshot for new root bank, block height: {}, slot: {}",
block_height,
*root
);
*last_full_snapshot_slot = Some(*root);
}
}
if last_free.elapsed() > Duration::from_secs(10) {
// Must be called after `squash()`, so that AccountsDb knows what
// the roots are for the cache flushing in exhaustively_free_unused_resource().
// This could take few secs; so update last_free later
new_root_bank.exhaustively_free_unused_resource(None);
new_root_bank.exhaustively_free_unused_resource(*last_full_snapshot_slot);
last_free = Instant::now();
}
@ -1414,8 +1463,9 @@ pub mod tests {
use matches::assert_matches;
use rand::{thread_rng, Rng};
use solana_entry::entry::{create_ticks, next_entry, next_entry_mut};
use solana_runtime::genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
use solana_runtime::{
genesis_utils::{self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs},
snapshot_utils::{ArchiveFormat, SnapshotVersion},
};
use solana_sdk::{
account::{AccountSharedData, WritableAccount},
@ -1432,7 +1482,11 @@ pub mod tests {
vote_state::{VoteState, VoteStateVersions, MAX_LOCKOUT_HISTORY},
vote_transaction,
};
use std::{collections::BTreeSet, sync::RwLock};
use std::{
collections::BTreeSet,
sync::{mpsc::channel, RwLock},
};
use tempfile::TempDir;
use trees::tr;
fn test_process_blockstore(
@ -1440,7 +1494,17 @@ pub mod tests {
blockstore: &Blockstore,
opts: ProcessOptions,
) -> BlockstoreProcessorInner {
process_blockstore(genesis_config, blockstore, Vec::new(), opts, None).unwrap()
let (accounts_package_sender, _) = channel();
process_blockstore(
genesis_config,
blockstore,
Vec::new(),
opts,
None,
None,
accounts_package_sender,
)
.unwrap()
}
#[test]
@ -2228,7 +2292,7 @@ pub mod tests {
accounts_db_test_hash_calculation: true,
..ProcessOptions::default()
};
let (_bank_forks, leader_schedule) =
let (_bank_forks, leader_schedule, _) =
test_process_blockstore(&genesis_config, &blockstore, opts);
assert_eq!(leader_schedule.max_schedules(), std::usize::MAX);
}
@ -3005,14 +3069,18 @@ pub mod tests {
bank1.squash();
// Test process_blockstore_from_root() from slot 1 onwards
let (bank_forks, _leader_schedule) = do_process_blockstore_from_root(
let (accounts_package_sender, _) = channel();
let (bank_forks, ..) = do_process_blockstore_from_root(
&blockstore,
bank1,
&opts,
&recyclers,
None,
None,
None,
accounts_package_sender,
BankFromArchiveTimings::default(),
None,
)
.unwrap();
@ -3034,6 +3102,120 @@ pub mod tests {
verify_fork_infos(&bank_forks);
}
/// Checks that replaying the blockstore from a root honours the snapshot configuration.
///
/// Replaying from a root is what happens after loading from a snapshot. While doing so, new
/// roots may land on a full snapshot interval; each such root must produce a bank snapshot so
/// that a full snapshot archive can exist by the time the background services start.
///
/// The test replays enough rooted slots to cross the full snapshot interval several times, then
/// asserts that an accounts package was sent, and a bank snapshot written, for every one of
/// those interval slots.
#[test]
fn test_process_blockstore_from_root_with_snapshots() {
    // Every 2nd slot is rooted, a full snapshot is due every 10 slots, and 40 slots are replayed.
    const ROOT_INTERVAL_SLOTS: Slot = 2;
    const FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = ROOT_INTERVAL_SLOTS * 5;
    const LAST_SLOT: Slot = FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS * 4;

    solana_logger::setup();

    let GenesisConfigInfo {
        mut genesis_config, ..
    } = create_genesis_config(123);
    let ticks_per_slot = 1;
    genesis_config.ticks_per_slot = ticks_per_slot;

    let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_config);
    let blockstore = Blockstore::open(&ledger_path).unwrap();

    // Fill slots 1..=LAST_SLOT with ticks, each slot chained onto its predecessor's hash.
    let _final_hash = (1..=LAST_SLOT).fold(blockhash, |parent_hash, slot| {
        fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, slot, slot - 1, parent_hash)
    });

    let rooted_slots = (0..=LAST_SLOT)
        .step_by(ROOT_INTERVAL_SLOTS as usize)
        .collect::<Vec<Slot>>();
    blockstore.set_roots(rooted_slots.iter()).unwrap();

    // Build the starting bank (slot 1) on top of a processed bank 0
    let genesis_bank = Arc::new(Bank::new_for_tests(&genesis_config));
    let process_options = ProcessOptions {
        poh_verify: true,
        accounts_db_test_hash_calculation: true,
        ..ProcessOptions::default()
    };
    let verify_recyclers = VerifyRecyclers::default();
    process_bank_0(
        &genesis_bank,
        &blockstore,
        &process_options,
        &verify_recyclers,
        None,
    );

    let slot_start_processing = 1;
    let start_bank = Arc::new(Bank::new_from_parent(
        &genesis_bank,
        &Pubkey::default(),
        slot_start_processing,
    ));
    let mut confirmation_progress = ConfirmationProgress::new(genesis_bank.last_blockhash());
    let mut execute_timings = ExecuteTimings::default();
    confirm_full_slot(
        &blockstore,
        &start_bank,
        &process_options,
        &verify_recyclers,
        &mut confirmation_progress,
        None,
        None,
        &mut execute_timings,
    )
    .unwrap();
    start_bank.squash();

    let bank_snapshots_tempdir = TempDir::new().unwrap();
    let snapshot_config = SnapshotConfig {
        full_snapshot_archive_interval_slots: FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
        incremental_snapshot_archive_interval_slots: Slot::MAX, // value does not matter
        snapshot_archives_dir: PathBuf::default(),              // value does not matter
        bank_snapshots_dir: bank_snapshots_tempdir.path().to_path_buf(),
        archive_format: ArchiveFormat::TarZstd, // value does not matter
        snapshot_version: SnapshotVersion::default(), // value does not matter
        maximum_snapshots_to_retain: usize::MAX, // value does not matter
    };

    let (accounts_package_sender, accounts_package_receiver) = channel();

    do_process_blockstore_from_root(
        &blockstore,
        start_bank,
        &process_options,
        &verify_recyclers,
        None,
        None,
        Some(&snapshot_config),
        accounts_package_sender.clone(),
        BankFromArchiveTimings::default(),
        None,
    )
    .unwrap();

    // `Receiver::iter()` only terminates once every sender is gone, so release ours before
    // draining the channel below.
    drop(accounts_package_sender);

    // Slots at which a full snapshot is expected
    let expected_slots: Vec<_> = (slot_start_processing..=LAST_SLOT)
        .filter(|slot| slot % FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS == 0)
        .collect();

    // Every expected AccountsPackage must have reached the receiver, in order
    let received_accounts_package_slots: Vec<_> = accounts_package_receiver
        .iter()
        .map(|accounts_package| accounts_package.slot)
        .collect();
    assert_eq!(received_accounts_package_slots, expected_slots);

    // Every expected bank snapshot must exist in the bank snapshots directory; sort the slots
    // before comparing against the ascending expected list
    let mut bank_snapshot_slots: Vec<_> =
        snapshot_utils::get_bank_snapshots(&bank_snapshots_tempdir)
            .into_iter()
            .map(|bank_snapshot| bank_snapshot.slot)
            .collect();
    bank_snapshot_slots.sort_unstable();
    assert_eq!(bank_snapshot_slots, expected_slots);
}
#[test]
#[ignore]
fn test_process_entries_stress() {