automerge
This commit is contained in:
@@ -24,6 +24,7 @@ rand = "0.6.5"
|
||||
rand_chacha = "0.1.1"
|
||||
rayon = "1.2.0"
|
||||
reed-solomon-erasure = { package = "solana-reed-solomon-erasure", version = "4.0.1-3", features = ["simd-accel"] }
|
||||
regex = "1.3.4"
|
||||
serde = "1.0.104"
|
||||
serde_bytes = "0.11.3"
|
||||
solana-client = { path = "../client", version = "1.0.1" }
|
||||
|
@@ -10,7 +10,7 @@ use solana_sdk::{clock::Slot, timing};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
ops::Index,
|
||||
path::{Path, PathBuf},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
time::Instant,
|
||||
};
|
||||
@@ -192,7 +192,6 @@ impl BankForks {
|
||||
root,
|
||||
&root_bank.src.roots(),
|
||||
snapshot_package_sender.as_ref().unwrap(),
|
||||
snapshot_utils::get_snapshot_archive_path(&config.snapshot_package_output_path),
|
||||
);
|
||||
if r.is_err() {
|
||||
warn!("Error generating snapshot for bank: {}, err: {:?}", root, r);
|
||||
@@ -236,12 +235,11 @@ impl BankForks {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn generate_snapshot<P: AsRef<Path>>(
|
||||
pub fn generate_snapshot(
|
||||
&self,
|
||||
root: Slot,
|
||||
slots_to_snapshot: &[Slot],
|
||||
snapshot_package_sender: &SnapshotPackageSender,
|
||||
tar_output_file: P,
|
||||
) -> Result<()> {
|
||||
let config = self.snapshot_config.as_ref().unwrap();
|
||||
|
||||
@@ -267,9 +265,9 @@ impl BankForks {
|
||||
let package = snapshot_utils::package_snapshot(
|
||||
&bank,
|
||||
latest_slot_snapshot_paths,
|
||||
tar_output_file,
|
||||
&config.snapshot_path,
|
||||
slots_to_snapshot,
|
||||
&config.snapshot_package_output_path,
|
||||
storages,
|
||||
)?;
|
||||
|
||||
|
@@ -10,7 +10,7 @@ use crate::{
|
||||
};
|
||||
use log::*;
|
||||
use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash};
|
||||
use std::{fs, path::PathBuf, result, sync::Arc};
|
||||
use std::{fs, path::PathBuf, process, result, sync::Arc};
|
||||
|
||||
pub type LoadResult = result::Result<
|
||||
(
|
||||
@@ -52,37 +52,50 @@ pub fn load(
|
||||
fs::create_dir_all(&snapshot_config.snapshot_path)
|
||||
.expect("Couldn't create snapshot directory");
|
||||
|
||||
let tar = snapshot_utils::get_snapshot_archive_path(
|
||||
match snapshot_utils::get_highest_snapshot_archive_path(
|
||||
&snapshot_config.snapshot_package_output_path,
|
||||
);
|
||||
if tar.exists() {
|
||||
info!("Loading snapshot package: {:?}", tar);
|
||||
// Fail hard here if snapshot fails to load, don't silently continue
|
||||
) {
|
||||
Some((archive_filename, archive_snapshot_hash)) => {
|
||||
info!("Loading snapshot package: {:?}", archive_filename);
|
||||
// Fail hard here if snapshot fails to load, don't silently continue
|
||||
|
||||
if account_paths.is_empty() {
|
||||
panic!("Account paths not present when booting from snapshot")
|
||||
if account_paths.is_empty() {
|
||||
error!("Account paths not present when booting from snapshot");
|
||||
process::exit(1);
|
||||
}
|
||||
|
||||
let deserialized_bank = snapshot_utils::bank_from_archive(
|
||||
&account_paths,
|
||||
&snapshot_config.snapshot_path,
|
||||
&archive_filename,
|
||||
)
|
||||
.expect("Load from snapshot failed");
|
||||
|
||||
let deserialized_snapshot_hash = (
|
||||
deserialized_bank.slot(),
|
||||
deserialized_bank.get_accounts_hash(),
|
||||
);
|
||||
|
||||
if deserialized_snapshot_hash != archive_snapshot_hash {
|
||||
error!(
|
||||
"Snapshot has mismatch:\narchive: {:?}\ndeserialized: {:?}",
|
||||
archive_snapshot_hash, deserialized_snapshot_hash
|
||||
);
|
||||
process::exit(1);
|
||||
}
|
||||
|
||||
return to_loadresult(
|
||||
blockstore_processor::process_blockstore_from_root(
|
||||
genesis_config,
|
||||
blockstore,
|
||||
Arc::new(deserialized_bank),
|
||||
&process_options,
|
||||
&VerifyRecyclers::default(),
|
||||
),
|
||||
Some(deserialized_snapshot_hash),
|
||||
);
|
||||
}
|
||||
|
||||
let deserialized_bank = snapshot_utils::bank_from_archive(
|
||||
&account_paths,
|
||||
&snapshot_config.snapshot_path,
|
||||
&tar,
|
||||
)
|
||||
.expect("Load from snapshot failed");
|
||||
|
||||
let snapshot_hash = (deserialized_bank.slot(), deserialized_bank.hash());
|
||||
return to_loadresult(
|
||||
blockstore_processor::process_blockstore_from_root(
|
||||
genesis_config,
|
||||
blockstore,
|
||||
Arc::new(deserialized_bank),
|
||||
&process_options,
|
||||
&VerifyRecyclers::default(),
|
||||
),
|
||||
Some(snapshot_hash),
|
||||
);
|
||||
} else {
|
||||
info!("Snapshot package does not exist: {:?}", tar);
|
||||
None => info!("No snapshot package available"),
|
||||
}
|
||||
} else {
|
||||
info!("Snapshots disabled");
|
||||
|
@@ -3,6 +3,7 @@ use bincode::serialize_into;
|
||||
use bzip2::bufread::BzDecoder;
|
||||
use fs_extra::dir::CopyOptions;
|
||||
use log::*;
|
||||
use regex::Regex;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_runtime::{
|
||||
accounts_db::{SnapshotStorage, SnapshotStorages},
|
||||
@@ -11,7 +12,7 @@ use solana_runtime::{
|
||||
MAX_SNAPSHOT_DATA_FILE_SIZE,
|
||||
},
|
||||
};
|
||||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::{clock::Slot, hash::Hash};
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
env,
|
||||
@@ -81,9 +82,9 @@ impl SlotSnapshotPaths {
|
||||
pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
bank: &Bank,
|
||||
snapshot_files: &SlotSnapshotPaths,
|
||||
snapshot_package_output_file: P,
|
||||
snapshot_path: Q,
|
||||
slots_to_snapshot: &[Slot],
|
||||
snapshot_package_output_path: P,
|
||||
snapshot_storages: SnapshotStorages,
|
||||
) -> Result<SnapshotPackage> {
|
||||
// Hard link all the snapshots we need for this package
|
||||
@@ -100,13 +101,18 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
// any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir)
|
||||
snapshot_files.copy_snapshot_directory(snapshot_hard_links_dir.path())?;
|
||||
|
||||
let snapshot_package_output_file = get_snapshot_archive_path(
|
||||
&snapshot_package_output_path,
|
||||
&(bank.slot(), bank.get_accounts_hash()),
|
||||
);
|
||||
|
||||
let package = SnapshotPackage::new(
|
||||
bank.slot(),
|
||||
bank.src.slot_deltas(slots_to_snapshot),
|
||||
snapshot_hard_links_dir,
|
||||
snapshot_storages,
|
||||
snapshot_package_output_file.as_ref().to_path_buf(),
|
||||
bank.hash(),
|
||||
snapshot_package_output_file,
|
||||
bank.get_accounts_hash(),
|
||||
);
|
||||
|
||||
Ok(package)
|
||||
@@ -205,9 +211,17 @@ pub fn archive_snapshot_package(snapshot_package: &SnapshotPackage) -> Result<()
|
||||
let metadata = fs::metadata(&archive_path)?;
|
||||
fs::rename(&archive_path, &snapshot_package.tar_output_file)?;
|
||||
|
||||
// Keep around at most two snapshot archives
|
||||
let archives = get_snapshot_archives(snapshot_package.tar_output_file.parent().unwrap());
|
||||
for old_archive in archives.into_iter().skip(2) {
|
||||
fs::remove_file(old_archive.0)
|
||||
.unwrap_or_else(|err| info!("Failed to remove old snapshot: {:}", err));
|
||||
}
|
||||
|
||||
timer.stop();
|
||||
info!(
|
||||
"Successfully created tarball. slot: {}, elapsed ms: {}, size={}",
|
||||
"Successfully created {:?}. slot: {}, elapsed ms: {}, size={}",
|
||||
snapshot_package.tar_output_file,
|
||||
snapshot_package.root,
|
||||
timer.as_ms(),
|
||||
metadata.len()
|
||||
@@ -415,32 +429,6 @@ pub fn remove_snapshot<P: AsRef<Path>>(slot: Slot, snapshot_path: P) -> Result<(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn bank_slot_from_archive<P: AsRef<Path>>(snapshot_tar: P) -> Result<Slot> {
|
||||
let tempdir = tempfile::TempDir::new()?;
|
||||
untar_snapshot_in(&snapshot_tar, &tempdir)?;
|
||||
let unpacked_snapshots_dir = tempdir.path().join(TAR_SNAPSHOTS_DIR);
|
||||
let local_account_paths = vec![tempdir.path().join("account_dummy")];
|
||||
let unpacked_accounts_dir = tempdir.path().join(TAR_ACCOUNTS_DIR);
|
||||
let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
|
||||
let last_root_paths = snapshot_paths
|
||||
.last()
|
||||
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
|
||||
let bank = deserialize_snapshot_data_file(
|
||||
&last_root_paths.snapshot_file_path,
|
||||
MAX_SNAPSHOT_DATA_FILE_SIZE,
|
||||
|stream| {
|
||||
let bank: Bank = deserialize_from_snapshot(stream.by_ref())?;
|
||||
bank.rc.accounts_from_stream(
|
||||
stream.by_ref(),
|
||||
&local_account_paths,
|
||||
&unpacked_accounts_dir,
|
||||
)?;
|
||||
Ok(bank)
|
||||
},
|
||||
)?;
|
||||
Ok(bank.slot())
|
||||
}
|
||||
|
||||
pub fn bank_from_archive<P: AsRef<Path>>(
|
||||
account_paths: &[PathBuf],
|
||||
snapshot_path: &PathBuf,
|
||||
@@ -496,8 +484,59 @@ fn is_snapshot_compression_disabled() -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_snapshot_archive_path<P: AsRef<Path>>(snapshot_output_dir: P) -> PathBuf {
|
||||
snapshot_output_dir.as_ref().join("snapshot.tar.bz2")
|
||||
pub fn get_snapshot_archive_path<P: AsRef<Path>>(
|
||||
snapshot_output_dir: P,
|
||||
snapshot_hash: &(Slot, Hash),
|
||||
) -> PathBuf {
|
||||
snapshot_output_dir.as_ref().join(format!(
|
||||
"snapshot-{}-{}.tar.bz2",
|
||||
snapshot_hash.0, snapshot_hash.1
|
||||
))
|
||||
}
|
||||
|
||||
fn snapshot_hash_of(archive_filename: &str) -> Option<(Slot, Hash)> {
|
||||
let snapshot_filename_regex = Regex::new(r"snapshot-(\d+)-([[:alnum:]]+)\.tar\.bz2$").unwrap();
|
||||
|
||||
if let Some(captures) = snapshot_filename_regex.captures(archive_filename) {
|
||||
let slot_str = captures.get(1).unwrap().as_str();
|
||||
let hash_str = captures.get(2).unwrap().as_str();
|
||||
|
||||
if let (Ok(slot), Ok(hash)) = (slot_str.parse::<Slot>(), hash_str.parse::<Hash>()) {
|
||||
return Some((slot, hash));
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn get_snapshot_archives<P: AsRef<Path>>(snapshot_output_dir: P) -> Vec<(PathBuf, (Slot, Hash))> {
|
||||
let files = fs::read_dir(&snapshot_output_dir)
|
||||
.unwrap_or_else(|err| panic!("Unable to read snapshot directory: {}", err));
|
||||
|
||||
let mut archives: Vec<_> = files
|
||||
.filter_map(|entry| {
|
||||
if let Ok(entry) = entry {
|
||||
let path = entry.path();
|
||||
if path.is_file() {
|
||||
if let Some(snapshot_hash) =
|
||||
snapshot_hash_of(path.file_name().unwrap().to_str().unwrap())
|
||||
{
|
||||
return Some((path, snapshot_hash));
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
.collect();
|
||||
|
||||
archives.sort_by(|a, b| (b.1).0.cmp(&(a.1).0)); // reverse sort by slot
|
||||
archives
|
||||
}
|
||||
|
||||
pub fn get_highest_snapshot_archive_path<P: AsRef<Path>>(
|
||||
snapshot_output_dir: P,
|
||||
) -> Option<(PathBuf, (Slot, Hash))> {
|
||||
let archives = get_snapshot_archives(snapshot_output_dir);
|
||||
archives.into_iter().next()
|
||||
}
|
||||
|
||||
pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
@@ -600,7 +639,7 @@ fn get_io_error(error: &str) -> SnapshotError {
|
||||
}
|
||||
|
||||
pub fn verify_snapshot_archive<P, Q, R>(
|
||||
snapshot_tar: P,
|
||||
snapshot_archive: P,
|
||||
snapshots_to_verify: Q,
|
||||
storages_to_verify: R,
|
||||
) where
|
||||
@@ -610,7 +649,7 @@ pub fn verify_snapshot_archive<P, Q, R>(
|
||||
{
|
||||
let temp_dir = tempfile::TempDir::new().unwrap();
|
||||
let unpack_dir = temp_dir.path();
|
||||
untar_snapshot_in(snapshot_tar, &unpack_dir).unwrap();
|
||||
untar_snapshot_in(snapshot_archive, &unpack_dir).unwrap();
|
||||
|
||||
// Check snapshots are the same
|
||||
let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR);
|
||||
@@ -732,4 +771,13 @@ mod tests {
|
||||
);
|
||||
assert_matches!(result, Err(SnapshotError::IO(ref message)) if message.to_string().starts_with("invalid snapshot data file"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_snapshot_hash_of() {
|
||||
assert_eq!(
|
||||
snapshot_hash_of(&format!("snapshot-42-{}.tar.bz2", Hash::default())),
|
||||
Some((42, Hash::default()))
|
||||
);
|
||||
assert!(snapshot_hash_of("invalid").is_none());
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user