Move BankForks to solana_runtime (#10637)
* Move BankForks to solana_runtime
* Update imports
@@ -12,6 +12,9 @@ edition = "2018"
bincode = "1.2.1"
bv = { version = "0.11.1", features = ["serde"] }
byteorder = "1.3.4"
bzip2 = "0.3.3"
dir-diff = "0.3.2"
flate2 = "1.0.14"
fnv = "1.0.7"
fs_extra = "1.1.0"
itertools = "0.9.0"
@@ -25,6 +28,7 @@ num-traits = { version = "0.2" }
num_cpus = "1.13.0"
rand = "0.7.0"
rayon = "1.3.0"
regex = "1.3.9"
serde = { version = "1.0.112", features = ["rc"] }
serde_derive = "1.0.103"
solana-config-program = { path = "../programs/config", version = "1.3.0" }
@@ -35,8 +39,11 @@ solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.3.0" }
solana-sdk = { path = "../sdk", version = "1.3.0" }
solana-stake-program = { path = "../programs/stake", version = "1.3.0" }
solana-vote-program = { path = "../programs/vote", version = "1.3.0" }
symlink = "0.1.0"
tar = "0.4.28"
tempfile = "3.1.0"
thiserror = "1.0"
zstd = "0.5.1"

[lib]
crate-type = ["lib"]
runtime/src/bank_forks.rs (new file, 455 lines)
@@ -0,0 +1,455 @@
//! The `bank_forks` module implements `BankForks`, a DAG of checkpointed Banks

use crate::snapshot_package::{AccountsPackageSendError, AccountsPackageSender};
use crate::snapshot_utils::{self, SnapshotError};
use crate::{bank::Bank, status_cache::MAX_CACHE_ENTRIES};
use log::*;
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_info;
use solana_sdk::{clock::Slot, timing};
use std::{
    collections::{HashMap, HashSet},
    ops::Index,
    path::PathBuf,
    sync::Arc,
    time::Instant,
};
use thiserror::Error;

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CompressionType {
    Bzip2,
    Gzip,
    Zstd,
    NoCompression,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SnapshotConfig {
    // Generate a new snapshot every this many slots
    pub snapshot_interval_slots: u64,

    // Where to store the latest packaged snapshot
    pub snapshot_package_output_path: PathBuf,

    // Where to place the snapshots for recent slots
    pub snapshot_path: PathBuf,

    pub compression: CompressionType,
}

#[derive(Error, Debug)]
pub enum BankForksError {
    #[error("snapshot error")]
    SnapshotError(#[from] SnapshotError),

    #[error("accounts package send error")]
    AccountsPackageSendError(#[from] AccountsPackageSendError),
}
type Result<T> = std::result::Result<T, BankForksError>;

pub struct BankForks {
    pub banks: HashMap<Slot, Arc<Bank>>,
    working_bank: Arc<Bank>,
    root: Slot,
    pub snapshot_config: Option<SnapshotConfig>,
    last_snapshot_slot: Slot,

    pub accounts_hash_interval_slots: Slot,
    last_accounts_hash_slot: Slot,
}

impl Index<u64> for BankForks {
    type Output = Arc<Bank>;
    fn index(&self, bank_slot: Slot) -> &Arc<Bank> {
        &self.banks[&bank_slot]
    }
}

impl BankForks {
    pub fn new(bank: Bank) -> Self {
        let root = bank.slot();
        Self::new_from_banks(&[Arc::new(bank)], root)
    }

    /// Create a map of bank slot id to the set of ancestors for the bank slot.
    pub fn ancestors(&self) -> HashMap<Slot, HashSet<Slot>> {
        let mut ancestors = HashMap::new();
        let root = self.root;
        for bank in self.banks.values() {
            let mut set: HashSet<Slot> = bank
                .ancestors
                .keys()
                .filter(|k| **k >= root)
                .cloned()
                .collect();
            set.remove(&bank.slot());
            ancestors.insert(bank.slot(), set);
        }
        ancestors
    }

    /// Create a map of bank slot id to the set of all of its descendants
    #[allow(clippy::or_fun_call)]
    pub fn descendants(&self) -> HashMap<Slot, HashSet<Slot>> {
        let mut descendants = HashMap::new();
        for bank in self.banks.values() {
            let _ = descendants.entry(bank.slot()).or_insert(HashSet::new());
            let mut set: HashSet<Slot> = bank.ancestors.keys().cloned().collect();
            set.remove(&bank.slot());
            for parent in set {
                descendants
                    .entry(parent)
                    .or_insert(HashSet::new())
                    .insert(bank.slot());
            }
        }
        descendants
    }

    pub fn frozen_banks(&self) -> HashMap<Slot, Arc<Bank>> {
        self.banks
            .iter()
            .filter(|(_, b)| b.is_frozen())
            .map(|(k, b)| (*k, b.clone()))
            .collect()
    }

    pub fn active_banks(&self) -> Vec<Slot> {
        self.banks
            .iter()
            .filter(|(_, v)| !v.is_frozen())
            .map(|(k, _v)| *k)
            .collect()
    }

    pub fn get(&self, bank_slot: Slot) -> Option<&Arc<Bank>> {
        self.banks.get(&bank_slot)
    }

    pub fn root_bank(&self) -> &Arc<Bank> {
        self.banks.get(&self.root()).expect("Root bank must exist")
    }

    pub fn new_from_banks(initial_forks: &[Arc<Bank>], root: Slot) -> Self {
        let mut banks = HashMap::new();

        // Set working bank to the highest available bank
        let working_bank = initial_forks
            .iter()
            .max_by(|a, b| a.slot().cmp(&b.slot()))
            .expect("working bank")
            .clone();

        // Iterate through the heads of all the different forks
        for bank in initial_forks {
            banks.insert(bank.slot(), bank.clone());
            let parents = bank.parents();
            for parent in parents {
                if banks.contains_key(&parent.slot()) {
                    // All ancestors have already been inserted by another fork
                    break;
                }
                banks.insert(parent.slot(), parent.clone());
            }
        }

        Self {
            root,
            banks,
            working_bank,
            snapshot_config: None,
            last_snapshot_slot: root,
            accounts_hash_interval_slots: std::u64::MAX,
            last_accounts_hash_slot: root,
        }
    }

    pub fn insert(&mut self, bank: Bank) -> Arc<Bank> {
        let bank = Arc::new(bank);
        let prev = self.banks.insert(bank.slot(), bank.clone());
        assert!(prev.is_none());

        self.working_bank = bank.clone();
        bank
    }

    pub fn remove(&mut self, slot: Slot) -> Option<Arc<Bank>> {
        self.banks.remove(&slot)
    }

    pub fn working_bank(&self) -> Arc<Bank> {
        self.working_bank.clone()
    }

    pub fn set_root(
        &mut self,
        root: Slot,
        accounts_package_sender: &Option<AccountsPackageSender>,
        largest_confirmed_root: Option<Slot>,
    ) {
        let old_epoch = self.root_bank().epoch();
        self.root = root;
        let set_root_start = Instant::now();
        let root_bank = self
            .banks
            .get(&root)
            .expect("root bank didn't exist in bank_forks");
        let new_epoch = root_bank.epoch();
        if old_epoch != new_epoch {
            info!(
                "Root entering
                    epoch: {},
                    next_epoch_start_slot: {},
                    epoch_stakes: {:#?}",
                new_epoch,
                root_bank
                    .epoch_schedule()
                    .get_first_slot_in_epoch(new_epoch + 1),
                root_bank
                    .epoch_stakes(new_epoch)
                    .unwrap()
                    .node_id_to_vote_accounts()
            );
        }
        let root_tx_count = root_bank
            .parents()
            .last()
            .map(|bank| bank.transaction_count())
            .unwrap_or(0);
        // Calculate the accounts hash at a fixed interval
        let mut is_root_bank_squashed = false;
        let mut banks = vec![root_bank];
        let parents = root_bank.parents();
        banks.extend(parents.iter());
        for bank in banks.iter() {
            let bank_slot = bank.slot();
            if bank.block_height() % self.accounts_hash_interval_slots == 0
                && bank_slot > self.last_accounts_hash_slot
            {
                self.last_accounts_hash_slot = bank_slot;
                bank.squash();
                is_root_bank_squashed = bank_slot == root;

                bank.clean_accounts();
                bank.update_accounts_hash();

                if self.snapshot_config.is_some() && accounts_package_sender.is_some() {
                    // Generate an accounts package
                    let mut snapshot_time = Measure::start("total-snapshot-ms");
                    let r = self.generate_accounts_package(
                        bank_slot,
                        &bank.src.roots(),
                        accounts_package_sender.as_ref().unwrap(),
                    );
                    if r.is_err() {
                        warn!(
                            "Error generating snapshot for bank: {}, err: {:?}",
                            bank_slot, r
                        );
                    } else {
                        self.last_snapshot_slot = bank_slot;
                    }

                    // Cleanup outdated snapshots
                    self.purge_old_snapshots();
                    snapshot_time.stop();
                    inc_new_counter_info!("total-snapshot-ms", snapshot_time.as_ms() as usize);
                }
                break;
            }
        }
        if !is_root_bank_squashed {
            root_bank.squash();
        }
        let new_tx_count = root_bank.transaction_count();

        self.prune_non_root(root, largest_confirmed_root);

        inc_new_counter_info!(
            "bank-forks_set_root_ms",
            timing::duration_as_ms(&set_root_start.elapsed()) as usize
        );
        inc_new_counter_info!(
            "bank-forks_set_root_tx_count",
            (new_tx_count - root_tx_count) as usize
        );
    }

    pub fn root(&self) -> Slot {
        self.root
    }

    pub fn purge_old_snapshots(&self) {
        // Remove outdated snapshots
        let config = self.snapshot_config.as_ref().unwrap();
        let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);
        let num_to_remove = slot_snapshot_paths.len().saturating_sub(MAX_CACHE_ENTRIES);
        for slot_files in &slot_snapshot_paths[..num_to_remove] {
            let r = snapshot_utils::remove_snapshot(slot_files.slot, &config.snapshot_path);
            if r.is_err() {
                warn!("Couldn't remove snapshot at: {:?}", config.snapshot_path);
            }
        }
    }

    pub fn generate_accounts_package(
        &self,
        root: Slot,
        slots_to_snapshot: &[Slot],
        accounts_package_sender: &AccountsPackageSender,
    ) -> Result<()> {
        let config = self.snapshot_config.as_ref().unwrap();

        // Add a snapshot for the new root
        let bank = self
            .get(root)
            .cloned()
            .expect("root must exist in BankForks");

        let storages: Vec<_> = bank.get_snapshot_storages();
        let mut add_snapshot_time = Measure::start("add-snapshot-ms");
        snapshot_utils::add_snapshot(&config.snapshot_path, &bank, &storages)?;
        add_snapshot_time.stop();
        inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize);

        // Package the relevant snapshots
        let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);
        let latest_slot_snapshot_paths = slot_snapshot_paths
            .last()
            .expect("no snapshots found in config snapshot_path");
        // We only care about the last bank's snapshot.
        // We'll ask the bank for MAX_CACHE_ENTRIES (on the rooted path) worth of statuses
        let package = snapshot_utils::package_snapshot(
            &bank,
            latest_slot_snapshot_paths,
            &config.snapshot_path,
            slots_to_snapshot,
            &config.snapshot_package_output_path,
            storages,
            config.compression.clone(),
        )?;

        accounts_package_sender.send(package)?;

        Ok(())
    }

    fn prune_non_root(&mut self, root: Slot, largest_confirmed_root: Option<Slot>) {
        let descendants = self.descendants();
        self.banks.retain(|slot, _| {
            *slot == root
                || descendants[&root].contains(slot)
                || (*slot < root
                    && *slot >= largest_confirmed_root.unwrap_or(root)
                    && descendants[slot].contains(&root))
        });
        datapoint_debug!(
            "bank_forks_purge_non_root",
            ("num_banks_retained", self.banks.len(), i64),
        );
    }

    pub fn set_snapshot_config(&mut self, snapshot_config: Option<SnapshotConfig>) {
        self.snapshot_config = snapshot_config;
    }

    pub fn snapshot_config(&self) -> &Option<SnapshotConfig> {
        &self.snapshot_config
    }

    pub fn set_accounts_hash_interval_slots(&mut self, accounts_interval_slots: u64) {
        self.accounts_hash_interval_slots = accounts_interval_slots;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
    use solana_sdk::hash::Hash;
    use solana_sdk::pubkey::Pubkey;

    #[test]
    fn test_bank_forks_new() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new(&genesis_config);
        let mut bank_forks = BankForks::new(bank);
        let child_bank = Bank::new_from_parent(&bank_forks[0u64], &Pubkey::default(), 1);
        child_bank.register_tick(&Hash::default());
        bank_forks.insert(child_bank);
        assert_eq!(bank_forks[1u64].tick_height(), 1);
        assert_eq!(bank_forks.working_bank().tick_height(), 1);
    }

    #[test]
    fn test_bank_forks_new_from_banks() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Arc::new(Bank::new(&genesis_config));
        let child_bank = Arc::new(Bank::new_from_parent(&bank, &Pubkey::default(), 1));

        let bank_forks = BankForks::new_from_banks(&[bank.clone(), child_bank.clone()], 0);
        assert_eq!(bank_forks.root(), 0);
        assert_eq!(bank_forks.working_bank().slot(), 1);

        let bank_forks = BankForks::new_from_banks(&[child_bank, bank], 0);
        assert_eq!(bank_forks.root(), 0);
        assert_eq!(bank_forks.working_bank().slot(), 1);
    }

    #[test]
    fn test_bank_forks_descendants() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new(&genesis_config);
        let mut bank_forks = BankForks::new(bank);
        let bank0 = bank_forks[0].clone();
        let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
        bank_forks.insert(bank);
        let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 2);
        bank_forks.insert(bank);
        let descendants = bank_forks.descendants();
        let children: HashSet<u64> = [1u64, 2u64].to_vec().into_iter().collect();
        assert_eq!(children, *descendants.get(&0).unwrap());
        assert!(descendants[&1].is_empty());
        assert!(descendants[&2].is_empty());
    }

    #[test]
    fn test_bank_forks_ancestors() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new(&genesis_config);
        let mut bank_forks = BankForks::new(bank);
        let bank0 = bank_forks[0].clone();
        let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
        bank_forks.insert(bank);
        let bank = Bank::new_from_parent(&bank0, &Pubkey::default(), 2);
        bank_forks.insert(bank);
        let ancestors = bank_forks.ancestors();
        assert!(ancestors[&0].is_empty());
        let parents: Vec<u64> = ancestors[&1].iter().cloned().collect();
        assert_eq!(parents, vec![0]);
        let parents: Vec<u64> = ancestors[&2].iter().cloned().collect();
        assert_eq!(parents, vec![0]);
    }

    #[test]
    fn test_bank_forks_frozen_banks() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new(&genesis_config);
        let mut bank_forks = BankForks::new(bank);
        let child_bank = Bank::new_from_parent(&bank_forks[0u64], &Pubkey::default(), 1);
        bank_forks.insert(child_bank);
        assert!(bank_forks.frozen_banks().get(&0).is_some());
        assert!(bank_forks.frozen_banks().get(&1).is_none());
    }

    #[test]
    fn test_bank_forks_active_banks() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new(&genesis_config);
        let mut bank_forks = BankForks::new(bank);
        let child_bank = Bank::new_from_parent(&bank_forks[0u64], &Pubkey::default(), 1);
        bank_forks.insert(child_bank);
        assert_eq!(bank_forks.active_banks(), vec![1]);
    }
}
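For orientation, here is a minimal usage sketch of the moved API, mirroring the tests above; all names come from this file and `genesis_utils`, and the flow (no snapshot sender, no confirmed-root override) is just one plausible call pattern:

use solana_runtime::{
    bank::Bank,
    bank_forks::BankForks,
    genesis_utils::{create_genesis_config, GenesisConfigInfo},
};
use solana_sdk::pubkey::Pubkey;

// Build a fork graph rooted at the genesis bank, extend it by one slot,
// then advance the root; prune_non_root drops unreachable banks.
fn bank_forks_walkthrough() {
    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
    let mut bank_forks = BankForks::new(Bank::new(&genesis_config));
    let child = Bank::new_from_parent(&bank_forks.working_bank(), &Pubkey::default(), 1);
    bank_forks.insert(child);
    bank_forks.set_root(1, &None, None); // no AccountsPackageSender, no largest_confirmed_root
    assert_eq!(bank_forks.root(), 1);
    assert_eq!(bank_forks.working_bank().slot(), 1);
}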
runtime/src/hardened_unpack.rs (new file, 497 lines)
@@ -0,0 +1,497 @@
use bzip2::bufread::BzDecoder;
use log::*;
use regex::Regex;
use solana_sdk::genesis_config::GenesisConfig;
use std::{
    fs::{self, File},
    io::{BufReader, Read},
    path::{
        Component::{CurDir, Normal},
        Path,
    },
    time::Instant,
};
use tar::{
    Archive,
    EntryType::{Directory, GNUSparse, Regular},
};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum UnpackError {
    #[error("IO error: {0}")]
    IO(#[from] std::io::Error),
    #[error("Archive error: {0}")]
    Archive(String),
}

pub type Result<T> = std::result::Result<T, UnpackError>;

const MAX_SNAPSHOT_ARCHIVE_UNPACKED_SIZE: u64 = 500 * 1024 * 1024 * 1024; // 500 GiB
const MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT: u64 = 500_000;
pub const MAX_GENESIS_ARCHIVE_UNPACKED_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
const MAX_GENESIS_ARCHIVE_UNPACKED_COUNT: u64 = 100;

fn checked_total_size_sum(total_size: u64, entry_size: u64, limit_size: u64) -> Result<u64> {
    let total_size = total_size.saturating_add(entry_size);
    if total_size > limit_size {
        return Err(UnpackError::Archive(format!(
            "too large archive: {} than limit: {}",
            total_size, limit_size,
        )));
    }
    Ok(total_size)
}

fn checked_total_count_increment(total_count: u64, limit_count: u64) -> Result<u64> {
    let total_count = total_count + 1;
    if total_count > limit_count {
        return Err(UnpackError::Archive(format!(
            "too many files in snapshot: {:?}",
            total_count
        )));
    }
    Ok(total_count)
}

fn check_unpack_result(unpack_result: bool, path: String) -> Result<()> {
    if !unpack_result {
        return Err(UnpackError::Archive(format!(
            "failed to unpack: {:?}",
            path
        )));
    }
    Ok(())
}

fn unpack_archive<A: Read, P: AsRef<Path>, C>(
    archive: &mut Archive<A>,
    unpack_dir: P,
    limit_size: u64,
    limit_count: u64,
    entry_checker: C,
) -> Result<()>
where
    C: Fn(&[&str], tar::EntryType) -> bool,
{
    let mut total_size: u64 = 0;
    let mut total_count: u64 = 0;

    let mut total_entries = 0;
    let mut last_log_update = Instant::now();
    for entry in archive.entries()? {
        let mut entry = entry?;
        let path = entry.path()?;
        let path_str = path.display().to_string();

        // Although the `tar` crate safely skips these entries at the actual
        // unpacking, fail fast ourselves on odd paths (such as those
        // containing `..` or starting with `/`) so the pattern matching
        // below stays simple:
        // https://docs.rs/tar/0.4.26/src/tar/entry.rs.html#371
        let parts = path.components().map(|p| match p {
            CurDir => Some("."),
            Normal(c) => c.to_str(),
            _ => None, // Prefix (for Windows) and RootDir are forbidden
        });
        if parts.clone().any(|p| p.is_none()) {
            return Err(UnpackError::Archive(format!(
                "invalid path found: {:?}",
                path_str
            )));
        }

        let parts: Vec<_> = parts.map(|p| p.unwrap()).collect();
        if !entry_checker(parts.as_slice(), entry.header().entry_type()) {
            return Err(UnpackError::Archive(format!(
                "extra entry found: {:?}",
                path_str
            )));
        }
        total_size = checked_total_size_sum(total_size, entry.header().size()?, limit_size)?;
        total_count = checked_total_count_increment(total_count, limit_count)?;

        // unpack_in does its own sanitization
        // ref: https://docs.rs/tar/*/tar/struct.Entry.html#method.unpack_in
        check_unpack_result(entry.unpack_in(&unpack_dir)?, path_str)?;
        total_entries += 1;
        let now = Instant::now();
        if now.duration_since(last_log_update).as_secs() >= 10 {
            info!("unpacked {} entries so far...", total_entries);
            last_log_update = now;
        }
    }
    info!("unpacked {} entries total", total_entries);

    Ok(())
}

pub fn unpack_snapshot<A: Read, P: AsRef<Path>>(
    archive: &mut Archive<A>,
    unpack_dir: P,
) -> Result<()> {
    unpack_archive(
        archive,
        unpack_dir,
        MAX_SNAPSHOT_ARCHIVE_UNPACKED_SIZE,
        MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT,
        is_valid_snapshot_archive_entry,
    )
}

fn is_valid_snapshot_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool {
    let like_storage = Regex::new(r"^\d+\.\d+$").unwrap();
    let like_slot = Regex::new(r"^\d+$").unwrap();

    trace!("validating: {:?} {:?}", parts, kind);
    match (parts, kind) {
        (["version"], Regular) => true,
        (["accounts"], Directory) => true,
        (["accounts", file], GNUSparse) if like_storage.is_match(file) => true,
        (["accounts", file], Regular) if like_storage.is_match(file) => true,
        (["snapshots"], Directory) => true,
        (["snapshots", "status_cache"], Regular) => true,
        (["snapshots", dir, file], Regular)
            if like_slot.is_match(dir) && like_slot.is_match(file) =>
        {
            true
        }
        (["snapshots", dir], Directory) if like_slot.is_match(dir) => true,
        _ => false,
    }
}

pub fn open_genesis_config(
    ledger_path: &Path,
    max_genesis_archive_unpacked_size: u64,
) -> GenesisConfig {
    GenesisConfig::load(&ledger_path).unwrap_or_else(|load_err| {
        let genesis_package = ledger_path.join("genesis.tar.bz2");
        unpack_genesis_archive(
            &genesis_package,
            ledger_path,
            max_genesis_archive_unpacked_size,
        )
        .unwrap_or_else(|unpack_err| {
            warn!(
                "Failed to open ledger genesis_config at {:?}: {}, {}",
                ledger_path, load_err, unpack_err,
            );
            std::process::exit(1);
        });

        // loading must succeed at this moment
        GenesisConfig::load(&ledger_path).unwrap()
    })
}

pub fn unpack_genesis_archive(
    archive_filename: &Path,
    destination_dir: &Path,
    max_genesis_archive_unpacked_size: u64,
) -> std::result::Result<(), UnpackError> {
    info!("Extracting {:?}...", archive_filename);
    let extract_start = Instant::now();

    fs::create_dir_all(destination_dir)?;
    let tar_bz2 = File::open(&archive_filename)?;
    let tar = BzDecoder::new(BufReader::new(tar_bz2));
    let mut archive = Archive::new(tar);
    unpack_genesis(
        &mut archive,
        destination_dir,
        max_genesis_archive_unpacked_size,
    )?;
    info!(
        "Extracted {:?} in {:?}",
        archive_filename,
        Instant::now().duration_since(extract_start)
    );
    Ok(())
}

fn unpack_genesis<A: Read, P: AsRef<Path>>(
    archive: &mut Archive<A>,
    unpack_dir: P,
    max_genesis_archive_unpacked_size: u64,
) -> Result<()> {
    unpack_archive(
        archive,
        unpack_dir,
        max_genesis_archive_unpacked_size,
        MAX_GENESIS_ARCHIVE_UNPACKED_COUNT,
        is_valid_genesis_archive_entry,
    )
}

fn is_valid_genesis_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool {
    trace!("validating: {:?} {:?}", parts, kind);
    match (parts, kind) {
        (["genesis.bin"], Regular) => true,
        (["rocksdb"], Directory) => true,
        (["rocksdb", ..], GNUSparse) => true,
        (["rocksdb", ..], Regular) => true,
        _ => false,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use tar::{Builder, Header};

    #[test]
    fn test_archive_is_valid_entry() {
        assert!(is_valid_snapshot_archive_entry(
            &["accounts", "0.0"],
            tar::EntryType::Regular
        ));
        assert!(is_valid_snapshot_archive_entry(
            &["snapshots"],
            tar::EntryType::Directory
        ));
        assert!(is_valid_snapshot_archive_entry(
            &["snapshots", "3"],
            tar::EntryType::Directory
        ));
        assert!(is_valid_snapshot_archive_entry(
            &["snapshots", "3", "3"],
            tar::EntryType::Regular
        ));
        assert!(is_valid_snapshot_archive_entry(
            &["version"],
            tar::EntryType::Regular
        ));
        assert!(is_valid_snapshot_archive_entry(
            &["accounts"],
            tar::EntryType::Directory
        ));

        assert!(!is_valid_snapshot_archive_entry(
            &["accounts", "0x0"],
            tar::EntryType::Regular
        ));
        assert!(!is_valid_snapshot_archive_entry(
            &["snapshots"],
            tar::EntryType::Regular
        ));
        assert!(!is_valid_snapshot_archive_entry(
            &["snapshots", "x0"],
            tar::EntryType::Directory
        ));
        assert!(!is_valid_snapshot_archive_entry(
            &["snapshots", "0x"],
            tar::EntryType::Directory
        ));
        assert!(!is_valid_snapshot_archive_entry(
            &["snapshots", "0", "aa"],
            tar::EntryType::Regular
        ));
        assert!(!is_valid_snapshot_archive_entry(
            &["aaaa"],
            tar::EntryType::Regular
        ));
    }

    #[test]
    fn test_archive_is_valid_archive_entry() {
        assert!(is_valid_genesis_archive_entry(
            &["genesis.bin"],
            tar::EntryType::Regular
        ));
        assert!(is_valid_genesis_archive_entry(
            &["rocksdb"],
            tar::EntryType::Directory
        ));
        assert!(is_valid_genesis_archive_entry(
            &["rocksdb", "foo"],
            tar::EntryType::Regular
        ));
        assert!(is_valid_genesis_archive_entry(
            &["rocksdb", "foo", "bar"],
            tar::EntryType::Regular
        ));

        assert!(!is_valid_genesis_archive_entry(
            &["aaaa"],
            tar::EntryType::Regular
        ));
    }

    fn with_finalize_and_unpack<C>(archive: tar::Builder<Vec<u8>>, checker: C) -> Result<()>
    where
        C: Fn(&mut Archive<BufReader<&[u8]>>, &Path) -> Result<()>,
    {
        let data = archive.into_inner().unwrap();
        let reader = BufReader::new(&data[..]);
        let mut archive: Archive<std::io::BufReader<&[u8]>> = Archive::new(reader);
        let temp_dir = tempfile::TempDir::new().unwrap();

        checker(&mut archive, &temp_dir.into_path())
    }

    fn finalize_and_unpack_snapshot(archive: tar::Builder<Vec<u8>>) -> Result<()> {
        with_finalize_and_unpack(archive, |a, b| unpack_snapshot(a, b))
    }

    fn finalize_and_unpack_genesis(archive: tar::Builder<Vec<u8>>) -> Result<()> {
        with_finalize_and_unpack(archive, |a, b| {
            unpack_genesis(a, b, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE)
        })
    }

    #[test]
    fn test_archive_unpack_snapshot_ok() {
        let mut header = Header::new_gnu();
        header.set_path("version").unwrap();
        header.set_size(4);
        header.set_cksum();

        let data: &[u8] = &[1, 2, 3, 4];

        let mut archive = Builder::new(Vec::new());
        archive.append(&header, data).unwrap();

        let result = finalize_and_unpack_snapshot(archive);
        assert_matches!(result, Ok(()));
    }

    #[test]
    fn test_archive_unpack_genesis_ok() {
        let mut header = Header::new_gnu();
        header.set_path("genesis.bin").unwrap();
        header.set_size(4);
        header.set_cksum();

        let data: &[u8] = &[1, 2, 3, 4];

        let mut archive = Builder::new(Vec::new());
        archive.append(&header, data).unwrap();

        let result = finalize_and_unpack_genesis(archive);
        assert_matches!(result, Ok(()));
    }

    #[test]
    fn test_archive_unpack_snapshot_invalid_path() {
        let mut header = Header::new_gnu();
        // bypass the sanitization of the .set_path()
        for (p, c) in header
            .as_old_mut()
            .name
            .iter_mut()
            .zip(b"foo/../../../dangerous".iter().chain(Some(&0)))
        {
            *p = *c;
        }
        header.set_size(4);
        header.set_cksum();

        let data: &[u8] = &[1, 2, 3, 4];

        let mut archive = Builder::new(Vec::new());
        archive.append(&header, data).unwrap();
        let result = finalize_and_unpack_snapshot(archive);
        assert_matches!(result, Err(UnpackError::Archive(ref message)) if message == "invalid path found: \"foo/../../../dangerous\"");
    }

    fn with_archive_unpack_snapshot_invalid_path(path: &str) -> Result<()> {
        let mut header = Header::new_gnu();
        // bypass the sanitization of the .set_path()
        for (p, c) in header
            .as_old_mut()
            .name
            .iter_mut()
            .zip(path.as_bytes().iter().chain(Some(&0)))
        {
            *p = *c;
        }
        header.set_size(4);
        header.set_cksum();

        let data: &[u8] = &[1, 2, 3, 4];

        let mut archive = Builder::new(Vec::new());
        archive.append(&header, data).unwrap();
        with_finalize_and_unpack(archive, |unpacking_archive, path| {
            for entry in unpacking_archive.entries()? {
                if !entry?.unpack_in(path)? {
                    return Err(UnpackError::Archive("failed!".to_string()));
                } else if !path.join(path).exists() {
                    return Err(UnpackError::Archive("not existing!".to_string()));
                }
            }
            Ok(())
        })
    }

    #[test]
    fn test_archive_unpack_itself() {
        assert_matches!(
            with_archive_unpack_snapshot_invalid_path("ryoqun/work"),
            Ok(())
        );
        // Absolute paths are neutralized as relative
        assert_matches!(
            with_archive_unpack_snapshot_invalid_path("/etc/passwd"),
            Ok(())
        );
        assert_matches!(with_archive_unpack_snapshot_invalid_path("../../../dangerous"), Err(UnpackError::Archive(ref message)) if message == "failed!");
    }

    #[test]
    fn test_archive_unpack_snapshot_invalid_entry() {
        let mut header = Header::new_gnu();
        header.set_path("foo").unwrap();
        header.set_size(4);
        header.set_cksum();

        let data: &[u8] = &[1, 2, 3, 4];

        let mut archive = Builder::new(Vec::new());
        archive.append(&header, data).unwrap();
        let result = finalize_and_unpack_snapshot(archive);
        assert_matches!(result, Err(UnpackError::Archive(ref message)) if message == "extra entry found: \"foo\"");
    }

    #[test]
    fn test_archive_unpack_snapshot_too_large() {
        let mut header = Header::new_gnu();
        header.set_path("version").unwrap();
        header.set_size(1024 * 1024 * 1024 * 1024 * 1024);
        header.set_cksum();

        let data: &[u8] = &[1, 2, 3, 4];

        let mut archive = Builder::new(Vec::new());
        archive.append(&header, data).unwrap();
        let result = finalize_and_unpack_snapshot(archive);
        assert_matches!(result, Err(UnpackError::Archive(ref message)) if message == &format!("too large archive: 1125899906842624 than limit: {}", MAX_SNAPSHOT_ARCHIVE_UNPACKED_SIZE));
    }

    #[test]
    fn test_archive_unpack_snapshot_bad_unpack() {
        let result = check_unpack_result(false, "abc".to_string());
        assert_matches!(result, Err(UnpackError::Archive(ref message)) if message == "failed to unpack: \"abc\"");
    }

    #[test]
    fn test_archive_checked_total_size_sum() {
        let result = checked_total_size_sum(500, 500, MAX_SNAPSHOT_ARCHIVE_UNPACKED_SIZE);
        assert_matches!(result, Ok(1000));

        let result =
            checked_total_size_sum(u64::max_value() - 2, 2, MAX_SNAPSHOT_ARCHIVE_UNPACKED_SIZE);
        assert_matches!(result, Err(UnpackError::Archive(ref message)) if message == &format!("too large archive: 18446744073709551615 than limit: {}", MAX_SNAPSHOT_ARCHIVE_UNPACKED_SIZE));
    }

    #[test]
    fn test_archive_checked_total_size_count() {
        let result = checked_total_count_increment(101, MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT);
        assert_matches!(result, Ok(102));

        let result =
            checked_total_count_increment(999_999_999_999, MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT);
        assert_matches!(result, Err(UnpackError::Archive(ref message)) if message == "too many files in snapshot: 1000000000000");
    }
}
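As a usage sketch of the hardened genesis path under the limits defined above: the `load_genesis` helper and the ledger path are illustrative, not part of this diff; the called functions and constants are.

use solana_runtime::hardened_unpack::{
    open_genesis_config, unpack_genesis_archive, Result, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
};
use solana_sdk::genesis_config::GenesisConfig;
use std::path::Path;

// Hypothetical helper: explicitly unpack genesis.tar.bz2 into the ledger
// directory, relying on unpack_genesis_archive's traversal, entry-type,
// size, and count checks, then read the freshly unpacked config.
fn load_genesis(ledger_path: &Path) -> Result<GenesisConfig> {
    unpack_genesis_archive(
        &ledger_path.join("genesis.tar.bz2"),
        ledger_path,
        MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
    )?;
    // open_genesis_config would also perform this unpack itself as a
    // fallback when GenesisConfig::load fails.
    Ok(open_genesis_config(ledger_path, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE))
}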
@@ -4,11 +4,13 @@ pub mod accounts_index;
pub mod append_vec;
pub mod bank;
pub mod bank_client;
pub mod bank_forks;
mod blockhash_queue;
pub mod bloom;
pub mod builtin_programs;
pub mod epoch_stakes;
pub mod genesis_utils;
pub mod hardened_unpack;
mod legacy_system_instruction_processor0;
pub mod loader_utils;
pub mod log_collector;
@@ -17,6 +19,8 @@ mod native_loader;
pub mod nonce_utils;
pub mod rent_collector;
pub mod serde_snapshot;
pub mod snapshot_package;
pub mod snapshot_utils;
pub mod stakes;
pub mod status_cache;
mod system_instruction_processor;
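With these modules registered, downstream code imports the moved types from their new home; a sketch of what the updated imports look like (this excerpt does not show the actual call sites that were changed):

use solana_runtime::bank_forks::{BankForks, CompressionType, SnapshotConfig};
use solana_runtime::snapshot_utils::{self, SnapshotError};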
runtime/src/snapshot_package.rs (new file, 49 lines)
@@ -0,0 +1,49 @@
use crate::bank_forks::CompressionType;
use crate::{accounts_db::SnapshotStorages, bank::BankSlotDelta};
use solana_sdk::clock::Slot;
use solana_sdk::hash::Hash;
use std::{
    path::PathBuf,
    sync::mpsc::{Receiver, SendError, Sender},
};
use tempfile::TempDir;

pub type AccountsPackageSender = Sender<AccountsPackage>;
pub type AccountsPackageReceiver = Receiver<AccountsPackage>;
pub type AccountsPackageSendError = SendError<AccountsPackage>;

#[derive(Debug)]
pub struct AccountsPackage {
    pub root: Slot,
    pub block_height: Slot,
    pub slot_deltas: Vec<BankSlotDelta>,
    pub snapshot_links: TempDir,
    pub storages: SnapshotStorages,
    pub tar_output_file: PathBuf,
    pub hash: Hash,
    pub compression: CompressionType,
}

impl AccountsPackage {
    pub fn new(
        root: Slot,
        block_height: u64,
        slot_deltas: Vec<BankSlotDelta>,
        snapshot_links: TempDir,
        storages: SnapshotStorages,
        tar_output_file: PathBuf,
        hash: Hash,
        compression: CompressionType,
    ) -> Self {
        Self {
            root,
            block_height,
            slot_deltas,
            snapshot_links,
            storages,
            tar_output_file,
            hash,
            compression,
        }
    }
}
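Since `AccountsPackageSender` and `AccountsPackageReceiver` are plain `std::sync::mpsc` aliases, wiring `BankForks::set_root` to a packaging thread is an ordinary channel setup. A minimal sketch, assuming a caller owns a `BankForks` (the `wire_snapshot_packaging` helper is hypothetical; `archive_snapshot_package` is defined in `snapshot_utils` below):

use solana_runtime::bank_forks::BankForks;
use solana_runtime::snapshot_package::{AccountsPackageReceiver, AccountsPackageSender};
use solana_runtime::snapshot_utils;
use std::sync::mpsc::channel;
use std::thread;

// One end of the channel goes to set_root (wrapped in Some); a service
// thread drains the other end and archives each package as it arrives.
fn wire_snapshot_packaging(bank_forks: &mut BankForks, new_root: u64) -> thread::JoinHandle<()> {
    let (sender, receiver): (AccountsPackageSender, AccountsPackageReceiver) = channel();
    let handle = thread::spawn(move || {
        while let Ok(package) = receiver.recv() {
            if let Err(err) = snapshot_utils::archive_snapshot_package(&package) {
                log::warn!("failed to archive snapshot package: {:?}", err);
            }
        }
    });
    bank_forks.set_root(new_root, &Some(sender), None);
    handle // thread exits once the sender is dropped
}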
runtime/src/snapshot_utils.rs (new file, 909 lines)
@@ -0,0 +1,909 @@
|
||||
use crate::bank_forks::CompressionType;
|
||||
use crate::hardened_unpack::{unpack_snapshot, UnpackError};
|
||||
use crate::snapshot_package::AccountsPackage;
|
||||
use crate::{
|
||||
bank::{Bank, BankSlotDelta},
|
||||
serde_snapshot::{
|
||||
bankrc_from_stream, bankrc_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages,
|
||||
},
|
||||
};
|
||||
use bincode::serialize_into;
|
||||
use bzip2::bufread::BzDecoder;
|
||||
use flate2::read::GzDecoder;
|
||||
use fs_extra::dir::CopyOptions;
|
||||
use log::*;
|
||||
use regex::Regex;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey};
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
fmt,
|
||||
fs::{self, File},
|
||||
io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write},
|
||||
path::{Path, PathBuf},
|
||||
process::ExitStatus,
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
};
|
||||
use tar::Archive;
|
||||
use tempfile::TempDir;
|
||||
use thiserror::Error;
|
||||
|
||||
pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
|
||||
pub const TAR_SNAPSHOTS_DIR: &str = "snapshots";
|
||||
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
|
||||
pub const TAR_VERSION_FILE: &str = "version";
|
||||
|
||||
const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
|
||||
const VERSION_STRING_V1_1_0: &str = "1.1.0";
|
||||
const VERSION_STRING_V1_2_0: &str = "1.2.0";
|
||||
const OUTPUT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0;
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
||||
pub enum SnapshotVersion {
|
||||
V1_1_0,
|
||||
V1_2_0,
|
||||
}
|
||||
|
||||
impl From<SnapshotVersion> for &'static str {
|
||||
fn from(snapshot_version: SnapshotVersion) -> &'static str {
|
||||
match snapshot_version {
|
||||
SnapshotVersion::V1_1_0 => VERSION_STRING_V1_1_0,
|
||||
SnapshotVersion::V1_2_0 => VERSION_STRING_V1_2_0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for SnapshotVersion {
|
||||
type Err = &'static str;
|
||||
|
||||
fn from_str(version_string: &str) -> std::result::Result<Self, Self::Err> {
|
||||
match version_string {
|
||||
VERSION_STRING_V1_1_0 => Ok(SnapshotVersion::V1_1_0),
|
||||
VERSION_STRING_V1_2_0 => Ok(SnapshotVersion::V1_2_0),
|
||||
_ => Err("unsupported snapshot version"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SnapshotVersion {
|
||||
pub fn as_str(self) -> &'static str {
|
||||
<&str as From<Self>>::from(self)
|
||||
}
|
||||
|
||||
fn maybe_from_string(version_string: &str) -> Option<SnapshotVersion> {
|
||||
version_string.parse::<Self>().ok()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Ord, Eq, Debug)]
|
||||
pub struct SlotSnapshotPaths {
|
||||
pub slot: Slot,
|
||||
pub snapshot_file_path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SnapshotError {
|
||||
#[error("I/O error")]
|
||||
IO(#[from] std::io::Error),
|
||||
|
||||
#[error("serialization error")]
|
||||
Serialize(#[from] bincode::Error),
|
||||
|
||||
#[error("file system error")]
|
||||
FsExtra(#[from] fs_extra::error::Error),
|
||||
|
||||
#[error("archive generation failure {0}")]
|
||||
ArchiveGenerationFailure(ExitStatus),
|
||||
|
||||
#[error("storage path symlink is invalid")]
|
||||
StoragePathSymlinkInvalid,
|
||||
|
||||
#[error("Unpack error")]
|
||||
UnpackError(#[from] UnpackError),
|
||||
}
|
||||
pub type Result<T> = std::result::Result<T, SnapshotError>;
|
||||
|
||||
impl PartialOrd for SlotSnapshotPaths {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.slot.cmp(&other.slot))
|
||||
}
|
||||
}
|
||||
|
||||
impl SlotSnapshotPaths {
|
||||
fn copy_snapshot_directory<P: AsRef<Path>>(&self, snapshot_hardlink_dir: P) -> Result<()> {
|
||||
// Create a new directory in snapshot_hardlink_dir
|
||||
let new_slot_hardlink_dir = snapshot_hardlink_dir.as_ref().join(self.slot.to_string());
|
||||
let _ = fs::remove_dir_all(&new_slot_hardlink_dir);
|
||||
fs::create_dir_all(&new_slot_hardlink_dir)?;
|
||||
|
||||
// Copy the snapshot
|
||||
fs::copy(
|
||||
&self.snapshot_file_path,
|
||||
&new_slot_hardlink_dir.join(self.slot.to_string()),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
bank: &Bank,
|
||||
snapshot_files: &SlotSnapshotPaths,
|
||||
snapshot_path: Q,
|
||||
slots_to_snapshot: &[Slot],
|
||||
snapshot_package_output_path: P,
|
||||
snapshot_storages: SnapshotStorages,
|
||||
compression: CompressionType,
|
||||
) -> Result<AccountsPackage> {
|
||||
// Hard link all the snapshots we need for this package
|
||||
let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?;
|
||||
|
||||
// Create a snapshot package
|
||||
info!(
|
||||
"Snapshot for bank: {} has {} account storage entries",
|
||||
bank.slot(),
|
||||
snapshot_storages.len()
|
||||
);
|
||||
|
||||
// Any errors from this point on will cause the above AccountsPackage to drop, clearing
|
||||
// any temporary state created for the AccountsPackage (like the snapshot_hard_links_dir)
|
||||
snapshot_files.copy_snapshot_directory(snapshot_hard_links_dir.path())?;
|
||||
|
||||
let snapshot_package_output_file = get_snapshot_archive_path(
|
||||
&snapshot_package_output_path,
|
||||
&(bank.slot(), bank.get_accounts_hash()),
|
||||
&compression,
|
||||
);
|
||||
|
||||
let package = AccountsPackage::new(
|
||||
bank.slot(),
|
||||
bank.block_height(),
|
||||
bank.src.slot_deltas(slots_to_snapshot),
|
||||
snapshot_hard_links_dir,
|
||||
snapshot_storages,
|
||||
snapshot_package_output_file,
|
||||
bank.get_accounts_hash(),
|
||||
compression,
|
||||
);
|
||||
|
||||
Ok(package)
|
||||
}
|
||||
|
||||
fn get_compression_ext(compression: &CompressionType) -> (&'static str, &'static str) {
|
||||
match compression {
|
||||
CompressionType::Bzip2 => ("bzip2", ".tar.bz2"),
|
||||
CompressionType::Gzip => ("gzip", ".tar.gz"),
|
||||
CompressionType::Zstd => ("zstd", ".tar.zst"),
|
||||
CompressionType::NoCompression => ("", ".tar"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()> {
|
||||
info!(
|
||||
"Generating snapshot archive for slot {}",
|
||||
snapshot_package.root
|
||||
);
|
||||
|
||||
serialize_status_cache(
|
||||
snapshot_package.root,
|
||||
&snapshot_package.slot_deltas,
|
||||
&snapshot_package.snapshot_links,
|
||||
)?;
|
||||
|
||||
let mut timer = Measure::start("snapshot_package-package_snapshots");
|
||||
let tar_dir = snapshot_package
|
||||
.tar_output_file
|
||||
.parent()
|
||||
.expect("Tar output path is invalid");
|
||||
|
||||
fs::create_dir_all(tar_dir)?;
|
||||
|
||||
// Create the staging directories
|
||||
let staging_dir = TempDir::new()?;
|
||||
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
|
||||
let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
|
||||
let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE);
|
||||
fs::create_dir_all(&staging_accounts_dir)?;
|
||||
|
||||
// Add the snapshots to the staging directory
|
||||
symlink::symlink_dir(
|
||||
snapshot_package.snapshot_links.path(),
|
||||
&staging_snapshots_dir,
|
||||
)?;
|
||||
|
||||
// Add the AppendVecs into the compressible list
|
||||
for storage in snapshot_package.storages.iter().flatten() {
|
||||
storage.flush()?;
|
||||
let storage_path = storage.get_path();
|
||||
let output_path = staging_accounts_dir.join(
|
||||
storage_path
|
||||
.file_name()
|
||||
.expect("Invalid AppendVec file path"),
|
||||
);
|
||||
|
||||
// `storage_path` - The file path where the AppendVec itself is located
|
||||
// `output_path` - The directory where the AppendVec will be placed in the staging directory.
|
||||
let storage_path =
|
||||
fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
|
||||
symlink::symlink_dir(storage_path, &output_path)?;
|
||||
if !output_path.is_file() {
|
||||
return Err(SnapshotError::StoragePathSymlinkInvalid);
|
||||
}
|
||||
}
|
||||
|
||||
// Write version file
|
||||
{
|
||||
let mut f = std::fs::File::create(staging_version_file)?;
|
||||
f.write_all(OUTPUT_SNAPSHOT_VERSION.as_str().as_bytes())?;
|
||||
}
|
||||
|
||||
let (compression_option, file_ext) = get_compression_ext(&snapshot_package.compression);
|
||||
let archive_options = "cfhS";
|
||||
|
||||
// Tar the staging directory into the archive at `archive_path`
|
||||
let archive_file = format!("new_state{}", file_ext);
|
||||
let archive_path = tar_dir.join(archive_file);
|
||||
let args = vec![
|
||||
archive_options,
|
||||
archive_path.to_str().unwrap(),
|
||||
"--use-compress-program",
|
||||
compression_option,
|
||||
"-C",
|
||||
staging_dir.path().to_str().unwrap(),
|
||||
TAR_ACCOUNTS_DIR,
|
||||
TAR_SNAPSHOTS_DIR,
|
||||
TAR_VERSION_FILE,
|
||||
];
|
||||
|
||||
let output = std::process::Command::new("tar").args(&args).output()?;
|
||||
if !output.status.success() {
|
||||
warn!("tar command failed with exit code: {}", output.status);
|
||||
use std::str::from_utf8;
|
||||
info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
|
||||
info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
|
||||
|
||||
return Err(SnapshotError::ArchiveGenerationFailure(output.status));
|
||||
}
|
||||
|
||||
// Once everything is successful, overwrite the previous tarball so that other validators
|
||||
// can fetch this newly packaged snapshot
|
||||
let metadata = fs::metadata(&archive_path)?;
|
||||
fs::rename(&archive_path, &snapshot_package.tar_output_file)?;
|
||||
|
||||
// Keep around at most two snapshot archives
|
||||
let archives = get_snapshot_archives(snapshot_package.tar_output_file.parent().unwrap());
|
||||
for old_archive in archives.into_iter().skip(2) {
|
||||
fs::remove_file(old_archive.0)
|
||||
.unwrap_or_else(|err| info!("Failed to remove old snapshot: {:}", err));
|
||||
}
|
||||
|
||||
timer.stop();
|
||||
info!(
|
||||
"Successfully created {:?}. slot: {}, elapsed ms: {}, size={}",
|
||||
snapshot_package.tar_output_file,
|
||||
snapshot_package.root,
|
||||
timer.as_ms(),
|
||||
metadata.len()
|
||||
);
|
||||
datapoint_info!(
|
||||
"snapshot-package",
|
||||
("slot", snapshot_package.root, i64),
|
||||
("duration_ms", timer.as_ms(), i64),
|
||||
("size", metadata.len(), i64)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths>
|
||||
where
|
||||
P: fmt::Debug,
|
||||
{
|
||||
match fs::read_dir(&snapshot_path) {
|
||||
Ok(paths) => {
|
||||
let mut names = paths
|
||||
.filter_map(|entry| {
|
||||
entry.ok().and_then(|e| {
|
||||
e.path()
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str().map(|s| s.parse::<u64>().ok()))
|
||||
.unwrap_or(None)
|
||||
})
|
||||
})
|
||||
.map(|slot| {
|
||||
let snapshot_path = snapshot_path.as_ref().join(slot.to_string());
|
||||
SlotSnapshotPaths {
|
||||
slot,
|
||||
snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<SlotSnapshotPaths>>();
|
||||
|
||||
names.sort();
|
||||
names
|
||||
}
|
||||
Err(err) => {
|
||||
info!(
|
||||
"Unable to read snapshot directory {:?}: {}",
|
||||
snapshot_path, err
|
||||
);
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialize_snapshot_data_file<F>(data_file_path: &Path, serializer: F) -> Result<u64>
|
||||
where
|
||||
F: FnOnce(&mut BufWriter<File>) -> Result<()>,
|
||||
{
|
||||
serialize_snapshot_data_file_capped::<F>(
|
||||
data_file_path,
|
||||
MAX_SNAPSHOT_DATA_FILE_SIZE,
|
||||
serializer,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn deserialize_snapshot_data_file<F, T>(data_file_path: &Path, deserializer: F) -> Result<T>
|
||||
where
|
||||
F: FnOnce(&mut BufReader<File>) -> Result<T>,
|
||||
{
|
||||
deserialize_snapshot_data_file_capped::<F, T>(
|
||||
data_file_path,
|
||||
MAX_SNAPSHOT_DATA_FILE_SIZE,
|
||||
deserializer,
|
||||
)
|
||||
}
|
||||
|
||||
fn serialize_snapshot_data_file_capped<F>(
|
||||
data_file_path: &Path,
|
||||
maximum_file_size: u64,
|
||||
serializer: F,
|
||||
) -> Result<u64>
|
||||
where
|
||||
F: FnOnce(&mut BufWriter<File>) -> Result<()>,
|
||||
{
|
||||
let data_file = File::create(data_file_path)?;
|
||||
let mut data_file_stream = BufWriter::new(data_file);
|
||||
serializer(&mut data_file_stream)?;
|
||||
data_file_stream.flush()?;
|
||||
|
||||
let consumed_size = data_file_stream.seek(SeekFrom::Current(0))?;
|
||||
if consumed_size > maximum_file_size {
|
||||
let error_message = format!(
|
||||
"too large snapshot data file to serialize: {:?} has {} bytes",
|
||||
data_file_path, consumed_size
|
||||
);
|
||||
return Err(get_io_error(&error_message));
|
||||
}
|
||||
Ok(consumed_size)
|
||||
}
|
||||
|
||||
fn deserialize_snapshot_data_file_capped<F, T>(
|
||||
data_file_path: &Path,
|
||||
maximum_file_size: u64,
|
||||
deserializer: F,
|
||||
) -> Result<T>
|
||||
where
|
||||
F: FnOnce(&mut BufReader<File>) -> Result<T>,
|
||||
{
|
||||
let file_size = fs::metadata(&data_file_path)?.len();
|
||||
|
||||
if file_size > maximum_file_size {
|
||||
let error_message = format!(
|
||||
"too large snapshot data file to deserialize: {:?} has {} bytes",
|
||||
data_file_path, file_size
|
||||
);
|
||||
return Err(get_io_error(&error_message));
|
||||
}
|
||||
|
||||
let data_file = File::open(data_file_path)?;
|
||||
let mut data_file_stream = BufReader::new(data_file);
|
||||
|
||||
let ret = deserializer(&mut data_file_stream)?;
|
||||
|
||||
let consumed_size = data_file_stream.seek(SeekFrom::Current(0))?;
|
||||
|
||||
if file_size != consumed_size {
|
||||
let error_message = format!(
|
||||
"invalid snapshot data file: {:?} has {} bytes, however consumed {} bytes to deserialize",
|
||||
data_file_path, file_size, consumed_size
|
||||
);
|
||||
return Err(get_io_error(&error_message));
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn add_snapshot<P: AsRef<Path>>(
|
||||
snapshot_path: P,
|
||||
bank: &Bank,
|
||||
snapshot_storages: &[SnapshotStorage],
|
||||
) -> Result<SlotSnapshotPaths> {
|
||||
let slot = bank.slot();
|
||||
// snapshot_path/slot
|
||||
let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot);
|
||||
fs::create_dir_all(slot_snapshot_dir.clone())?;
|
||||
|
||||
// the bank snapshot is stored as snapshot_path/slot/slot
|
||||
let snapshot_bank_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot));
|
||||
info!(
|
||||
"Creating snapshot for slot {}, path: {:?}",
|
||||
slot, snapshot_bank_file_path,
|
||||
);
|
||||
|
||||
let mut bank_serialize = Measure::start("bank-serialize-ms");
|
||||
let bank_snapshot_serializer = move |stream: &mut BufWriter<File>| -> Result<()> {
|
||||
serialize_into(stream.by_ref(), bank)?;
|
||||
bankrc_to_stream(
|
||||
SerdeStyle::NEWER,
|
||||
stream.by_ref(),
|
||||
&bank.rc,
|
||||
snapshot_storages,
|
||||
)?;
|
||||
Ok(())
|
||||
};
|
||||
let consumed_size =
|
||||
serialize_snapshot_data_file(&snapshot_bank_file_path, bank_snapshot_serializer)?;
|
||||
bank_serialize.stop();
|
||||
|
||||
// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
|
||||
datapoint_info!(
|
||||
"snapshot-bank-file",
|
||||
("slot", slot, i64),
|
||||
("size", consumed_size, i64)
|
||||
);
|
||||
|
||||
inc_new_counter_info!("bank-serialize-ms", bank_serialize.as_ms() as usize);
|
||||
|
||||
info!(
|
||||
"{} for slot {} at {:?}",
|
||||
bank_serialize, slot, snapshot_bank_file_path,
|
||||
);
|
||||
|
||||
Ok(SlotSnapshotPaths {
|
||||
slot,
|
||||
snapshot_file_path: snapshot_bank_file_path,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn serialize_status_cache(
|
||||
slot: Slot,
|
||||
slot_deltas: &[BankSlotDelta],
|
||||
snapshot_links: &TempDir,
|
||||
) -> Result<()> {
|
||||
// the status cache is stored as snapshot_path/status_cache
|
||||
let snapshot_status_cache_file_path =
|
||||
snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
|
||||
|
||||
let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
|
||||
let consumed_size = serialize_snapshot_data_file(&snapshot_status_cache_file_path, |stream| {
|
||||
serialize_into(stream, slot_deltas)?;
|
||||
Ok(())
|
||||
})?;
|
||||
status_cache_serialize.stop();
|
||||
|
||||
// Monitor sizes because they're capped to MAX_SNAPSHOT_DATA_FILE_SIZE
|
||||
datapoint_info!(
|
||||
"snapshot-status-cache-file",
|
||||
("slot", slot, i64),
|
||||
("size", consumed_size, i64)
|
||||
);
|
||||
|
||||
inc_new_counter_info!(
|
||||
"serialize-status-cache-ms",
|
||||
status_cache_serialize.as_ms() as usize
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn remove_snapshot<P: AsRef<Path>>(slot: Slot, snapshot_path: P) -> Result<()> {
|
||||
let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot);
|
||||
// Remove the snapshot directory for this slot
|
||||
fs::remove_dir_all(slot_snapshot_dir)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn bank_from_archive<P: AsRef<Path>>(
|
||||
account_paths: &[PathBuf],
|
||||
frozen_account_pubkeys: &[Pubkey],
|
||||
snapshot_path: &PathBuf,
|
||||
snapshot_tar: P,
|
||||
compression: CompressionType,
|
||||
genesis_config: &GenesisConfig,
|
||||
) -> Result<Bank> {
|
||||
// Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()`
|
||||
let unpack_dir = tempfile::tempdir_in(snapshot_path)?;
|
||||
untar_snapshot_in(&snapshot_tar, &unpack_dir, compression)?;
|
||||
|
||||
let mut measure = Measure::start("bank rebuild from snapshot");
|
||||
let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
|
||||
let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR);
|
||||
let unpacked_version_file = unpack_dir.as_ref().join(TAR_VERSION_FILE);
|
||||
|
||||
let mut snapshot_version = String::new();
|
||||
File::open(unpacked_version_file).and_then(|mut f| f.read_to_string(&mut snapshot_version))?;
|
||||
|
||||
let bank = rebuild_bank_from_snapshots(
|
||||
snapshot_version.trim(),
|
||||
account_paths,
|
||||
frozen_account_pubkeys,
|
||||
&unpacked_snapshots_dir,
|
||||
unpacked_accounts_dir,
|
||||
genesis_config,
|
||||
)?;
|
||||
|
||||
if !bank.verify_snapshot_bank() {
|
||||
panic!("Snapshot bank for slot {} failed to verify", bank.slot());
|
||||
}
|
||||
measure.stop();
|
||||
info!("{}", measure);
|
||||
|
||||
// Move the unpacked snapshots into `snapshot_path`
|
||||
let dir_files = fs::read_dir(&unpacked_snapshots_dir).unwrap_or_else(|err| {
|
||||
panic!(
|
||||
"Invalid snapshot path {:?}: {}",
|
||||
unpacked_snapshots_dir, err
|
||||
)
|
||||
});
|
||||
let paths: Vec<PathBuf> = dir_files
|
||||
.filter_map(|entry| entry.ok().map(|e| e.path()))
|
||||
.collect();
|
||||
let mut copy_options = CopyOptions::new();
|
||||
copy_options.overwrite = true;
|
||||
fs_extra::move_items(&paths, &snapshot_path, ©_options)?;
|
||||
|
||||
Ok(bank)
|
||||
}
|
||||
|
||||
pub fn get_snapshot_archive_path<P: AsRef<Path>>(
|
||||
snapshot_output_dir: P,
|
||||
snapshot_hash: &(Slot, Hash),
|
||||
compression: &CompressionType,
|
||||
) -> PathBuf {
|
||||
snapshot_output_dir.as_ref().join(format!(
|
||||
"snapshot-{}-{}{}",
|
||||
snapshot_hash.0,
|
||||
snapshot_hash.1,
|
||||
get_compression_ext(compression).1,
|
||||
))
|
||||
}
|
||||
|
||||
fn compression_type_from_str(compress: &str) -> Option<CompressionType> {
|
||||
match compress {
|
||||
"bz2" => Some(CompressionType::Bzip2),
|
||||
"gz" => Some(CompressionType::Gzip),
|
||||
"zst" => Some(CompressionType::Zstd),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn snapshot_hash_of(archive_filename: &str) -> Option<(Slot, Hash, CompressionType)> {
|
||||
let snapshot_filename_regex =
|
||||
Regex::new(r"snapshot-(\d+)-([[:alnum:]]+)\.tar\.(bz2|zst|gz)$").unwrap();
|
||||
|
||||
if let Some(captures) = snapshot_filename_regex.captures(archive_filename) {
|
||||
let slot_str = captures.get(1).unwrap().as_str();
|
||||
let hash_str = captures.get(2).unwrap().as_str();
|
||||
let ext = captures.get(3).unwrap().as_str();
|
||||
|
||||
if let (Ok(slot), Ok(hash), Some(compression)) = (
|
||||
slot_str.parse::<Slot>(),
|
||||
hash_str.parse::<Hash>(),
|
||||
compression_type_from_str(ext),
|
||||
) {
|
||||
return Some((slot, hash, compression));
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn get_snapshot_archives<P: AsRef<Path>>(
    snapshot_output_dir: P,
) -> Vec<(PathBuf, (Slot, Hash, CompressionType))> {
    match fs::read_dir(&snapshot_output_dir) {
        Err(err) => {
            info!("Unable to read snapshot directory: {}", err);
            vec![]
        }
        Ok(files) => {
            let mut archives: Vec<_> = files
                .filter_map(|entry| {
                    if let Ok(entry) = entry {
                        let path = entry.path();
                        if path.is_file() {
                            if let Some(snapshot_hash) =
                                snapshot_hash_of(path.file_name().unwrap().to_str().unwrap())
                            {
                                return Some((path, snapshot_hash));
                            }
                        }
                    }
                    None
                })
                .collect();

            archives.sort_by(|a, b| (b.1).0.cmp(&(a.1).0)); // reverse sort by slot
            archives
        }
    }
}

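/// Returns the most recent archive, i.e. the first element of the
/// slot-descending list from `get_snapshot_archives`.
/// A caller-side sketch (`ledger_path` is a hypothetical variable):
///
///     if let Some((archive, (slot, _hash, _compression))) =
///         get_highest_snapshot_archive_path(&ledger_path)
///     {
///         info!("newest snapshot: {:?} at slot {}", archive, slot);
///     }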
pub fn get_highest_snapshot_archive_path<P: AsRef<Path>>(
    snapshot_output_dir: P,
) -> Option<(PathBuf, (Slot, Hash, CompressionType))> {
    let archives = get_snapshot_archives(snapshot_output_dir);
    archives.into_iter().next()
}

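/// Unpacks `snapshot_tar` into `unpack_dir`, choosing the tar decoder that
/// matches `compression` (bzip2, gzip, zstd, or a plain uncompressed tar).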
pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
    snapshot_tar: P,
    unpack_dir: Q,
    compression: CompressionType,
) -> Result<()> {
    let mut measure = Measure::start("snapshot untar");
    let tar_name = File::open(&snapshot_tar)?;
    match compression {
        CompressionType::Bzip2 => {
            let tar = BzDecoder::new(BufReader::new(tar_name));
            let mut archive = Archive::new(tar);
            unpack_snapshot(&mut archive, unpack_dir)?;
        }
        CompressionType::Gzip => {
            let tar = GzDecoder::new(BufReader::new(tar_name));
            let mut archive = Archive::new(tar);
            unpack_snapshot(&mut archive, unpack_dir)?;
        }
        CompressionType::Zstd => {
            let tar = zstd::stream::read::Decoder::new(BufReader::new(tar_name))?;
            let mut archive = Archive::new(tar);
            unpack_snapshot(&mut archive, unpack_dir)?;
        }
        CompressionType::NoCompression => {
            let tar = BufReader::new(tar_name);
            let mut archive = Archive::new(tar);
            unpack_snapshot(&mut archive, unpack_dir)?;
        }
    };
    measure.stop();
    info!("{}", measure);
    Ok(())
}

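/// Deserializes a `Bank` from an unpacked snapshot: reads the versioned bank
/// state, rebuilds the accounts store from the unpacked append vecs (picking
/// the serde style that matches the snapshot version), freezes any requested
/// accounts, then replays the status cache slot deltas on top.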
fn rebuild_bank_from_snapshots<P>(
    snapshot_version: &str,
    account_paths: &[PathBuf],
    frozen_account_pubkeys: &[Pubkey],
    unpacked_snapshots_dir: &PathBuf,
    append_vecs_path: P,
    genesis_config: &GenesisConfig,
) -> Result<Bank>
where
    P: AsRef<Path>,
{
    info!("snapshot version: {}", snapshot_version);

    let snapshot_version_enum =
        SnapshotVersion::maybe_from_string(snapshot_version).ok_or_else(|| {
            get_io_error(&format!(
                "unsupported snapshot version: {}",
                snapshot_version
            ))
        })?;
    let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
    if snapshot_paths.len() > 1 {
        return Err(get_io_error("invalid snapshot format"));
    }
    let root_paths = snapshot_paths
        .pop()
        .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;

    info!("Loading bank from {:?}", &root_paths.snapshot_file_path);
    let bank = deserialize_snapshot_data_file(&root_paths.snapshot_file_path, |mut stream| {
        let mut bank: Bank = bincode::config()
            .limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
            .deserialize_from(&mut stream)?;

        info!("Rebuilding accounts...");

        let mut bankrc = match snapshot_version_enum {
            SnapshotVersion::V1_1_0 => bankrc_from_stream(
                SerdeStyle::OLDER,
                account_paths,
                bank.slot(),
                &mut stream,
                &append_vecs_path,
            ),
            SnapshotVersion::V1_2_0 => bankrc_from_stream(
                SerdeStyle::NEWER,
                account_paths,
                bank.slot(),
                &mut stream,
                &append_vecs_path,
            ),
        }?;
        Arc::get_mut(&mut Arc::get_mut(&mut bankrc.accounts).unwrap().accounts_db)
            .unwrap()
            .freeze_accounts(&bank.ancestors, frozen_account_pubkeys);

        bank.rc = bankrc;
        bank.operating_mode = Some(genesis_config.operating_mode);
        bank.finish_init();
        Ok(bank)
    })?;

    let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
    let slot_deltas = deserialize_snapshot_data_file(&status_cache_path, |stream| {
        info!("Rebuilding status cache...");
        let slot_deltas: Vec<BankSlotDelta> = bincode::config()
            .limit(MAX_SNAPSHOT_DATA_FILE_SIZE)
            .deserialize_from(stream)?;
        Ok(slot_deltas)
    })?;

    bank.src.append(&slot_deltas);

    info!("Loaded bank for slot: {}", bank.slot());
    Ok(bank)
}

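// Bank snapshot files and per-slot snapshot directories are both named after
// the slot itself, e.g. `get_bank_snapshot_dir(path, 42)` is `<path>/42`.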
fn get_snapshot_file_name(slot: Slot) -> String {
    slot.to_string()
}

fn get_bank_snapshot_dir<P: AsRef<Path>>(path: P, slot: Slot) -> PathBuf {
    path.as_ref().join(slot.to_string())
}

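/// Logs `error` and wraps it as a `SnapshotError::IO` with kind `Other`.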
fn get_io_error(error: &str) -> SnapshotError {
    warn!("Snapshot Error: {:?}", error);
    SnapshotError::IO(IOError::new(ErrorKind::Other, error))
}

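/// Unpacks `snapshot_archive` into a temp dir and asserts that the unpacked
/// snapshots and account storages are identical to the given reference
/// directories (via `dir_diff`); panics on any mismatch.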
pub fn verify_snapshot_archive<P, Q, R>(
    snapshot_archive: P,
    snapshots_to_verify: Q,
    storages_to_verify: R,
    compression: CompressionType,
) where
    P: AsRef<Path>,
    Q: AsRef<Path>,
    R: AsRef<Path>,
{
    let temp_dir = tempfile::TempDir::new().unwrap();
    let unpack_dir = temp_dir.path();
    untar_snapshot_in(snapshot_archive, &unpack_dir, compression).unwrap();

    // Check snapshots are the same
    let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR);
    assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

    // Check the account entries are the same
    let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR);
    assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap());
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use bincode::{deserialize_from, serialize_into};
    use std::mem::size_of;

    #[test]
    fn test_serialize_snapshot_data_file_under_limit() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let expected_consumed_size = size_of::<u32>() as u64;
        let consumed_size = serialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size,
            |stream| {
                serialize_into(stream, &2323_u32)?;
                Ok(())
            },
        )
        .unwrap();
        assert_eq!(consumed_size, expected_consumed_size);
    }

    #[test]
    fn test_serialize_snapshot_data_file_over_limit() {
        let temp_dir = tempfile::TempDir::new().unwrap();
        let expected_consumed_size = size_of::<u32>() as u64;
        let result = serialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size - 1,
            |stream| {
                serialize_into(stream, &2323_u32)?;
                Ok(())
            },
        );
        assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("too large snapshot data file to serialize"));
    }

    #[test]
    fn test_deserialize_snapshot_data_file_under_limit() {
        let expected_data = 2323_u32;
        let expected_consumed_size = size_of::<u32>() as u64;

        let temp_dir = tempfile::TempDir::new().unwrap();
        serialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size,
            |stream| {
                serialize_into(stream, &expected_data)?;
                Ok(())
            },
        )
        .unwrap();

        let actual_data = deserialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size,
            |stream| Ok(deserialize_from::<_, u32>(stream)?),
        )
        .unwrap();
        assert_eq!(actual_data, expected_data);
    }

    #[test]
    fn test_deserialize_snapshot_data_file_over_limit() {
        let expected_data = 2323_u32;
        let expected_consumed_size = size_of::<u32>() as u64;

        let temp_dir = tempfile::TempDir::new().unwrap();
        serialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size,
            |stream| {
                serialize_into(stream, &expected_data)?;
                Ok(())
            },
        )
        .unwrap();

        let result = deserialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size - 1,
            |stream| Ok(deserialize_from::<_, u32>(stream)?),
        );
        assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize"));
    }

    #[test]
    fn test_deserialize_snapshot_data_file_extra_data() {
        let expected_data = 2323_u32;
        let expected_consumed_size = size_of::<u32>() as u64;

        let temp_dir = tempfile::TempDir::new().unwrap();
        serialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size * 2,
            |stream| {
                serialize_into(stream.by_ref(), &expected_data)?;
                serialize_into(stream.by_ref(), &expected_data)?;
                Ok(())
            },
        )
        .unwrap();

        let result = deserialize_snapshot_data_file_capped(
            &temp_dir.path().join("data-file"),
            expected_consumed_size * 2,
            |stream| Ok(deserialize_from::<_, u32>(stream)?),
        );
        assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
    }

    #[test]
    fn test_snapshot_hash_of() {
        assert_eq!(
            snapshot_hash_of(&format!("snapshot-42-{}.tar.bz2", Hash::default())),
            Some((42, Hash::default(), CompressionType::Bzip2))
        );
        assert_eq!(
            snapshot_hash_of(&format!("snapshot-43-{}.tar.zst", Hash::default())),
            Some((43, Hash::default(), CompressionType::Zstd))
        );

        assert!(snapshot_hash_of("invalid").is_none());
    }
}