Snapshot pipefitting through the validator cli (#5617)
* Handle 404 errors better * Snapshot pipefitting through the validator cli * Add download progress bar * Log the current entrypoint slot
This commit is contained in:
@ -16,35 +16,14 @@ use std::time::Instant;
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct SnapshotConfig {
|
||||
snapshot_path: PathBuf,
|
||||
snapshot_package_output_path: PathBuf,
|
||||
snapshot_interval_slots: usize,
|
||||
}
|
||||
// Generate a new snapshot every this many slots
|
||||
pub snapshot_interval_slots: usize,
|
||||
|
||||
impl SnapshotConfig {
|
||||
pub fn new(
|
||||
snapshot_path: PathBuf,
|
||||
snapshot_package_output_path: PathBuf,
|
||||
snapshot_interval_slots: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
snapshot_path,
|
||||
snapshot_package_output_path,
|
||||
snapshot_interval_slots,
|
||||
}
|
||||
}
|
||||
// Where to store the latest packaged snapshot
|
||||
pub snapshot_package_output_path: PathBuf,
|
||||
|
||||
pub fn snapshot_path(&self) -> &Path {
|
||||
self.snapshot_path.as_path()
|
||||
}
|
||||
|
||||
pub fn snapshot_package_output_path(&self) -> &Path {
|
||||
&self.snapshot_package_output_path.as_path()
|
||||
}
|
||||
|
||||
pub fn snapshot_interval_slots(&self) -> usize {
|
||||
self.snapshot_interval_slots
|
||||
}
|
||||
// Where to place the snapshots for recent slots
|
||||
pub snapshot_path: PathBuf,
|
||||
}
|
||||
|
||||
pub struct BankForks {
|
||||
@ -234,10 +213,7 @@ impl BankForks {
|
||||
// Generate a snapshot if snapshots are configured and it's been an appropriate number
|
||||
// of banks since the last snapshot
|
||||
if self.snapshot_config.is_some() && snapshot_package_sender.is_some() {
|
||||
let config = self
|
||||
.snapshot_config
|
||||
.as_ref()
|
||||
.expect("Called package_snapshot without a snapshot configuration");
|
||||
let config = self.snapshot_config.as_ref().unwrap();
|
||||
info!("setting snapshot root: {}", root);
|
||||
if root - self.slots_since_snapshot[0] >= config.snapshot_interval_slots as u64 {
|
||||
let mut snapshot_time = Measure::start("total-snapshot-ms");
|
||||
@ -308,6 +284,7 @@ impl BankForks {
|
||||
.cloned()
|
||||
.expect("root must exist in BankForks");
|
||||
snapshot_utils::add_snapshot(&config.snapshot_path, &bank, slots_since_snapshot)?;
|
||||
|
||||
// Package the relevant snapshots
|
||||
let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);
|
||||
|
||||
@ -835,7 +812,7 @@ mod tests {
|
||||
genesis_block_info: GenesisBlockInfo,
|
||||
}
|
||||
|
||||
fn setup_snapshot_test(snapshot_interval: usize) -> SnapshotTestConfig {
|
||||
fn setup_snapshot_test(snapshot_interval_slots: usize) -> SnapshotTestConfig {
|
||||
let accounts_dir = TempDir::new().unwrap();
|
||||
let snapshot_dir = TempDir::new().unwrap();
|
||||
let snapshot_output_path = TempDir::new().unwrap();
|
||||
@ -847,11 +824,11 @@ mod tests {
|
||||
bank0.freeze();
|
||||
let mut bank_forks = BankForks::new(0, bank0);
|
||||
|
||||
let snapshot_config = SnapshotConfig::new(
|
||||
PathBuf::from(snapshot_dir.path()),
|
||||
PathBuf::from(snapshot_output_path.path()),
|
||||
snapshot_interval,
|
||||
);
|
||||
let snapshot_config = SnapshotConfig {
|
||||
snapshot_interval_slots,
|
||||
snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()),
|
||||
snapshot_path: PathBuf::from(snapshot_dir.path()),
|
||||
};
|
||||
bank_forks.set_snapshot_config(snapshot_config.clone());
|
||||
SnapshotTestConfig {
|
||||
accounts_dir,
|
||||
|
@ -14,7 +14,6 @@ use std::fs::File;
|
||||
use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tar::Archive;
|
||||
use tempfile::TempDir;
|
||||
|
||||
const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
|
||||
|
||||
@ -92,29 +91,43 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
Ok(package)
|
||||
}
|
||||
|
||||
pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths> {
|
||||
let paths = fs::read_dir(&snapshot_path).expect("Invalid snapshot path");
|
||||
let mut names = paths
|
||||
.filter_map(|entry| {
|
||||
entry.ok().and_then(|e| {
|
||||
e.path()
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str().map(|s| s.parse::<u64>().ok()))
|
||||
.unwrap_or(None)
|
||||
})
|
||||
})
|
||||
.map(|slot| {
|
||||
let snapshot_path = snapshot_path.as_ref().join(slot.to_string());
|
||||
SlotSnapshotPaths {
|
||||
slot,
|
||||
snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)),
|
||||
snapshot_status_cache_path: snapshot_path.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<SlotSnapshotPaths>>();
|
||||
pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths>
|
||||
where
|
||||
P: std::fmt::Debug,
|
||||
{
|
||||
match fs::read_dir(&snapshot_path) {
|
||||
Ok(paths) => {
|
||||
let mut names = paths
|
||||
.filter_map(|entry| {
|
||||
entry.ok().and_then(|e| {
|
||||
e.path()
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str().map(|s| s.parse::<u64>().ok()))
|
||||
.unwrap_or(None)
|
||||
})
|
||||
})
|
||||
.map(|slot| {
|
||||
let snapshot_path = snapshot_path.as_ref().join(slot.to_string());
|
||||
SlotSnapshotPaths {
|
||||
slot,
|
||||
snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)),
|
||||
snapshot_status_cache_path: snapshot_path
|
||||
.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<SlotSnapshotPaths>>();
|
||||
|
||||
names.sort();
|
||||
names
|
||||
names.sort();
|
||||
names
|
||||
}
|
||||
Err(err) => {
|
||||
info!(
|
||||
"Unable to read snapshot directory {:?}: {}",
|
||||
snapshot_path, err
|
||||
);
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_snapshot<P: AsRef<Path>>(
|
||||
@ -172,7 +185,7 @@ pub fn remove_snapshot<P: AsRef<Path>>(slot: u64, snapshot_path: P) -> Result<()
|
||||
}
|
||||
|
||||
pub fn bank_slot_from_archive<P: AsRef<Path>>(snapshot_tar: P) -> Result<u64> {
|
||||
let tempdir = TempDir::new()?;
|
||||
let tempdir = tempfile::TempDir::new()?;
|
||||
untar_snapshot_in(&snapshot_tar, &tempdir)?;
|
||||
let unpacked_snapshots_dir = tempdir.path().join(TAR_SNAPSHOTS_DIR);
|
||||
let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
|
||||
@ -191,7 +204,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
|
||||
snapshot_tar: P,
|
||||
) -> Result<Bank> {
|
||||
// Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()`
|
||||
let unpack_dir = tempfile::tempdir_in(snapshot_config.snapshot_path())?;
|
||||
let unpack_dir = tempfile::tempdir_in(&snapshot_config.snapshot_path)?;
|
||||
untar_snapshot_in(&snapshot_tar, &unpack_dir)?;
|
||||
|
||||
let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
|
||||
@ -199,14 +212,19 @@ pub fn bank_from_archive<P: AsRef<Path>>(
|
||||
let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
|
||||
let bank = rebuild_bank_from_snapshots(account_paths, &snapshot_paths, unpacked_accounts_dir)?;
|
||||
|
||||
// Move the unpacked snapshots into `snapshot_config.snapshot_path()`
|
||||
let dir_files = fs::read_dir(unpacked_snapshots_dir).expect("Invalid snapshot path");
|
||||
// Move the unpacked snapshots into `snapshot_config.snapshot_path`
|
||||
let dir_files = fs::read_dir(&unpacked_snapshots_dir).unwrap_or_else(|err| {
|
||||
panic!(
|
||||
"Invalid snapshot path {:?}: {}",
|
||||
unpacked_snapshots_dir, err
|
||||
)
|
||||
});
|
||||
let paths: Vec<PathBuf> = dir_files
|
||||
.filter_map(|entry| entry.ok().map(|e| e.path()))
|
||||
.collect();
|
||||
let mut copy_options = CopyOptions::new();
|
||||
copy_options.overwrite = true;
|
||||
fs_extra::move_items(&paths, snapshot_config.snapshot_path(), ©_options)?;
|
||||
fs_extra::move_items(&paths, &snapshot_config.snapshot_path, ©_options)?;
|
||||
|
||||
Ok(bank)
|
||||
}
|
||||
@ -238,7 +256,7 @@ where
|
||||
let last_root_paths = snapshot_paths
|
||||
.last()
|
||||
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
|
||||
info!("Load from {:?}", &last_root_paths.snapshot_file_path);
|
||||
info!("Loading from {:?}", &last_root_paths.snapshot_file_path);
|
||||
let file = File::open(&last_root_paths.snapshot_file_path)?;
|
||||
let mut stream = BufReader::new(file);
|
||||
let bank: Bank = deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()))?;
|
||||
|
@ -375,22 +375,20 @@ fn get_bank_forks(
|
||||
dev_halt_at_slot: Option<Slot>,
|
||||
) -> (BankForks, Vec<BankForksInfo>, LeaderScheduleCache) {
|
||||
let (mut bank_forks, bank_forks_info, leader_schedule_cache) = {
|
||||
let mut result = None;
|
||||
if snapshot_config.is_some() {
|
||||
let snapshot_config = snapshot_config.as_ref().unwrap();
|
||||
|
||||
// Blow away any remnants in the snapshots directory
|
||||
let _ = fs::remove_dir_all(snapshot_config.snapshot_path());
|
||||
fs::create_dir_all(&snapshot_config.snapshot_path())
|
||||
if let Some(snapshot_config) = snapshot_config.as_ref() {
|
||||
info!(
|
||||
"Initializing snapshot path: {:?}",
|
||||
snapshot_config.snapshot_path
|
||||
);
|
||||
let _ = fs::remove_dir_all(&snapshot_config.snapshot_path);
|
||||
fs::create_dir_all(&snapshot_config.snapshot_path)
|
||||
.expect("Couldn't create snapshot directory");
|
||||
|
||||
// Get the path to the tar
|
||||
let tar = snapshot_utils::get_snapshot_tar_path(
|
||||
&snapshot_config.snapshot_package_output_path(),
|
||||
&snapshot_config.snapshot_package_output_path,
|
||||
);
|
||||
|
||||
// Check that the snapshot tar exists, try to load the snapshot if it does
|
||||
if tar.exists() {
|
||||
info!("Loading snapshot package: {:?}", tar);
|
||||
// Fail hard here if snapshot fails to load, don't silently continue
|
||||
let deserialized_bank = snapshot_utils::bank_from_archive(
|
||||
account_paths
|
||||
@ -401,33 +399,29 @@ fn get_bank_forks(
|
||||
)
|
||||
.expect("Load from snapshot failed");
|
||||
|
||||
result = Some(
|
||||
blocktree_processor::process_blocktree_from_root(
|
||||
blocktree,
|
||||
Arc::new(deserialized_bank),
|
||||
verify_ledger,
|
||||
dev_halt_at_slot,
|
||||
)
|
||||
.expect("processing blocktree after loading snapshot failed"),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// If a snapshot doesn't exist
|
||||
if result.is_none() {
|
||||
result = Some(
|
||||
blocktree_processor::process_blocktree(
|
||||
&genesis_block,
|
||||
&blocktree,
|
||||
account_paths,
|
||||
return blocktree_processor::process_blocktree_from_root(
|
||||
blocktree,
|
||||
Arc::new(deserialized_bank),
|
||||
verify_ledger,
|
||||
dev_halt_at_slot,
|
||||
)
|
||||
.expect("process_blocktree failed"),
|
||||
);
|
||||
.expect("processing blocktree after loading snapshot failed");
|
||||
} else {
|
||||
info!("Snapshot package does not exist: {:?}", tar);
|
||||
}
|
||||
} else {
|
||||
info!("Snapshots disabled");
|
||||
}
|
||||
|
||||
result.unwrap()
|
||||
info!("Processing ledger from genesis");
|
||||
blocktree_processor::process_blocktree(
|
||||
&genesis_block,
|
||||
&blocktree,
|
||||
account_paths,
|
||||
verify_ledger,
|
||||
dev_halt_at_slot,
|
||||
)
|
||||
.expect("process_blocktree failed")
|
||||
};
|
||||
|
||||
if snapshot_config.is_some() {
|
||||
|
Reference in New Issue
Block a user