Add incremental snapshot utils (#18504)
This commit adds high-level functions for creating and loading-from incremental snapshots, plus all low-level functions required to perform those tasks. This commit **does not** add taking incremental snapshots as part of a running validator, nor starting up a node with an incremental snapshot; just laying ground work. Additionally, `snapshot_utils` and `serde_snapshot` have been refactored to use a common code paths for the different snapshots. Also of note, some renaming has happened: 1. Snapshots are now either `full_` or `incremental_` throughout the codebase. If not specified, the code applies to both. 2. Bank snapshots now are called "bank snapshots" (before they were called "slot snapshots", "bank snapshots", or just "snapshots"). The one exception is within `Bank`, where they are still just "snapshots", because they are already "bank snapshots". 3. Snapshot archives now have `_archive` in the code. This should clear up an ambiguity between bank snapshots and snapshot archives.
This commit is contained in:
@ -192,7 +192,7 @@ impl SnapshotRequestHandler {
|
||||
|
||||
// Cleanup outdated snapshots
|
||||
let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
|
||||
snapshot_utils::purge_old_snapshots(&self.snapshot_config.snapshot_path);
|
||||
snapshot_utils::purge_old_bank_snapshots(&self.snapshot_config.snapshot_path);
|
||||
purge_old_snapshots_time.stop();
|
||||
total_time.stop();
|
||||
|
||||
|
@ -38,7 +38,7 @@ use crate::{
|
||||
AccountAddressFilter, Accounts, TransactionAccounts, TransactionLoadResult,
|
||||
TransactionLoaders,
|
||||
},
|
||||
accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorages},
|
||||
accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorage, SnapshotStorages},
|
||||
accounts_index::{AccountSecondaryIndexes, IndexKey, ScanResult},
|
||||
ancestors::{Ancestors, AncestorsForSerialization},
|
||||
blockhash_queue::BlockhashQueue,
|
||||
@ -4737,6 +4737,21 @@ impl Bank {
|
||||
self.rc.get_snapshot_storages(self.slot())
|
||||
}
|
||||
|
||||
/// Get the snapshot storages _higher than_ the `full_snapshot_slot`. This is used when making an
|
||||
/// incremental snapshot.
|
||||
pub fn get_incremental_snapshot_storages(&self, full_snapshot_slot: Slot) -> SnapshotStorages {
|
||||
self.get_snapshot_storages()
|
||||
.into_iter()
|
||||
.map(|storage| {
|
||||
storage
|
||||
.into_iter()
|
||||
.filter(|entry| entry.slot() > full_snapshot_slot)
|
||||
.collect::<SnapshotStorage>()
|
||||
})
|
||||
.filter(|storage| !storage.is_empty())
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
fn verify_hash(&self) -> bool {
|
||||
assert!(self.is_frozen());
|
||||
|
@ -74,6 +74,62 @@ struct AccountsDbFields<T>(
|
||||
BankHashInfo,
|
||||
);
|
||||
|
||||
/// Helper type to wrap BufReader streams when deserializing and reconstructing from either just a
|
||||
/// full snapshot, or both a full and incremental snapshot
|
||||
pub struct SnapshotStreams<'a, R> {
|
||||
pub full_snapshot_stream: &'a mut BufReader<R>,
|
||||
pub incremental_snapshot_stream: Option<&'a mut BufReader<R>>,
|
||||
}
|
||||
|
||||
/// Helper type to wrap AccountsDbFields when reconstructing AccountsDb from either just a full
|
||||
/// snapshot, or both a full and incremental snapshot
|
||||
#[derive(Debug)]
|
||||
struct SnapshotAccountsDbFields<T> {
|
||||
full_snapshot_accounts_db_fields: AccountsDbFields<T>,
|
||||
incremental_snapshot_accounts_db_fields: Option<AccountsDbFields<T>>,
|
||||
}
|
||||
|
||||
impl<T> SnapshotAccountsDbFields<T> {
|
||||
/// Collapse the SnapshotAccountsDbFields into a single AccountsDbFields. If there is no
|
||||
/// incremental snapshot, this returns the AccountsDbFields from the full snapshot. Otherwise
|
||||
/// this uses the version, slot, and bank hash info from the incremental snapshot, then the
|
||||
/// combination of the storages from both the full and incremental snapshots.
|
||||
fn collapse_into(self) -> Result<AccountsDbFields<T>, Error> {
|
||||
match self.incremental_snapshot_accounts_db_fields {
|
||||
None => Ok(self.full_snapshot_accounts_db_fields),
|
||||
Some(AccountsDbFields(
|
||||
mut incremental_snapshot_storages,
|
||||
incremental_snapshot_version,
|
||||
incremental_snapshot_slot,
|
||||
incremental_snapshot_bank_hash_info,
|
||||
)) => {
|
||||
let full_snapshot_storages = self.full_snapshot_accounts_db_fields.0;
|
||||
let full_snapshot_slot = self.full_snapshot_accounts_db_fields.2;
|
||||
|
||||
// filter out incremental snapshot storages with slot <= full snapshot slot
|
||||
incremental_snapshot_storages.retain(|slot, _| *slot > full_snapshot_slot);
|
||||
|
||||
// There must not be any overlap in the slots of storages between the full snapshot and the incremental snapshot
|
||||
incremental_snapshot_storages
|
||||
.iter()
|
||||
.all(|storage_entry| !full_snapshot_storages.contains_key(storage_entry.0)).then(|| ()).ok_or_else(|| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "Snapshots are incompatible: There are storages for the same slot in both the full snapshot and the incremental snapshot!")
|
||||
})?;
|
||||
|
||||
let mut combined_storages = full_snapshot_storages;
|
||||
combined_storages.extend(incremental_snapshot_storages.into_iter());
|
||||
|
||||
Ok(AccountsDbFields(
|
||||
combined_storages,
|
||||
incremental_snapshot_version,
|
||||
incremental_snapshot_slot,
|
||||
incremental_snapshot_bank_hash_info,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait TypeContext<'a> {
|
||||
type SerializableAccountStorageEntry: Serialize
|
||||
+ DeserializeOwned
|
||||
@ -127,16 +183,16 @@ where
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn bank_from_stream<R>(
|
||||
pub(crate) fn bank_from_streams<R>(
|
||||
serde_style: SerdeStyle,
|
||||
stream: &mut BufReader<R>,
|
||||
snapshot_streams: &mut SnapshotStreams<R>,
|
||||
account_paths: &[PathBuf],
|
||||
unpacked_append_vec_map: UnpackedAppendVecMap,
|
||||
genesis_config: &GenesisConfig,
|
||||
frozen_account_pubkeys: &[Pubkey],
|
||||
debug_keys: Option<Arc<HashSet<Pubkey>>>,
|
||||
additional_builtins: Option<&Builtins>,
|
||||
account_indexes: AccountSecondaryIndexes,
|
||||
account_secondary_indexes: AccountSecondaryIndexes,
|
||||
caching_enabled: bool,
|
||||
limit_load_slot_count_from_snapshot: Option<usize>,
|
||||
shrink_ratio: AccountShrinkThreshold,
|
||||
@ -147,18 +203,33 @@ where
|
||||
{
|
||||
macro_rules! INTO {
|
||||
($x:ident) => {{
|
||||
let (bank_fields, accounts_db_fields) = $x::deserialize_bank_fields(stream)?;
|
||||
let (full_snapshot_bank_fields, full_snapshot_accounts_db_fields) =
|
||||
$x::deserialize_bank_fields(snapshot_streams.full_snapshot_stream)?;
|
||||
let (incremental_snapshot_bank_fields, incremental_snapshot_accounts_db_fields) =
|
||||
if let Some(ref mut incremental_snapshot_stream) =
|
||||
snapshot_streams.incremental_snapshot_stream
|
||||
{
|
||||
let (bank_fields, accounts_db_fields) =
|
||||
$x::deserialize_bank_fields(incremental_snapshot_stream)?;
|
||||
(Some(bank_fields), Some(accounts_db_fields))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let snapshot_accounts_db_fields = SnapshotAccountsDbFields {
|
||||
full_snapshot_accounts_db_fields,
|
||||
incremental_snapshot_accounts_db_fields,
|
||||
};
|
||||
let bank = reconstruct_bank_from_fields(
|
||||
bank_fields,
|
||||
accounts_db_fields,
|
||||
incremental_snapshot_bank_fields.unwrap_or(full_snapshot_bank_fields),
|
||||
snapshot_accounts_db_fields,
|
||||
genesis_config,
|
||||
frozen_account_pubkeys,
|
||||
account_paths,
|
||||
unpacked_append_vec_map,
|
||||
debug_keys,
|
||||
additional_builtins,
|
||||
account_indexes,
|
||||
account_secondary_indexes,
|
||||
caching_enabled,
|
||||
limit_load_slot_count_from_snapshot,
|
||||
shrink_ratio,
|
||||
@ -243,14 +314,14 @@ impl<'a, C> IgnoreAsHelper for SerializableAccountsDb<'a, C> {}
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn reconstruct_bank_from_fields<E>(
|
||||
bank_fields: BankFieldsToDeserialize,
|
||||
accounts_db_fields: AccountsDbFields<E>,
|
||||
snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
|
||||
genesis_config: &GenesisConfig,
|
||||
frozen_account_pubkeys: &[Pubkey],
|
||||
account_paths: &[PathBuf],
|
||||
unpacked_append_vec_map: UnpackedAppendVecMap,
|
||||
debug_keys: Option<Arc<HashSet<Pubkey>>>,
|
||||
additional_builtins: Option<&Builtins>,
|
||||
account_indexes: AccountSecondaryIndexes,
|
||||
account_secondary_indexes: AccountSecondaryIndexes,
|
||||
caching_enabled: bool,
|
||||
limit_load_slot_count_from_snapshot: Option<usize>,
|
||||
shrink_ratio: AccountShrinkThreshold,
|
||||
@ -260,11 +331,11 @@ where
|
||||
E: SerializableStorage + std::marker::Sync,
|
||||
{
|
||||
let mut accounts_db = reconstruct_accountsdb_from_fields(
|
||||
accounts_db_fields,
|
||||
snapshot_accounts_db_fields,
|
||||
account_paths,
|
||||
unpacked_append_vec_map,
|
||||
&genesis_config.cluster_type,
|
||||
account_indexes,
|
||||
account_secondary_indexes,
|
||||
caching_enabled,
|
||||
limit_load_slot_count_from_snapshot,
|
||||
shrink_ratio,
|
||||
@ -310,11 +381,11 @@ where
|
||||
}
|
||||
|
||||
fn reconstruct_accountsdb_from_fields<E>(
|
||||
accounts_db_fields: AccountsDbFields<E>,
|
||||
snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
|
||||
account_paths: &[PathBuf],
|
||||
unpacked_append_vec_map: UnpackedAppendVecMap,
|
||||
cluster_type: &ClusterType,
|
||||
account_indexes: AccountSecondaryIndexes,
|
||||
account_secondary_indexes: AccountSecondaryIndexes,
|
||||
caching_enabled: bool,
|
||||
limit_load_slot_count_from_snapshot: Option<usize>,
|
||||
shrink_ratio: AccountShrinkThreshold,
|
||||
@ -326,11 +397,19 @@ where
|
||||
let mut accounts_db = AccountsDb::new_with_config(
|
||||
account_paths.to_vec(),
|
||||
cluster_type,
|
||||
account_indexes,
|
||||
account_secondary_indexes,
|
||||
caching_enabled,
|
||||
shrink_ratio,
|
||||
);
|
||||
let AccountsDbFields(storage, version, slot, bank_hash_info) = accounts_db_fields;
|
||||
|
||||
let AccountsDbFields(
|
||||
snapshot_storages,
|
||||
snapshot_version,
|
||||
snapshot_slot,
|
||||
snapshot_bank_hash_info,
|
||||
) = snapshot_accounts_db_fields.collapse_into()?;
|
||||
|
||||
let snapshot_storages = snapshot_storages.into_iter().collect::<Vec<_>>();
|
||||
|
||||
// Ensure all account paths exist
|
||||
for path in &accounts_db.paths {
|
||||
@ -338,13 +417,11 @@ where
|
||||
.unwrap_or_else(|err| panic!("Failed to create directory {}: {}", path.display(), err));
|
||||
}
|
||||
|
||||
let storage = storage.into_iter().collect::<Vec<_>>();
|
||||
|
||||
// Remap the deserialized AppendVec paths to point to correct local paths
|
||||
let mut storage = (0..storage.len())
|
||||
let mut storage = (0..snapshot_storages.len())
|
||||
.into_par_iter()
|
||||
.map(|i| {
|
||||
let (slot, slot_storage) = &storage[i];
|
||||
let (slot, slot_storage) = &snapshot_storages[i];
|
||||
let mut new_slot_storage = HashMap::new();
|
||||
for storage_entry in slot_storage {
|
||||
let file_name = AppendVec::file_name(*slot, storage_entry.id());
|
||||
@ -376,7 +453,7 @@ where
|
||||
.bank_hashes
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(slot, bank_hash_info);
|
||||
.insert(snapshot_slot, snapshot_bank_hash_info);
|
||||
|
||||
// Process deserialized data, set necessary fields in self
|
||||
let max_id: usize = *storage
|
||||
@ -400,7 +477,7 @@ where
|
||||
accounts_db.next_id.store(max_id + 1, Ordering::Relaxed);
|
||||
accounts_db
|
||||
.write_version
|
||||
.fetch_add(version, Ordering::Relaxed);
|
||||
.fetch_add(snapshot_version, Ordering::Relaxed);
|
||||
accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index);
|
||||
Ok(accounts_db)
|
||||
}
|
||||
|
@ -66,8 +66,13 @@ where
|
||||
R: Read,
|
||||
{
|
||||
// read and deserialise the accounts database directly from the stream
|
||||
let accounts_db_fields = C::deserialize_accounts_db_fields(stream)?;
|
||||
let snapshot_accounts_db_fields = SnapshotAccountsDbFields {
|
||||
full_snapshot_accounts_db_fields: accounts_db_fields,
|
||||
incremental_snapshot_accounts_db_fields: None,
|
||||
};
|
||||
reconstruct_accountsdb_from_fields(
|
||||
C::deserialize_accounts_db_fields(stream)?,
|
||||
snapshot_accounts_db_fields,
|
||||
account_paths,
|
||||
unpacked_append_vec_map,
|
||||
&ClusterType::Development,
|
||||
@ -219,9 +224,13 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) {
|
||||
let copied_accounts = TempDir::new().unwrap();
|
||||
let unpacked_append_vec_map =
|
||||
copy_append_vecs(&bank2.rc.accounts.accounts_db, copied_accounts.path()).unwrap();
|
||||
let mut dbank = crate::serde_snapshot::bank_from_stream(
|
||||
let mut snapshot_streams = SnapshotStreams {
|
||||
full_snapshot_stream: &mut reader,
|
||||
incremental_snapshot_stream: None,
|
||||
};
|
||||
let mut dbank = crate::serde_snapshot::bank_from_streams(
|
||||
serde_style,
|
||||
&mut reader,
|
||||
&mut snapshot_streams,
|
||||
&dbank_paths,
|
||||
unpacked_append_vec_map,
|
||||
&genesis_config,
|
||||
|
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user