Move snapshot archive generation out of the SnapshotPackagerService
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -3631,6 +3631,7 @@ dependencies = [
|
||||
"solana-sdk 0.22.4",
|
||||
"solana-stake-program 0.22.4",
|
||||
"solana-vote-program 0.22.4",
|
||||
"symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"sys-info 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -1,16 +1,7 @@
|
||||
use crate::result::{Error, Result};
|
||||
use bincode::serialize_into;
|
||||
use solana_ledger::{
|
||||
snapshot_package::{SnapshotPackage, SnapshotPackageReceiver},
|
||||
snapshot_utils::{self, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR, TAR_VERSION_FILE},
|
||||
snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package,
|
||||
};
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_metrics::datapoint_info;
|
||||
use solana_runtime::status_cache::SlotDelta;
|
||||
use solana_sdk::transaction;
|
||||
use std::{
|
||||
fs::{self, File},
|
||||
io::{BufWriter, Error as IOError, ErrorKind},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::RecvTimeoutError,
|
||||
@ -19,8 +10,6 @@ use std::{
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
use symlink;
|
||||
use tempfile::TempDir;
|
||||
|
||||
pub struct SnapshotPackagerService {
|
||||
t_snapshot_packager: JoinHandle<()>,
|
||||
@ -35,12 +24,19 @@ impl SnapshotPackagerService {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
if let Err(e) = Self::run(&snapshot_package_receiver) {
|
||||
match e {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
_ => info!("Error from package_snapshots: {:?}", e),
|
||||
|
||||
match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) {
|
||||
Ok(mut snapshot_package) => {
|
||||
// Only package the latest
|
||||
while let Ok(new_snapshot_package) = snapshot_package_receiver.try_recv() {
|
||||
snapshot_package = new_snapshot_package;
|
||||
}
|
||||
if let Err(err) = archive_snapshot_package(&snapshot_package) {
|
||||
warn!("Failed to create snapshot archive: {}", err);
|
||||
}
|
||||
}
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
Err(RecvTimeoutError::Timeout) => (),
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
@ -49,155 +45,6 @@ impl SnapshotPackagerService {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn package_snapshots(snapshot_package: &SnapshotPackage) -> Result<()> {
|
||||
info!(
|
||||
"Generating snapshot tarball for root {}",
|
||||
snapshot_package.root
|
||||
);
|
||||
|
||||
Self::serialize_status_cache(
|
||||
&snapshot_package.slot_deltas,
|
||||
&snapshot_package.snapshot_links,
|
||||
)?;
|
||||
|
||||
let mut timer = Measure::start("snapshot_package-package_snapshots");
|
||||
let tar_dir = snapshot_package
|
||||
.tar_output_file
|
||||
.parent()
|
||||
.expect("Tar output path is invalid");
|
||||
|
||||
fs::create_dir_all(tar_dir)?;
|
||||
|
||||
// Create the staging directories
|
||||
let staging_dir = TempDir::new()?;
|
||||
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
|
||||
let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
|
||||
let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE);
|
||||
fs::create_dir_all(&staging_accounts_dir)?;
|
||||
|
||||
// Add the snapshots to the staging directory
|
||||
symlink::symlink_dir(
|
||||
snapshot_package.snapshot_links.path(),
|
||||
&staging_snapshots_dir,
|
||||
)?;
|
||||
|
||||
// Add the AppendVecs into the compressible list
|
||||
for storage in &snapshot_package.storage_entries {
|
||||
storage.flush()?;
|
||||
let storage_path = storage.get_path();
|
||||
let output_path = staging_accounts_dir.join(
|
||||
storage_path
|
||||
.file_name()
|
||||
.expect("Invalid AppendVec file path"),
|
||||
);
|
||||
|
||||
// `storage_path` - The file path where the AppendVec itself is located
|
||||
// `output_path` - The directory where the AppendVec will be placed in the staging directory.
|
||||
let storage_path =
|
||||
fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
|
||||
symlink::symlink_dir(storage_path, &output_path)?;
|
||||
if !output_path.is_file() {
|
||||
return Err(Self::get_io_error(
|
||||
"Error trying to generate snapshot archive: storage path symlink is invalid",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Write version file
|
||||
{
|
||||
use std::io::Write;
|
||||
let snapshot_version = format!("{}\n", env!("CARGO_PKG_VERSION"));
|
||||
let mut f = std::fs::File::create(staging_version_file)?;
|
||||
//f.write_all(&snapshot_version.to_string().into_bytes())?;
|
||||
f.write_all(&snapshot_version.into_bytes())?;
|
||||
}
|
||||
|
||||
// Tar the staging directory into the archive at `archive_path`
|
||||
let archive_path = tar_dir.join("new_state.tar.bz2");
|
||||
let args = vec![
|
||||
"jcfhS",
|
||||
archive_path.to_str().unwrap(),
|
||||
"-C",
|
||||
staging_dir.path().to_str().unwrap(),
|
||||
TAR_ACCOUNTS_DIR,
|
||||
TAR_SNAPSHOTS_DIR,
|
||||
TAR_VERSION_FILE,
|
||||
];
|
||||
|
||||
let output = std::process::Command::new("tar").args(&args).output()?;
|
||||
if !output.status.success() {
|
||||
warn!("tar command failed with exit code: {}", output.status);
|
||||
use std::str::from_utf8;
|
||||
info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
|
||||
info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
|
||||
|
||||
return Err(Self::get_io_error(&format!(
|
||||
"Error trying to generate snapshot archive: {}",
|
||||
output.status
|
||||
)));
|
||||
}
|
||||
|
||||
// Once everything is successful, overwrite the previous tarball so that other validators
|
||||
// can fetch this newly packaged snapshot
|
||||
let metadata = fs::metadata(&archive_path)?;
|
||||
fs::rename(&archive_path, &snapshot_package.tar_output_file)?;
|
||||
|
||||
timer.stop();
|
||||
info!(
|
||||
"Successfully created tarball. slot: {}, elapsed ms: {}, size={}",
|
||||
snapshot_package.root,
|
||||
timer.as_ms(),
|
||||
metadata.len()
|
||||
);
|
||||
datapoint_info!(
|
||||
"snapshot-package",
|
||||
("slot", snapshot_package.root, i64),
|
||||
("duration_ms", timer.as_ms(), i64),
|
||||
("size", metadata.len(), i64)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> {
|
||||
let mut snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?;
|
||||
// Only package the latest
|
||||
while let Ok(new_snapshot_package) = snapshot_receiver.try_recv() {
|
||||
snapshot_package = new_snapshot_package;
|
||||
}
|
||||
Self::package_snapshots(&snapshot_package)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_io_error(error: &str) -> Error {
|
||||
warn!("Snapshot Packaging Error: {:?}", error);
|
||||
Error::IO(IOError::new(ErrorKind::Other, error))
|
||||
}
|
||||
|
||||
fn serialize_status_cache(
|
||||
slot_deltas: &[SlotDelta<transaction::Result<()>>],
|
||||
snapshot_links: &TempDir,
|
||||
) -> Result<()> {
|
||||
// the status cache is stored as snapshot_path/status_cache
|
||||
let snapshot_status_cache_file_path = snapshot_links
|
||||
.path()
|
||||
.join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME);
|
||||
|
||||
let status_cache = File::create(&snapshot_status_cache_file_path)?;
|
||||
// status cache writer
|
||||
let mut status_cache_stream = BufWriter::new(status_cache);
|
||||
|
||||
let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
|
||||
// write the status cache
|
||||
serialize_into(&mut status_cache_stream, slot_deltas)
|
||||
.map_err(|_| Self::get_io_error("serialize status cache error"))?;
|
||||
status_cache_serialize.stop();
|
||||
inc_new_counter_info!(
|
||||
"serialize-status-cache-ms",
|
||||
status_cache_serialize.as_ms() as usize
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn join(self) -> thread::Result<()> {
|
||||
self.t_snapshot_packager.join()
|
||||
}
|
||||
@ -206,11 +53,13 @@ impl SnapshotPackagerService {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use solana_ledger::snapshot_utils;
|
||||
use solana_runtime::accounts_db::AccountStorageEntry;
|
||||
use bincode::serialize_into;
|
||||
use solana_ledger::{snapshot_package::SnapshotPackage, snapshot_utils};
|
||||
use solana_runtime::{accounts_db::AccountStorageEntry, status_cache::SlotDelta};
|
||||
use solana_sdk::transaction;
|
||||
use std::{
|
||||
fs::{remove_dir_all, OpenOptions},
|
||||
io::Write,
|
||||
fs::{self, remove_dir_all, File, OpenOptions},
|
||||
io::{BufWriter, Write},
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tempfile::TempDir;
|
||||
@ -278,7 +127,8 @@ mod tests {
|
||||
}
|
||||
|
||||
// Create a packageable snapshot
|
||||
let output_tar_path = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
|
||||
let output_tar_path =
|
||||
snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path);
|
||||
let snapshot_package = SnapshotPackage::new(
|
||||
5,
|
||||
vec![],
|
||||
@ -288,7 +138,7 @@ mod tests {
|
||||
);
|
||||
|
||||
// Make tarball from packageable snapshot
|
||||
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
|
||||
snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap();
|
||||
|
||||
// before we compare, stick an empty status_cache in this dir so that the package comparision works
|
||||
// This is needed since the status_cache is added by the packager and is not collected from
|
||||
@ -299,7 +149,7 @@ mod tests {
|
||||
serialize_into(&mut status_cache_stream, &slot_deltas).unwrap();
|
||||
status_cache_stream.flush().unwrap();
|
||||
|
||||
// Check tarball is correct
|
||||
snapshot_utils::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir);
|
||||
// Check archive is correct
|
||||
snapshot_utils::verify_snapshot_archive(output_tar_path, snapshots_dir, accounts_dir);
|
||||
}
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ mod tests {
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.snapshot_path,
|
||||
snapshot_utils::get_snapshot_tar_path(snapshot_package_output_path),
|
||||
snapshot_utils::get_snapshot_archive_path(snapshot_package_output_path),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -143,12 +143,15 @@ mod tests {
|
||||
slot_snapshot_paths
|
||||
.last()
|
||||
.expect("no snapshots found in path"),
|
||||
snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path),
|
||||
snapshot_utils::get_snapshot_archive_path(
|
||||
&snapshot_config.snapshot_package_output_path,
|
||||
),
|
||||
&snapshot_config.snapshot_path,
|
||||
&last_bank.src.roots(),
|
||||
)
|
||||
.unwrap();
|
||||
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
|
||||
|
||||
snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap();
|
||||
|
||||
restore_from_snapshot(bank_forks, vec![accounts_dir.path().to_path_buf()]);
|
||||
}
|
||||
@ -324,7 +327,7 @@ mod tests {
|
||||
serialize_into(&mut status_cache_stream, &slot_deltas).unwrap();
|
||||
status_cache_stream.flush().unwrap();
|
||||
|
||||
snapshot_utils::verify_snapshot_tar(
|
||||
snapshot_utils::verify_snapshot_archive(
|
||||
saved_tar,
|
||||
saved_snapshots_dir.path(),
|
||||
saved_accounts_dir
|
||||
|
@ -43,6 +43,7 @@ solana-sdk = { path = "../sdk", version = "0.22.4" }
|
||||
solana-stake-program = { path = "../programs/stake", version = "0.22.4" }
|
||||
solana-vote-program = { path = "../programs/vote", version = "0.22.4" }
|
||||
sys-info = "0.5.8"
|
||||
symlink = "0.1.0"
|
||||
tar = "0.4.26"
|
||||
thiserror = "1.0"
|
||||
tempfile = "3.1.0"
|
||||
|
@ -192,7 +192,7 @@ impl BankForks {
|
||||
root,
|
||||
&root_bank.src.roots(),
|
||||
snapshot_package_sender.as_ref().unwrap(),
|
||||
snapshot_utils::get_snapshot_tar_path(&config.snapshot_package_output_path),
|
||||
snapshot_utils::get_snapshot_archive_path(&config.snapshot_package_output_path),
|
||||
);
|
||||
if r.is_err() {
|
||||
warn!("Error generating snapshot for bank: {}, err: {:?}", root, r);
|
||||
|
@ -25,8 +25,9 @@ pub fn load(
|
||||
fs::create_dir_all(&snapshot_config.snapshot_path)
|
||||
.expect("Couldn't create snapshot directory");
|
||||
|
||||
let tar =
|
||||
snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path);
|
||||
let tar = snapshot_utils::get_snapshot_archive_path(
|
||||
&snapshot_config.snapshot_package_output_path,
|
||||
);
|
||||
if tar.exists() {
|
||||
info!("Loading snapshot package: {:?}", tar);
|
||||
// Fail hard here if snapshot fails to load, don't silently continue
|
||||
|
@ -11,12 +11,13 @@ use solana_runtime::{
|
||||
use solana_sdk::{clock::Slot, transaction};
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
fs,
|
||||
fs::File,
|
||||
io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read},
|
||||
fs::{self, File},
|
||||
io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Write},
|
||||
path::{Path, PathBuf},
|
||||
process::ExitStatus,
|
||||
};
|
||||
use tar::Archive;
|
||||
use tempfile::TempDir;
|
||||
use thiserror::Error;
|
||||
|
||||
pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
|
||||
@ -40,6 +41,12 @@ pub enum SnapshotError {
|
||||
|
||||
#[error("file system error")]
|
||||
FsExtra(#[from] fs_extra::error::Error),
|
||||
|
||||
#[error("archive generation failure {0}")]
|
||||
ArchiveGenerationFailure(ExitStatus),
|
||||
|
||||
#[error("storage path symlink is invalid")]
|
||||
StoragePathSymlinkInvalid,
|
||||
}
|
||||
pub type Result<T> = std::result::Result<T, SnapshotError>;
|
||||
|
||||
@ -105,6 +112,138 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
Ok(package)
|
||||
}
|
||||
|
||||
pub fn archive_snapshot_package(snapshot_package: &SnapshotPackage) -> Result<()> {
|
||||
info!(
|
||||
"Generating snapshot tarball for root {}",
|
||||
snapshot_package.root
|
||||
);
|
||||
|
||||
fn serialize_status_cache(
|
||||
slot_deltas: &[SlotDelta<transaction::Result<()>>],
|
||||
snapshot_links: &TempDir,
|
||||
) -> Result<()> {
|
||||
// the status cache is stored as snapshot_path/status_cache
|
||||
let snapshot_status_cache_file_path =
|
||||
snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
|
||||
|
||||
let status_cache = File::create(&snapshot_status_cache_file_path)?;
|
||||
// status cache writer
|
||||
let mut status_cache_stream = BufWriter::new(status_cache);
|
||||
|
||||
let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
|
||||
// write the status cache
|
||||
serialize_into(&mut status_cache_stream, slot_deltas)
|
||||
.map_err(|_| get_io_error("serialize status cache error"))?;
|
||||
status_cache_serialize.stop();
|
||||
inc_new_counter_info!(
|
||||
"serialize-status-cache-ms",
|
||||
status_cache_serialize.as_ms() as usize
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
serialize_status_cache(
|
||||
&snapshot_package.slot_deltas,
|
||||
&snapshot_package.snapshot_links,
|
||||
)?;
|
||||
|
||||
let mut timer = Measure::start("snapshot_package-package_snapshots");
|
||||
let tar_dir = snapshot_package
|
||||
.tar_output_file
|
||||
.parent()
|
||||
.expect("Tar output path is invalid");
|
||||
|
||||
fs::create_dir_all(tar_dir)?;
|
||||
|
||||
// Create the staging directories
|
||||
let staging_dir = TempDir::new()?;
|
||||
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
|
||||
let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
|
||||
let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE);
|
||||
fs::create_dir_all(&staging_accounts_dir)?;
|
||||
|
||||
// Add the snapshots to the staging directory
|
||||
symlink::symlink_dir(
|
||||
snapshot_package.snapshot_links.path(),
|
||||
&staging_snapshots_dir,
|
||||
)?;
|
||||
|
||||
// Add the AppendVecs into the compressible list
|
||||
for storage in &snapshot_package.storage_entries {
|
||||
storage.flush()?;
|
||||
let storage_path = storage.get_path();
|
||||
let output_path = staging_accounts_dir.join(
|
||||
storage_path
|
||||
.file_name()
|
||||
.expect("Invalid AppendVec file path"),
|
||||
);
|
||||
|
||||
// `storage_path` - The file path where the AppendVec itself is located
|
||||
// `output_path` - The directory where the AppendVec will be placed in the staging directory.
|
||||
let storage_path =
|
||||
fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
|
||||
symlink::symlink_dir(storage_path, &output_path)?;
|
||||
if !output_path.is_file() {
|
||||
return Err(get_io_error(
|
||||
"Error trying to generate snapshot archive: storage path symlink is invalid",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Write version file
|
||||
{
|
||||
let snapshot_version = format!("{}\n", env!("CARGO_PKG_VERSION"));
|
||||
let mut f = std::fs::File::create(staging_version_file)?;
|
||||
//f.write_all(&snapshot_version.to_string().into_bytes())?;
|
||||
f.write_all(&snapshot_version.into_bytes())?;
|
||||
}
|
||||
|
||||
// Tar the staging directory into the archive at `archive_path`
|
||||
let archive_path = tar_dir.join("new_state.tar.bz2");
|
||||
let args = vec![
|
||||
"jcfhS",
|
||||
archive_path.to_str().unwrap(),
|
||||
"-C",
|
||||
staging_dir.path().to_str().unwrap(),
|
||||
TAR_ACCOUNTS_DIR,
|
||||
TAR_SNAPSHOTS_DIR,
|
||||
TAR_VERSION_FILE,
|
||||
];
|
||||
|
||||
let output = std::process::Command::new("tar").args(&args).output()?;
|
||||
if !output.status.success() {
|
||||
warn!("tar command failed with exit code: {}", output.status);
|
||||
use std::str::from_utf8;
|
||||
info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
|
||||
info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
|
||||
|
||||
return Err(get_io_error(&format!(
|
||||
"Error trying to generate snapshot archive: {}",
|
||||
output.status
|
||||
)));
|
||||
}
|
||||
|
||||
// Once everything is successful, overwrite the previous tarball so that other validators
|
||||
// can fetch this newly packaged snapshot
|
||||
let metadata = fs::metadata(&archive_path)?;
|
||||
fs::rename(&archive_path, &snapshot_package.tar_output_file)?;
|
||||
|
||||
timer.stop();
|
||||
info!(
|
||||
"Successfully created tarball. slot: {}, elapsed ms: {}, size={}",
|
||||
snapshot_package.root,
|
||||
timer.as_ms(),
|
||||
metadata.len()
|
||||
);
|
||||
datapoint_info!(
|
||||
"snapshot-package",
|
||||
("slot", snapshot_package.root, i64),
|
||||
("duration_ms", timer.as_ms(), i64),
|
||||
("size", metadata.len(), i64)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths>
|
||||
where
|
||||
P: std::fmt::Debug,
|
||||
@ -249,7 +388,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
|
||||
Ok(bank)
|
||||
}
|
||||
|
||||
pub fn get_snapshot_tar_path<P: AsRef<Path>>(snapshot_output_dir: P) -> PathBuf {
|
||||
pub fn get_snapshot_archive_path<P: AsRef<Path>>(snapshot_output_dir: P) -> PathBuf {
|
||||
snapshot_output_dir.as_ref().join("snapshot.tar.bz2")
|
||||
}
|
||||
|
||||
@ -337,8 +476,11 @@ fn get_io_error(error: &str) -> SnapshotError {
|
||||
SnapshotError::IO(IOError::new(ErrorKind::Other, error))
|
||||
}
|
||||
|
||||
pub fn verify_snapshot_tar<P, Q, R>(snapshot_tar: P, snapshots_to_verify: Q, storages_to_verify: R)
|
||||
where
|
||||
pub fn verify_snapshot_archive<P, Q, R>(
|
||||
snapshot_tar: P,
|
||||
snapshots_to_verify: Q,
|
||||
storages_to_verify: R,
|
||||
) where
|
||||
P: AsRef<Path>,
|
||||
Q: AsRef<Path>,
|
||||
R: AsRef<Path>,
|
||||
|
@ -651,12 +651,13 @@ fn test_snapshot_restart_tower() {
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.snapshot_package_output_path;
|
||||
let tar = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
|
||||
let tar = snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path);
|
||||
wait_for_next_snapshot(&cluster, &tar);
|
||||
|
||||
// Copy tar to validator's snapshot output directory
|
||||
let validator_tar_path =
|
||||
snapshot_utils::get_snapshot_tar_path(&validator_snapshot_test_config.snapshot_output_path);
|
||||
let validator_tar_path = snapshot_utils::get_snapshot_archive_path(
|
||||
&validator_snapshot_test_config.snapshot_output_path,
|
||||
);
|
||||
fs::hard_link(tar, &validator_tar_path).unwrap();
|
||||
|
||||
// Restart validator from snapshot, the validator's tower state in this snapshot
|
||||
@ -702,7 +703,7 @@ fn test_snapshots_blockstore_floor() {
|
||||
|
||||
trace!("Waiting for snapshot tar to be generated with slot",);
|
||||
|
||||
let tar = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
|
||||
let tar = snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path);
|
||||
loop {
|
||||
if tar.exists() {
|
||||
trace!("snapshot tar exists");
|
||||
@ -712,8 +713,9 @@ fn test_snapshots_blockstore_floor() {
|
||||
}
|
||||
|
||||
// Copy tar to validator's snapshot output directory
|
||||
let validator_tar_path =
|
||||
snapshot_utils::get_snapshot_tar_path(&validator_snapshot_test_config.snapshot_output_path);
|
||||
let validator_tar_path = snapshot_utils::get_snapshot_archive_path(
|
||||
&validator_snapshot_test_config.snapshot_output_path,
|
||||
);
|
||||
fs::hard_link(tar, &validator_tar_path).unwrap();
|
||||
let slot_floor = snapshot_utils::bank_slot_from_archive(&validator_tar_path).unwrap();
|
||||
|
||||
@ -803,7 +805,7 @@ fn test_snapshots_restart_validity() {
|
||||
|
||||
expected_balances.extend(new_balances);
|
||||
|
||||
let tar = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
|
||||
let tar = snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path);
|
||||
wait_for_next_snapshot(&cluster, &tar);
|
||||
|
||||
// Create new account paths since validator exit is not guaranteed to cleanup RPC threads,
|
||||
|
@ -323,7 +323,8 @@ fn download_ledger(
|
||||
download_tar_bz2(rpc_addr, "genesis.tar.bz2", ledger_path, false)?;
|
||||
|
||||
if !no_snapshot_fetch {
|
||||
let snapshot_package = solana_ledger::snapshot_utils::get_snapshot_tar_path(ledger_path);
|
||||
let snapshot_package =
|
||||
solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path);
|
||||
if snapshot_package.exists() {
|
||||
fs::remove_file(&snapshot_package)
|
||||
.map_err(|err| format!("error removing {:?}: {}", snapshot_package, err))?;
|
||||
|
Reference in New Issue
Block a user