Make background services aware of incremental snapshots (#19401)

AccountsBackgroundService now knows about incremental snapshots.  It is
now also in charge of deciding if an AccountsPackage is destined to be a
SnapshotPackage or not (or just used by AccountsHashVerifier).

!!! New behavior changes !!!

Taking snapshots (both bank and archive) **MUST** succeed.

This is required because of how the last full snapshot slot is
calculated, which is used by AccountsBackgroundService when calling
`clean_accounts()`.

File system calls are now unwrapped and will result in a crash. As Trent told me:

>Well I think if a snapshot fails due to some IO error, it's very likely that the operator is going to have to intervene before it works.  We should exit error in this case, otherwise the validator might happily spin for several more hours, never successfully writing a complete snapshot, before something else brings it down.  This would leave the validator's last local snapshot many more slots behind than it would be had we exited outright and potentially force the operator to abandon ledger continuity in favor of a quick catchup

Other errors will set the `exit` flag to `true`, and the node will gracefully shutdown.

Fixes #19167 
Fixes #19168
This commit is contained in:
Brooks Prumo
2021-08-31 18:33:27 -05:00
committed by GitHub
parent 718ab7c12e
commit fe9ee9134a
9 changed files with 408 additions and 425 deletions

View File

@@ -1,6 +1,6 @@
use {
crate::{
accounts_db::{AccountShrinkThreshold, AccountsDb},
accounts_db::AccountShrinkThreshold,
accounts_index::{AccountSecondaryIndexes, AccountsIndexConfig},
bank::{Bank, BankSlotDelta, Builtins},
hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap},
@@ -16,14 +16,13 @@ use {
AccountsPackage, AccountsPackageSendError, AccountsPackageSender, SnapshotPackage,
SnapshotType,
},
sorted_storages::SortedStorages,
},
bincode::{config::Options, serialize_into},
bzip2::bufread::BzDecoder,
flate2::read::GzDecoder,
lazy_static::lazy_static,
log::*,
rayon::{prelude::*, ThreadPool},
rayon::prelude::*,
regex::Regex,
solana_measure::measure::Measure,
solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey},
@@ -49,8 +48,8 @@ pub const MAX_BANK_SNAPSHOTS: usize = 8; // Save some snapshots but not too many
const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
const VERSION_STRING_V1_2_0: &str = "1.2.0";
const DEFAULT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0;
pub(crate) const TMP_FULL_SNAPSHOT_PREFIX: &str = "tmp-snapshot-";
pub(crate) const TMP_INCREMENTAL_SNAPSHOT_PREFIX: &str = "tmp-incremental-snapshot-";
pub(crate) const TMP_BANK_SNAPSHOT_PREFIX: &str = "tmp-bank-snapshot-";
pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-";
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: usize = 2;
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz)$";
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz)$";
@@ -224,9 +223,7 @@ pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef<Path>) {
.file_name()
.into_string()
.unwrap_or_else(|_| String::new());
if file_name.starts_with(TMP_FULL_SNAPSHOT_PREFIX)
|| file_name.starts_with(TMP_INCREMENTAL_SNAPSHOT_PREFIX)
{
if file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX) {
if entry.path().is_file() {
fs::remove_file(entry.path())
} else {
@@ -269,7 +266,7 @@ pub fn archive_snapshot_package(
.map_err(|e| SnapshotError::IoWithSource(e, "create archive path"))?;
// Create the staging directories
let staging_dir_prefix = snapshot_package.snapshot_type.to_prefix();
let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX;
let staging_dir = tempfile::Builder::new()
.prefix(&format!(
"{}{}-",
@@ -293,7 +290,7 @@ pub fn archive_snapshot_package(
.map_err(|e| SnapshotError::IoWithSource(e, "create staging symlinks"))?;
// Add the AppendVecs into the compressible list
for storage in snapshot_package.storages.iter().flatten() {
for storage in snapshot_package.snapshot_storages.iter().flatten() {
storage.flush()?;
let storage_path = storage.get_path();
let output_path = staging_accounts_dir.join(crate::append_vec::AppendVec::file_name(
@@ -742,7 +739,7 @@ pub fn bank_from_snapshot_archives(
let unarchived_full_snapshot = unarchive_snapshot(
&bank_snapshots_dir,
TMP_FULL_SNAPSHOT_PREFIX,
TMP_SNAPSHOT_ARCHIVE_PREFIX,
full_snapshot_archive_info.path(),
"snapshot untar",
account_paths,
@@ -754,7 +751,7 @@ pub fn bank_from_snapshot_archives(
if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info {
let unarchived_incremental_snapshot = unarchive_snapshot(
&bank_snapshots_dir,
TMP_INCREMENTAL_SNAPSHOT_PREFIX,
TMP_SNAPSHOT_ARCHIVE_PREFIX,
incremental_snapshot_archive_info.path(),
"incremental snapshot untar",
account_paths,
@@ -1530,7 +1527,12 @@ where
})
}
/// Gather the necessary elements for a snapshot of the given `root_bank`
/// Gather the necessary elements for a snapshot of the given `root_bank`.
///
/// **DEVELOPER NOTE** Any error that is returned from this function may bring down the node! This
/// function is called from AccountsBackgroundService to handle snapshot requests. Since taking a
/// snapshot is not permitted to fail, any errors returned here will trigger the node to shutdown.
/// So, be careful whenever adding new code that may return errors.
pub fn snapshot_bank(
root_bank: &Bank,
status_cache_slot_deltas: Vec<BankSlotDelta>,
@@ -1538,30 +1540,42 @@ pub fn snapshot_bank(
bank_snapshots_dir: impl AsRef<Path>,
snapshot_archives_dir: impl AsRef<Path>,
snapshot_version: SnapshotVersion,
archive_format: &ArchiveFormat,
archive_format: ArchiveFormat,
hash_for_testing: Option<Hash>,
snapshot_type: Option<SnapshotType>,
) -> Result<()> {
let storages = root_bank.get_snapshot_storages(None);
let snapshot_storages = snapshot_type.map_or_else(SnapshotStorages::default, |snapshot_type| {
let incremental_snapshot_base_slot = match snapshot_type {
SnapshotType::IncrementalSnapshot(incremental_snapshot_base_slot) => {
Some(incremental_snapshot_base_slot)
}
_ => None,
};
root_bank.get_snapshot_storages(incremental_snapshot_base_slot)
});
let mut add_snapshot_time = Measure::start("add-snapshot-ms");
add_bank_snapshot(&bank_snapshots_dir, root_bank, &storages, snapshot_version)?;
let bank_snapshot_info = add_bank_snapshot(
&bank_snapshots_dir,
root_bank,
&snapshot_storages,
snapshot_version,
)?;
add_snapshot_time.stop();
inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize);
// Package the relevant snapshots
let highest_bank_snapshot_info = get_highest_bank_snapshot_info(&bank_snapshots_dir)
.expect("no snapshots found in config bank_snapshots_dir");
let accounts_package = AccountsPackage::new_for_full_snapshot(
let accounts_package = AccountsPackage::new(
root_bank,
&highest_bank_snapshot_info,
&bank_snapshot_info,
bank_snapshots_dir,
status_cache_slot_deltas,
snapshot_archives_dir,
storages,
*archive_format,
snapshot_storages,
archive_format,
snapshot_version,
hash_for_testing,
)?;
snapshot_type,
)
.expect("failed to hard link bank snapshot into a tmpdir");
accounts_package_sender.send(accounts_package)?;
@@ -1579,7 +1593,6 @@ pub fn bank_to_full_snapshot_archive(
snapshot_version: Option<SnapshotVersion>,
snapshot_archives_dir: impl AsRef<Path>,
archive_format: ArchiveFormat,
thread_pool: Option<&ThreadPool>,
maximum_snapshots_to_retain: usize,
) -> Result<FullSnapshotArchiveInfo> {
let snapshot_version = snapshot_version.unwrap_or_default();
@@ -1592,18 +1605,18 @@ pub fn bank_to_full_snapshot_archive(
bank.rehash(); // Bank accounts may have been manually modified by the caller
let temp_dir = tempfile::tempdir_in(bank_snapshots_dir)?;
let storages = bank.get_snapshot_storages(None);
let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?;
let snapshot_storages = bank.get_snapshot_storages(None);
let bank_snapshot_info =
add_bank_snapshot(&temp_dir, bank, &snapshot_storages, snapshot_version)?;
package_process_and_archive_full_snapshot(
package_and_archive_full_snapshot(
bank,
&bank_snapshot_info,
&temp_dir,
snapshot_archives_dir,
storages,
snapshot_storages,
archive_format,
snapshot_version,
thread_pool,
maximum_snapshots_to_retain,
)
}
@@ -1621,7 +1634,6 @@ pub fn bank_to_incremental_snapshot_archive(
snapshot_version: Option<SnapshotVersion>,
snapshot_archives_dir: impl AsRef<Path>,
archive_format: ArchiveFormat,
thread_pool: Option<&ThreadPool>,
maximum_snapshots_to_retain: usize,
) -> Result<IncrementalSnapshotArchiveInfo> {
let snapshot_version = snapshot_version.unwrap_or_default();
@@ -1635,25 +1647,25 @@ pub fn bank_to_incremental_snapshot_archive(
bank.rehash(); // Bank accounts may have been manually modified by the caller
let temp_dir = tempfile::tempdir_in(bank_snapshots_dir)?;
let storages = bank.get_snapshot_storages(Some(full_snapshot_slot));
let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?;
let snapshot_storages = bank.get_snapshot_storages(Some(full_snapshot_slot));
let bank_snapshot_info =
add_bank_snapshot(&temp_dir, bank, &snapshot_storages, snapshot_version)?;
package_process_and_archive_incremental_snapshot(
package_and_archive_incremental_snapshot(
bank,
full_snapshot_slot,
&bank_snapshot_info,
&temp_dir,
snapshot_archives_dir,
storages,
snapshot_storages,
archive_format,
snapshot_version,
thread_pool,
maximum_snapshots_to_retain,
)
}
/// Helper function to hold shared code to package, process, and archive full snapshots
pub fn package_process_and_archive_full_snapshot(
pub fn package_and_archive_full_snapshot(
bank: &Bank,
bank_snapshot_info: &BankSnapshotInfo,
bank_snapshots_dir: impl AsRef<Path>,
@@ -1661,10 +1673,9 @@ pub fn package_process_and_archive_full_snapshot(
snapshot_storages: SnapshotStorages,
archive_format: ArchiveFormat,
snapshot_version: SnapshotVersion,
thread_pool: Option<&ThreadPool>,
maximum_snapshots_to_retain: usize,
) -> Result<FullSnapshotArchiveInfo> {
let accounts_package = AccountsPackage::new_for_full_snapshot(
let accounts_package = AccountsPackage::new(
bank,
bank_snapshot_info,
bank_snapshots_dir,
@@ -1674,14 +1685,11 @@ pub fn package_process_and_archive_full_snapshot(
archive_format,
snapshot_version,
None,
Some(SnapshotType::FullSnapshot),
)?;
let snapshot_package = process_and_archive_accounts_package(
accounts_package,
thread_pool,
None,
maximum_snapshots_to_retain,
)?;
let snapshot_package = SnapshotPackage::from(accounts_package);
archive_snapshot_package(&snapshot_package, maximum_snapshots_to_retain)?;
Ok(FullSnapshotArchiveInfo::new(
snapshot_package.snapshot_archive_info,
@@ -1690,7 +1698,7 @@ pub fn package_process_and_archive_full_snapshot(
/// Helper function to hold shared code to package, process, and archive incremental snapshots
#[allow(clippy::too_many_arguments)]
pub fn package_process_and_archive_incremental_snapshot(
pub fn package_and_archive_incremental_snapshot(
bank: &Bank,
incremental_snapshot_base_slot: Slot,
bank_snapshot_info: &BankSnapshotInfo,
@@ -1699,12 +1707,10 @@ pub fn package_process_and_archive_incremental_snapshot(
snapshot_storages: SnapshotStorages,
archive_format: ArchiveFormat,
snapshot_version: SnapshotVersion,
thread_pool: Option<&ThreadPool>,
maximum_snapshots_to_retain: usize,
) -> Result<IncrementalSnapshotArchiveInfo> {
let accounts_package = AccountsPackage::new_for_incremental_snapshot(
let accounts_package = AccountsPackage::new(
bank,
incremental_snapshot_base_slot,
bank_snapshot_info,
bank_snapshots_dir,
bank.src.slot_deltas(&bank.src.roots()),
@@ -1713,14 +1719,13 @@ pub fn package_process_and_archive_incremental_snapshot(
archive_format,
snapshot_version,
None,
Some(SnapshotType::IncrementalSnapshot(
incremental_snapshot_base_slot,
)),
)?;
let snapshot_package = process_and_archive_accounts_package(
accounts_package,
thread_pool,
Some(incremental_snapshot_base_slot),
maximum_snapshots_to_retain,
)?;
let snapshot_package = SnapshotPackage::from(accounts_package);
archive_snapshot_package(&snapshot_package, maximum_snapshots_to_retain)?;
Ok(IncrementalSnapshotArchiveInfo::new(
incremental_snapshot_base_slot,
@@ -1728,89 +1733,6 @@ pub fn package_process_and_archive_incremental_snapshot(
))
}
/// Helper function to hold shared code to process and archive accounts packages
fn process_and_archive_accounts_package(
accounts_package: AccountsPackage,
thread_pool: Option<&ThreadPool>,
incremental_snapshot_base_slot: Option<Slot>,
maximum_snapshots_to_retain: usize,
) -> Result<SnapshotPackage> {
let snapshot_package = process_accounts_package(
accounts_package,
thread_pool,
incremental_snapshot_base_slot,
);
archive_snapshot_package(&snapshot_package, maximum_snapshots_to_retain)?;
Ok(snapshot_package)
}
pub fn process_accounts_package(
accounts_package: AccountsPackage,
thread_pool: Option<&ThreadPool>,
incremental_snapshot_base_slot: Option<Slot>,
) -> SnapshotPackage {
let mut time = Measure::start("hash");
let hash = accounts_package.hash; // temporarily remaining here
if let Some(expected_hash) = accounts_package.hash_for_testing {
let sorted_storages = SortedStorages::new(&accounts_package.storages);
let (hash, lamports) = AccountsDb::calculate_accounts_hash_without_index(
&sorted_storages,
thread_pool,
crate::accounts_hash::HashStats::default(),
false,
None,
)
.unwrap();
assert_eq!(accounts_package.expected_capitalization, lamports);
assert_eq!(expected_hash, hash);
};
time.stop();
datapoint_info!(
"accounts_hash_verifier",
("calculate_hash", time.as_us(), i64),
);
let snapshot_archive_path = match incremental_snapshot_base_slot {
None => build_full_snapshot_archive_path(
accounts_package.snapshot_archives_dir,
accounts_package.slot,
&hash,
accounts_package.archive_format,
),
Some(incremental_snapshot_base_slot) => build_incremental_snapshot_archive_path(
accounts_package.snapshot_archives_dir,
incremental_snapshot_base_slot,
accounts_package.slot,
&hash,
accounts_package.archive_format,
),
};
let snapshot_type = match incremental_snapshot_base_slot {
None => SnapshotType::FullSnapshot,
Some(_) => SnapshotType::IncrementalSnapshot,
};
SnapshotPackage::new(
accounts_package.slot,
accounts_package.block_height,
accounts_package.slot_deltas,
accounts_package.snapshot_links,
accounts_package.storages,
snapshot_archive_path,
hash,
accounts_package.archive_format,
accounts_package.snapshot_version,
snapshot_type,
)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -2491,7 +2413,6 @@ mod tests {
None,
snapshot_archives_dir.path(),
snapshot_archive_format,
None,
1,
)
.unwrap();
@@ -2582,7 +2503,6 @@ mod tests {
None,
snapshot_archives_dir.path(),
snapshot_archive_format,
None,
std::usize::MAX,
)
.unwrap();
@@ -2659,7 +2579,6 @@ mod tests {
None,
snapshot_archives_dir.path(),
snapshot_archive_format,
None,
std::usize::MAX,
)
.unwrap();
@@ -2692,7 +2611,6 @@ mod tests {
None,
snapshot_archives_dir.path(),
snapshot_archive_format,
None,
std::usize::MAX,
)
.unwrap();
@@ -2759,7 +2677,6 @@ mod tests {
None,
&snapshot_archives_dir,
snapshot_archive_format,
None,
std::usize::MAX,
)
.unwrap();
@@ -2792,7 +2709,6 @@ mod tests {
None,
&snapshot_archives_dir,
snapshot_archive_format,
None,
std::usize::MAX,
)
.unwrap();
@@ -2891,7 +2807,6 @@ mod tests {
None,
snapshot_archives_dir.path(),
snapshot_archive_format,
None,
DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
)
.unwrap();
@@ -2933,7 +2848,6 @@ mod tests {
None,
snapshot_archives_dir.path(),
snapshot_archive_format,
None,
DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
)
.unwrap();
@@ -2994,7 +2908,6 @@ mod tests {
None,
snapshot_archives_dir.path(),
snapshot_archive_format,
None,
DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
)
.unwrap();