@ -282,7 +282,7 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
|
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::snapshot_package::SnapshotPackagerService;
|
use crate::snapshot_packager_service::SnapshotPackagerService;
|
||||||
use bincode::serialize_into;
|
use bincode::serialize_into;
|
||||||
use fs_extra::dir::CopyOptions;
|
use fs_extra::dir::CopyOptions;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
@ -376,7 +376,11 @@ mod tests {
|
|||||||
|
|
||||||
let deserialized_bank = snapshot_utils::bank_from_archive(
|
let deserialized_bank = snapshot_utils::bank_from_archive(
|
||||||
account_paths,
|
account_paths,
|
||||||
old_bank_forks.snapshot_config.as_ref().unwrap(),
|
&old_bank_forks
|
||||||
|
.snapshot_config
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.snapshot_path,
|
||||||
snapshot_utils::get_snapshot_tar_path(snapshot_package_output_path),
|
snapshot_utils::get_snapshot_tar_path(snapshot_package_output_path),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
@ -55,6 +55,7 @@ pub mod service;
|
|||||||
pub mod sigverify;
|
pub mod sigverify;
|
||||||
pub mod sigverify_stage;
|
pub mod sigverify_stage;
|
||||||
pub mod snapshot_package;
|
pub mod snapshot_package;
|
||||||
|
pub mod snapshot_packager_service;
|
||||||
pub mod snapshot_utils;
|
pub mod snapshot_utils;
|
||||||
pub mod storage_stage;
|
pub mod storage_stage;
|
||||||
pub mod streamer;
|
pub mod streamer;
|
||||||
|
@ -1,36 +1,20 @@
|
|||||||
use crate::result::{Error, Result};
|
|
||||||
use crate::service::Service;
|
|
||||||
use crate::snapshot_utils;
|
|
||||||
use bincode::serialize_into;
|
|
||||||
use solana_measure::measure::Measure;
|
|
||||||
use solana_metrics::datapoint_info;
|
|
||||||
use solana_runtime::accounts_db::AccountStorageEntry;
|
use solana_runtime::accounts_db::AccountStorageEntry;
|
||||||
use solana_runtime::status_cache::SlotDelta;
|
use solana_runtime::status_cache::SlotDelta;
|
||||||
use solana_sdk::transaction::Result as TransactionResult;
|
use solana_sdk::transaction::Result as TransactionResult;
|
||||||
use std::fs;
|
|
||||||
use std::fs::File;
|
|
||||||
use std::io::{BufWriter, Error as IOError, ErrorKind};
|
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::mpsc::{Receiver, Sender};
|
||||||
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
|
||||||
use std::time::Duration;
|
|
||||||
use symlink;
|
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
||||||
pub type SnapshotPackageSender = Sender<SnapshotPackage>;
|
pub type SnapshotPackageSender = Sender<SnapshotPackage>;
|
||||||
pub type SnapshotPackageReceiver = Receiver<SnapshotPackage>;
|
pub type SnapshotPackageReceiver = Receiver<SnapshotPackage>;
|
||||||
|
|
||||||
pub const TAR_SNAPSHOTS_DIR: &str = "snapshots";
|
|
||||||
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
|
|
||||||
|
|
||||||
pub struct SnapshotPackage {
|
pub struct SnapshotPackage {
|
||||||
root: u64,
|
pub root: u64,
|
||||||
slot_deltas: Vec<SlotDelta<TransactionResult<()>>>,
|
pub slot_deltas: Vec<SlotDelta<TransactionResult<()>>>,
|
||||||
snapshot_links: TempDir,
|
pub snapshot_links: TempDir,
|
||||||
storage_entries: Vec<Arc<AccountStorageEntry>>,
|
pub storage_entries: Vec<Arc<AccountStorageEntry>>,
|
||||||
tar_output_file: PathBuf,
|
pub tar_output_file: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SnapshotPackage {
|
impl SnapshotPackage {
|
||||||
@ -50,243 +34,3 @@ impl SnapshotPackage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SnapshotPackagerService {
|
|
||||||
t_snapshot_packager: JoinHandle<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SnapshotPackagerService {
|
|
||||||
pub fn new(snapshot_package_receiver: SnapshotPackageReceiver, exit: &Arc<AtomicBool>) -> Self {
|
|
||||||
let exit = exit.clone();
|
|
||||||
let t_snapshot_packager = Builder::new()
|
|
||||||
.name("solana-snapshot-packager".to_string())
|
|
||||||
.spawn(move || loop {
|
|
||||||
if exit.load(Ordering::Relaxed) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if let Err(e) = Self::run(&snapshot_package_receiver) {
|
|
||||||
match e {
|
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
|
||||||
_ => info!("Error from package_snapshots: {:?}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
Self {
|
|
||||||
t_snapshot_packager,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn package_snapshots(snapshot_package: &SnapshotPackage) -> Result<()> {
|
|
||||||
info!(
|
|
||||||
"Generating snapshot tarball for root {}",
|
|
||||||
snapshot_package.root
|
|
||||||
);
|
|
||||||
|
|
||||||
Self::serialize_status_cache(
|
|
||||||
&snapshot_package.slot_deltas,
|
|
||||||
&snapshot_package.snapshot_links,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let mut timer = Measure::start("snapshot_package-package_snapshots");
|
|
||||||
let tar_dir = snapshot_package
|
|
||||||
.tar_output_file
|
|
||||||
.parent()
|
|
||||||
.expect("Tar output path is invalid");
|
|
||||||
|
|
||||||
fs::create_dir_all(tar_dir)?;
|
|
||||||
|
|
||||||
// Create the staging directories
|
|
||||||
let staging_dir = TempDir::new()?;
|
|
||||||
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
|
|
||||||
let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
|
|
||||||
fs::create_dir_all(&staging_accounts_dir)?;
|
|
||||||
|
|
||||||
// Add the snapshots to the staging directory
|
|
||||||
symlink::symlink_dir(
|
|
||||||
snapshot_package.snapshot_links.path(),
|
|
||||||
&staging_snapshots_dir,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// Add the AppendVecs into the compressible list
|
|
||||||
for storage in &snapshot_package.storage_entries {
|
|
||||||
storage.flush()?;
|
|
||||||
let storage_path = storage.get_path();
|
|
||||||
let output_path = staging_accounts_dir.join(
|
|
||||||
storage_path
|
|
||||||
.file_name()
|
|
||||||
.expect("Invalid AppendVec file path"),
|
|
||||||
);
|
|
||||||
|
|
||||||
// `storage_path` - The file path where the AppendVec itself is located
|
|
||||||
// `output_path` - The directory where the AppendVec will be placed in the staging directory.
|
|
||||||
symlink::symlink_dir(storage_path, output_path)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tar the staging directory into the archive at `archive_path`
|
|
||||||
let archive_path = tar_dir.join("new_state.tar.bz2");
|
|
||||||
let mut args = vec!["jcfhS"];
|
|
||||||
args.push(archive_path.to_str().unwrap());
|
|
||||||
args.push("-C");
|
|
||||||
args.push(staging_dir.path().to_str().unwrap());
|
|
||||||
args.push(TAR_ACCOUNTS_DIR);
|
|
||||||
args.push(TAR_SNAPSHOTS_DIR);
|
|
||||||
|
|
||||||
let output = std::process::Command::new("tar").args(&args).output()?;
|
|
||||||
if !output.status.success() {
|
|
||||||
warn!("tar command failed with exit code: {}", output.status);
|
|
||||||
use std::str::from_utf8;
|
|
||||||
info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
|
|
||||||
info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
|
|
||||||
|
|
||||||
return Err(Self::get_io_error(&format!(
|
|
||||||
"Error trying to generate snapshot archive: {}",
|
|
||||||
output.status
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Once everything is successful, overwrite the previous tarball so that other validators
|
|
||||||
// can fetch this newly packaged snapshot
|
|
||||||
let _ = fs::remove_file(&snapshot_package.tar_output_file);
|
|
||||||
let metadata = fs::metadata(&archive_path)?;
|
|
||||||
fs::rename(&archive_path, &snapshot_package.tar_output_file)?;
|
|
||||||
|
|
||||||
timer.stop();
|
|
||||||
info!(
|
|
||||||
"Successfully created tarball. slot: {}, elapsed ms: {}, size={}",
|
|
||||||
snapshot_package.root,
|
|
||||||
timer.as_ms(),
|
|
||||||
metadata.len()
|
|
||||||
);
|
|
||||||
datapoint_info!(
|
|
||||||
"snapshot-package",
|
|
||||||
("slot", snapshot_package.root, i64),
|
|
||||||
("duration_ms", timer.as_ms(), i64),
|
|
||||||
("size", metadata.len(), i64)
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> {
|
|
||||||
let mut snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?;
|
|
||||||
// Only package the latest
|
|
||||||
while let Ok(new_snapshot_package) = snapshot_receiver.try_recv() {
|
|
||||||
snapshot_package = new_snapshot_package;
|
|
||||||
}
|
|
||||||
Self::package_snapshots(&snapshot_package)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_io_error(error: &str) -> Error {
|
|
||||||
warn!("Snapshot Packaging Error: {:?}", error);
|
|
||||||
Error::IO(IOError::new(ErrorKind::Other, error))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_status_cache(
|
|
||||||
slot_deltas: &[SlotDelta<TransactionResult<()>>],
|
|
||||||
snapshot_links: &TempDir,
|
|
||||||
) -> Result<()> {
|
|
||||||
// the status cache is stored as snapshot_path/status_cache
|
|
||||||
let snapshot_status_cache_file_path = snapshot_links
|
|
||||||
.path()
|
|
||||||
.join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME);
|
|
||||||
|
|
||||||
let status_cache = File::create(&snapshot_status_cache_file_path)?;
|
|
||||||
// status cache writer
|
|
||||||
let mut status_cache_stream = BufWriter::new(status_cache);
|
|
||||||
|
|
||||||
let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
|
|
||||||
// write the status cache
|
|
||||||
serialize_into(&mut status_cache_stream, slot_deltas)
|
|
||||||
.map_err(|_| Self::get_io_error("serialize status cache error"))?;
|
|
||||||
status_cache_serialize.stop();
|
|
||||||
inc_new_counter_info!(
|
|
||||||
"serialize-status-cache-ms",
|
|
||||||
status_cache_serialize.as_ms() as usize
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Service for SnapshotPackagerService {
|
|
||||||
type JoinReturnType = ();
|
|
||||||
|
|
||||||
fn join(self) -> thread::Result<()> {
|
|
||||||
self.t_snapshot_packager.join()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use crate::snapshot_utils;
|
|
||||||
use std::fs::OpenOptions;
|
|
||||||
use std::io::Write;
|
|
||||||
use tempfile::TempDir;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_package_snapshots() {
|
|
||||||
// Create temporary placeholder directory for all test files
|
|
||||||
let temp_dir = TempDir::new().unwrap();
|
|
||||||
let accounts_dir = temp_dir.path().join("accounts");
|
|
||||||
let snapshots_dir = temp_dir.path().join("snapshots");
|
|
||||||
let snapshot_package_output_path = temp_dir.path().join("snapshots_output");
|
|
||||||
fs::create_dir_all(&snapshot_package_output_path).unwrap();
|
|
||||||
|
|
||||||
// Create some storage entries
|
|
||||||
let storage_entries: Vec<_> = (0..5)
|
|
||||||
.map(|i| Arc::new(AccountStorageEntry::new(&accounts_dir, 0, i, 10)))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// Create some fake snapshot
|
|
||||||
fs::create_dir_all(&snapshots_dir).unwrap();
|
|
||||||
let snapshots_paths: Vec<_> = (0..5)
|
|
||||||
.map(|i| {
|
|
||||||
let fake_snapshot_path = snapshots_dir.join(format!("fake_snapshot_{}", i));
|
|
||||||
let mut fake_snapshot_file = OpenOptions::new()
|
|
||||||
.read(true)
|
|
||||||
.write(true)
|
|
||||||
.create(true)
|
|
||||||
.open(&fake_snapshot_path)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
fake_snapshot_file.write_all(b"Hello, world!").unwrap();
|
|
||||||
fake_snapshot_path
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// Create directory of hard links for snapshots
|
|
||||||
let link_snapshots_dir = tempfile::tempdir_in(temp_dir.path()).unwrap();
|
|
||||||
for snapshots_path in snapshots_paths {
|
|
||||||
let snapshot_file_name = snapshots_path.file_name().unwrap();
|
|
||||||
let link_path = link_snapshots_dir.path().join(snapshot_file_name);
|
|
||||||
fs::hard_link(&snapshots_path, &link_path).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a packageable snapshot
|
|
||||||
let output_tar_path = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
|
|
||||||
let snapshot_package = SnapshotPackage::new(
|
|
||||||
5,
|
|
||||||
vec![],
|
|
||||||
link_snapshots_dir,
|
|
||||||
storage_entries.clone(),
|
|
||||||
output_tar_path.clone(),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Make tarball from packageable snapshot
|
|
||||||
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
|
|
||||||
|
|
||||||
// before we compare, stick an empty status_cache in this dir so that the package comparision works
|
|
||||||
// This is needed since the status_cache is added by the packager and is not collected from
|
|
||||||
// the source dir for snapshots
|
|
||||||
let slot_deltas: Vec<SlotDelta<TransactionResult<()>>> = vec![];
|
|
||||||
let dummy_status_cache = File::create(snapshots_dir.join("status_cache")).unwrap();
|
|
||||||
let mut status_cache_stream = BufWriter::new(dummy_status_cache);
|
|
||||||
serialize_into(&mut status_cache_stream, &slot_deltas).unwrap();
|
|
||||||
status_cache_stream.flush().unwrap();
|
|
||||||
|
|
||||||
// Check tarball is correct
|
|
||||||
snapshot_utils::tests::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
260
core/src/snapshot_packager_service.rs
Normal file
260
core/src/snapshot_packager_service.rs
Normal file
@ -0,0 +1,260 @@
|
|||||||
|
use crate::result::{Error, Result};
|
||||||
|
use crate::service::Service;
|
||||||
|
use crate::snapshot_package::{SnapshotPackage, SnapshotPackageReceiver};
|
||||||
|
use crate::snapshot_utils::{self, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
|
||||||
|
use bincode::serialize_into;
|
||||||
|
use solana_measure::measure::Measure;
|
||||||
|
use solana_metrics::datapoint_info;
|
||||||
|
use solana_runtime::status_cache::SlotDelta;
|
||||||
|
use solana_sdk::transaction::Result as TransactionResult;
|
||||||
|
use std::fs;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::{BufWriter, Error as IOError, ErrorKind};
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::RecvTimeoutError;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
|
use std::time::Duration;
|
||||||
|
use symlink;
|
||||||
|
use tempfile::TempDir;
|
||||||
|
|
||||||
|
pub struct SnapshotPackagerService {
|
||||||
|
t_snapshot_packager: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SnapshotPackagerService {
|
||||||
|
pub fn new(snapshot_package_receiver: SnapshotPackageReceiver, exit: &Arc<AtomicBool>) -> Self {
|
||||||
|
let exit = exit.clone();
|
||||||
|
let t_snapshot_packager = Builder::new()
|
||||||
|
.name("solana-snapshot-packager".to_string())
|
||||||
|
.spawn(move || loop {
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if let Err(e) = Self::run(&snapshot_package_receiver) {
|
||||||
|
match e {
|
||||||
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
|
_ => info!("Error from package_snapshots: {:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
Self {
|
||||||
|
t_snapshot_packager,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn package_snapshots(snapshot_package: &SnapshotPackage) -> Result<()> {
|
||||||
|
info!(
|
||||||
|
"Generating snapshot tarball for root {}",
|
||||||
|
snapshot_package.root
|
||||||
|
);
|
||||||
|
|
||||||
|
Self::serialize_status_cache(
|
||||||
|
&snapshot_package.slot_deltas,
|
||||||
|
&snapshot_package.snapshot_links,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let mut timer = Measure::start("snapshot_package-package_snapshots");
|
||||||
|
let tar_dir = snapshot_package
|
||||||
|
.tar_output_file
|
||||||
|
.parent()
|
||||||
|
.expect("Tar output path is invalid");
|
||||||
|
|
||||||
|
fs::create_dir_all(tar_dir)?;
|
||||||
|
|
||||||
|
// Create the staging directories
|
||||||
|
let staging_dir = TempDir::new()?;
|
||||||
|
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
|
||||||
|
let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
|
||||||
|
fs::create_dir_all(&staging_accounts_dir)?;
|
||||||
|
|
||||||
|
// Add the snapshots to the staging directory
|
||||||
|
symlink::symlink_dir(
|
||||||
|
snapshot_package.snapshot_links.path(),
|
||||||
|
&staging_snapshots_dir,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// Add the AppendVecs into the compressible list
|
||||||
|
for storage in &snapshot_package.storage_entries {
|
||||||
|
storage.flush()?;
|
||||||
|
let storage_path = storage.get_path();
|
||||||
|
let output_path = staging_accounts_dir.join(
|
||||||
|
storage_path
|
||||||
|
.file_name()
|
||||||
|
.expect("Invalid AppendVec file path"),
|
||||||
|
);
|
||||||
|
|
||||||
|
// `storage_path` - The file path where the AppendVec itself is located
|
||||||
|
// `output_path` - The directory where the AppendVec will be placed in the staging directory.
|
||||||
|
symlink::symlink_dir(storage_path, output_path)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tar the staging directory into the archive at `archive_path`
|
||||||
|
let archive_path = tar_dir.join("new_state.tar.bz2");
|
||||||
|
let mut args = vec!["jcfhS"];
|
||||||
|
args.push(archive_path.to_str().unwrap());
|
||||||
|
args.push("-C");
|
||||||
|
args.push(staging_dir.path().to_str().unwrap());
|
||||||
|
args.push(TAR_ACCOUNTS_DIR);
|
||||||
|
args.push(TAR_SNAPSHOTS_DIR);
|
||||||
|
|
||||||
|
let output = std::process::Command::new("tar").args(&args).output()?;
|
||||||
|
if !output.status.success() {
|
||||||
|
warn!("tar command failed with exit code: {}", output.status);
|
||||||
|
use std::str::from_utf8;
|
||||||
|
info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
|
||||||
|
info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
|
||||||
|
|
||||||
|
return Err(Self::get_io_error(&format!(
|
||||||
|
"Error trying to generate snapshot archive: {}",
|
||||||
|
output.status
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Once everything is successful, overwrite the previous tarball so that other validators
|
||||||
|
// can fetch this newly packaged snapshot
|
||||||
|
let _ = fs::remove_file(&snapshot_package.tar_output_file);
|
||||||
|
let metadata = fs::metadata(&archive_path)?;
|
||||||
|
fs::rename(&archive_path, &snapshot_package.tar_output_file)?;
|
||||||
|
|
||||||
|
timer.stop();
|
||||||
|
info!(
|
||||||
|
"Successfully created tarball. slot: {}, elapsed ms: {}, size={}",
|
||||||
|
snapshot_package.root,
|
||||||
|
timer.as_ms(),
|
||||||
|
metadata.len()
|
||||||
|
);
|
||||||
|
datapoint_info!(
|
||||||
|
"snapshot-package",
|
||||||
|
("slot", snapshot_package.root, i64),
|
||||||
|
("duration_ms", timer.as_ms(), i64),
|
||||||
|
("size", metadata.len(), i64)
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> {
|
||||||
|
let mut snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?;
|
||||||
|
// Only package the latest
|
||||||
|
while let Ok(new_snapshot_package) = snapshot_receiver.try_recv() {
|
||||||
|
snapshot_package = new_snapshot_package;
|
||||||
|
}
|
||||||
|
Self::package_snapshots(&snapshot_package)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_io_error(error: &str) -> Error {
|
||||||
|
warn!("Snapshot Packaging Error: {:?}", error);
|
||||||
|
Error::IO(IOError::new(ErrorKind::Other, error))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn serialize_status_cache(
|
||||||
|
slot_deltas: &[SlotDelta<TransactionResult<()>>],
|
||||||
|
snapshot_links: &TempDir,
|
||||||
|
) -> Result<()> {
|
||||||
|
// the status cache is stored as snapshot_path/status_cache
|
||||||
|
let snapshot_status_cache_file_path = snapshot_links
|
||||||
|
.path()
|
||||||
|
.join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME);
|
||||||
|
|
||||||
|
let status_cache = File::create(&snapshot_status_cache_file_path)?;
|
||||||
|
// status cache writer
|
||||||
|
let mut status_cache_stream = BufWriter::new(status_cache);
|
||||||
|
|
||||||
|
let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
|
||||||
|
// write the status cache
|
||||||
|
serialize_into(&mut status_cache_stream, slot_deltas)
|
||||||
|
.map_err(|_| Self::get_io_error("serialize status cache error"))?;
|
||||||
|
status_cache_serialize.stop();
|
||||||
|
inc_new_counter_info!(
|
||||||
|
"serialize-status-cache-ms",
|
||||||
|
status_cache_serialize.as_ms() as usize
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service for SnapshotPackagerService {
|
||||||
|
type JoinReturnType = ();
|
||||||
|
|
||||||
|
fn join(self) -> thread::Result<()> {
|
||||||
|
self.t_snapshot_packager.join()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::snapshot_utils;
|
||||||
|
use solana_runtime::accounts_db::AccountStorageEntry;
|
||||||
|
use std::fs::OpenOptions;
|
||||||
|
use std::io::Write;
|
||||||
|
use tempfile::TempDir;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_package_snapshots() {
|
||||||
|
// Create temporary placeholder directory for all test files
|
||||||
|
let temp_dir = TempDir::new().unwrap();
|
||||||
|
let accounts_dir = temp_dir.path().join("accounts");
|
||||||
|
let snapshots_dir = temp_dir.path().join("snapshots");
|
||||||
|
let snapshot_package_output_path = temp_dir.path().join("snapshots_output");
|
||||||
|
fs::create_dir_all(&snapshot_package_output_path).unwrap();
|
||||||
|
|
||||||
|
// Create some storage entries
|
||||||
|
let storage_entries: Vec<_> = (0..5)
|
||||||
|
.map(|i| Arc::new(AccountStorageEntry::new(&accounts_dir, 0, i, 10)))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Create some fake snapshot
|
||||||
|
fs::create_dir_all(&snapshots_dir).unwrap();
|
||||||
|
let snapshots_paths: Vec<_> = (0..5)
|
||||||
|
.map(|i| {
|
||||||
|
let fake_snapshot_path = snapshots_dir.join(format!("fake_snapshot_{}", i));
|
||||||
|
let mut fake_snapshot_file = OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.write(true)
|
||||||
|
.create(true)
|
||||||
|
.open(&fake_snapshot_path)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
fake_snapshot_file.write_all(b"Hello, world!").unwrap();
|
||||||
|
fake_snapshot_path
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Create directory of hard links for snapshots
|
||||||
|
let link_snapshots_dir = tempfile::tempdir_in(temp_dir.path()).unwrap();
|
||||||
|
for snapshots_path in snapshots_paths {
|
||||||
|
let snapshot_file_name = snapshots_path.file_name().unwrap();
|
||||||
|
let link_path = link_snapshots_dir.path().join(snapshot_file_name);
|
||||||
|
fs::hard_link(&snapshots_path, &link_path).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a packageable snapshot
|
||||||
|
let output_tar_path = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
|
||||||
|
let snapshot_package = SnapshotPackage::new(
|
||||||
|
5,
|
||||||
|
vec![],
|
||||||
|
link_snapshots_dir,
|
||||||
|
storage_entries.clone(),
|
||||||
|
output_tar_path.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Make tarball from packageable snapshot
|
||||||
|
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
|
||||||
|
|
||||||
|
// before we compare, stick an empty status_cache in this dir so that the package comparision works
|
||||||
|
// This is needed since the status_cache is added by the packager and is not collected from
|
||||||
|
// the source dir for snapshots
|
||||||
|
let slot_deltas: Vec<SlotDelta<TransactionResult<()>>> = vec![];
|
||||||
|
let dummy_status_cache = File::create(snapshots_dir.join("status_cache")).unwrap();
|
||||||
|
let mut status_cache_stream = BufWriter::new(dummy_status_cache);
|
||||||
|
serialize_into(&mut status_cache_stream, &slot_deltas).unwrap();
|
||||||
|
status_cache_stream.flush().unwrap();
|
||||||
|
|
||||||
|
// Check tarball is correct
|
||||||
|
snapshot_utils::tests::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir);
|
||||||
|
}
|
||||||
|
}
|
@ -1,7 +1,5 @@
|
|||||||
use crate::bank_forks::SnapshotConfig;
|
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use crate::snapshot_package::SnapshotPackage;
|
use crate::snapshot_package::SnapshotPackage;
|
||||||
use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
|
|
||||||
use bincode::{deserialize_from, serialize_into};
|
use bincode::{deserialize_from, serialize_into};
|
||||||
use bzip2::bufread::BzDecoder;
|
use bzip2::bufread::BzDecoder;
|
||||||
use fs_extra::dir::CopyOptions;
|
use fs_extra::dir::CopyOptions;
|
||||||
@ -17,6 +15,8 @@ use std::path::{Path, PathBuf};
|
|||||||
use tar::Archive;
|
use tar::Archive;
|
||||||
|
|
||||||
pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
|
pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
|
||||||
|
pub const TAR_SNAPSHOTS_DIR: &str = "snapshots";
|
||||||
|
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
|
||||||
|
|
||||||
#[derive(PartialEq, Ord, Eq, Debug)]
|
#[derive(PartialEq, Ord, Eq, Debug)]
|
||||||
pub struct SlotSnapshotPaths {
|
pub struct SlotSnapshotPaths {
|
||||||
@ -179,11 +179,11 @@ pub fn bank_slot_from_archive<P: AsRef<Path>>(snapshot_tar: P) -> Result<u64> {
|
|||||||
|
|
||||||
pub fn bank_from_archive<P: AsRef<Path>>(
|
pub fn bank_from_archive<P: AsRef<Path>>(
|
||||||
account_paths: String,
|
account_paths: String,
|
||||||
snapshot_config: &SnapshotConfig,
|
snapshot_path: &PathBuf,
|
||||||
snapshot_tar: P,
|
snapshot_tar: P,
|
||||||
) -> Result<Bank> {
|
) -> Result<Bank> {
|
||||||
// Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()`
|
// Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()`
|
||||||
let unpack_dir = tempfile::tempdir_in(&snapshot_config.snapshot_path)?;
|
let unpack_dir = tempfile::tempdir_in(snapshot_path)?;
|
||||||
untar_snapshot_in(&snapshot_tar, &unpack_dir)?;
|
untar_snapshot_in(&snapshot_tar, &unpack_dir)?;
|
||||||
|
|
||||||
let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
|
let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
|
||||||
@ -198,7 +198,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
|
|||||||
panic!("Snapshot bank failed to verify");
|
panic!("Snapshot bank failed to verify");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move the unpacked snapshots into `snapshot_config.snapshot_path`
|
// Move the unpacked snapshots into `snapshot_path`
|
||||||
let dir_files = fs::read_dir(&unpacked_snapshots_dir).unwrap_or_else(|err| {
|
let dir_files = fs::read_dir(&unpacked_snapshots_dir).unwrap_or_else(|err| {
|
||||||
panic!(
|
panic!(
|
||||||
"Invalid snapshot path {:?}: {}",
|
"Invalid snapshot path {:?}: {}",
|
||||||
@ -210,7 +210,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
|
|||||||
.collect();
|
.collect();
|
||||||
let mut copy_options = CopyOptions::new();
|
let mut copy_options = CopyOptions::new();
|
||||||
copy_options.overwrite = true;
|
copy_options.overwrite = true;
|
||||||
fs_extra::move_items(&paths, &snapshot_config.snapshot_path, ©_options)?;
|
fs_extra::move_items(&paths, &snapshot_path, ©_options)?;
|
||||||
|
|
||||||
Ok(bank)
|
Ok(bank)
|
||||||
}
|
}
|
||||||
@ -284,7 +284,6 @@ fn get_io_error(error: &str) -> Error {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod tests {
|
pub mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
|
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
||||||
pub fn verify_snapshot_tar<P, Q, R>(
|
pub fn verify_snapshot_tar<P, Q, R>(
|
||||||
|
@ -23,7 +23,7 @@ use crate::retransmit_stage::RetransmitStage;
|
|||||||
use crate::rpc_subscriptions::RpcSubscriptions;
|
use crate::rpc_subscriptions::RpcSubscriptions;
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::shred_fetch_stage::ShredFetchStage;
|
use crate::shred_fetch_stage::ShredFetchStage;
|
||||||
use crate::snapshot_package::SnapshotPackagerService;
|
use crate::snapshot_packager_service::SnapshotPackagerService;
|
||||||
use crate::storage_stage::{StorageStage, StorageState};
|
use crate::storage_stage::{StorageStage, StorageState};
|
||||||
use solana_ledger::blocktree::{Blocktree, CompletedSlotsReceiver};
|
use solana_ledger::blocktree::{Blocktree, CompletedSlotsReceiver};
|
||||||
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
||||||
|
@ -411,7 +411,7 @@ fn get_bank_forks(
|
|||||||
account_paths
|
account_paths
|
||||||
.clone()
|
.clone()
|
||||||
.expect("Account paths not present when booting from snapshot"),
|
.expect("Account paths not present when booting from snapshot"),
|
||||||
snapshot_config,
|
&snapshot_config.snapshot_path,
|
||||||
&tar,
|
&tar,
|
||||||
)
|
)
|
||||||
.expect("Load from snapshot failed");
|
.expect("Load from snapshot failed");
|
||||||
|
Reference in New Issue
Block a user