diff --git a/Cargo.lock b/Cargo.lock
index 4594373045..95f67322ba 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3631,6 +3631,7 @@ dependencies = [
  "solana-sdk 0.22.4",
  "solana-stake-program 0.22.4",
  "solana-vote-program 0.22.4",
+ "symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "sys-info 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)",
  "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs
index 3dedae4510..089b9e58ba 100644
--- a/core/src/snapshot_packager_service.rs
+++ b/core/src/snapshot_packager_service.rs
@@ -1,16 +1,7 @@
-use crate::result::{Error, Result};
-use bincode::serialize_into;
 use solana_ledger::{
-    snapshot_package::{SnapshotPackage, SnapshotPackageReceiver},
-    snapshot_utils::{self, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR, TAR_VERSION_FILE},
+    snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package,
 };
-use solana_measure::measure::Measure;
-use solana_metrics::datapoint_info;
-use solana_runtime::status_cache::SlotDelta;
-use solana_sdk::transaction;
 use std::{
-    fs::{self, File},
-    io::{BufWriter, Error as IOError, ErrorKind},
     sync::{
         atomic::{AtomicBool, Ordering},
         mpsc::RecvTimeoutError,
@@ -19,8 +10,6 @@ use std::{
     thread::{self, Builder, JoinHandle},
     time::Duration,
 };
-use symlink;
-use tempfile::TempDir;
 
 pub struct SnapshotPackagerService {
     t_snapshot_packager: JoinHandle<()>,
@@ -35,12 +24,19 @@ impl SnapshotPackagerService {
                 if exit.load(Ordering::Relaxed) {
                     break;
                 }
-                if let Err(e) = Self::run(&snapshot_package_receiver) {
-                    match e {
-                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
-                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
-                        _ => info!("Error from package_snapshots: {:?}", e),
+
+                match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) {
+                    Ok(mut snapshot_package) => {
+                        // Only package the latest
+                        while let Ok(new_snapshot_package) = snapshot_package_receiver.try_recv() {
+                            snapshot_package = new_snapshot_package;
+                        }
+                        if let Err(err) = archive_snapshot_package(&snapshot_package) {
+                            warn!("Failed to create snapshot archive: {}", err);
+                        }
                     }
+                    Err(RecvTimeoutError::Disconnected) => break,
+                    Err(RecvTimeoutError::Timeout) => (),
                 }
             })
             .unwrap();
@@ -49,155 +45,6 @@ impl SnapshotPackagerService {
         }
     }
 
-    pub fn package_snapshots(snapshot_package: &SnapshotPackage) -> Result<()> {
-        info!(
-            "Generating snapshot tarball for root {}",
-            snapshot_package.root
-        );
-
-        Self::serialize_status_cache(
-            &snapshot_package.slot_deltas,
-            &snapshot_package.snapshot_links,
-        )?;
-
-        let mut timer = Measure::start("snapshot_package-package_snapshots");
-        let tar_dir = snapshot_package
-            .tar_output_file
-            .parent()
-            .expect("Tar output path is invalid");
-
-        fs::create_dir_all(tar_dir)?;
-
-        // Create the staging directories
-        let staging_dir = TempDir::new()?;
-        let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
-        let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
-        let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE);
-        fs::create_dir_all(&staging_accounts_dir)?;
-
-        // Add the snapshots to the staging directory
-        symlink::symlink_dir(
-            snapshot_package.snapshot_links.path(),
-            &staging_snapshots_dir,
-        )?;
-
-        // Add the AppendVecs into the compressible list
-        for storage in &snapshot_package.storage_entries {
-            storage.flush()?;
-            let storage_path = storage.get_path();
-            let output_path = staging_accounts_dir.join(
-                storage_path
-                    .file_name()
-                    .expect("Invalid AppendVec file path"),
-            );
-
-            // `storage_path` - The file path where the AppendVec itself is located
-            // `output_path` - The directory where the AppendVec will be placed in the staging directory.
-            let storage_path =
-                fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
-            symlink::symlink_dir(storage_path, &output_path)?;
-            if !output_path.is_file() {
-                return Err(Self::get_io_error(
-                    "Error trying to generate snapshot archive: storage path symlink is invalid",
-                ));
-            }
-        }
-
-        // Write version file
-        {
-            use std::io::Write;
-            let snapshot_version = format!("{}\n", env!("CARGO_PKG_VERSION"));
-            let mut f = std::fs::File::create(staging_version_file)?;
-            //f.write_all(&snapshot_version.to_string().into_bytes())?;
-            f.write_all(&snapshot_version.into_bytes())?;
-        }
-
-        // Tar the staging directory into the archive at `archive_path`
-        let archive_path = tar_dir.join("new_state.tar.bz2");
-        let args = vec![
-            "jcfhS",
-            archive_path.to_str().unwrap(),
-            "-C",
-            staging_dir.path().to_str().unwrap(),
-            TAR_ACCOUNTS_DIR,
-            TAR_SNAPSHOTS_DIR,
-            TAR_VERSION_FILE,
-        ];
-
-        let output = std::process::Command::new("tar").args(&args).output()?;
-        if !output.status.success() {
-            warn!("tar command failed with exit code: {}", output.status);
-            use std::str::from_utf8;
-            info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
-            info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
-
-            return Err(Self::get_io_error(&format!(
-                "Error trying to generate snapshot archive: {}",
-                output.status
-            )));
-        }
-
-        // Once everything is successful, overwrite the previous tarball so that other validators
-        // can fetch this newly packaged snapshot
-        let metadata = fs::metadata(&archive_path)?;
-        fs::rename(&archive_path, &snapshot_package.tar_output_file)?;
-
-        timer.stop();
-        info!(
-            "Successfully created tarball. slot: {}, elapsed ms: {}, size={}",
-            snapshot_package.root,
-            timer.as_ms(),
-            metadata.len()
-        );
-        datapoint_info!(
-            "snapshot-package",
-            ("slot", snapshot_package.root, i64),
-            ("duration_ms", timer.as_ms(), i64),
-            ("size", metadata.len(), i64)
-        );
-        Ok(())
-    }
-
-    fn run(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> {
-        let mut snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?;
-        // Only package the latest
-        while let Ok(new_snapshot_package) = snapshot_receiver.try_recv() {
-            snapshot_package = new_snapshot_package;
-        }
-        Self::package_snapshots(&snapshot_package)?;
-        Ok(())
-    }
-
-    fn get_io_error(error: &str) -> Error {
-        warn!("Snapshot Packaging Error: {:?}", error);
-        Error::IO(IOError::new(ErrorKind::Other, error))
-    }
-
-    fn serialize_status_cache(
-        slot_deltas: &[SlotDelta<transaction::Result<()>>],
-        snapshot_links: &TempDir,
-    ) -> Result<()> {
-        // the status cache is stored as snapshot_path/status_cache
-        let snapshot_status_cache_file_path = snapshot_links
-            .path()
-            .join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME);
-
-        let status_cache = File::create(&snapshot_status_cache_file_path)?;
-        // status cache writer
-        let mut status_cache_stream = BufWriter::new(status_cache);
-
-        let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
-        // write the status cache
-        serialize_into(&mut status_cache_stream, slot_deltas)
-            .map_err(|_| Self::get_io_error("serialize status cache error"))?;
-        status_cache_serialize.stop();
-        inc_new_counter_info!(
-            "serialize-status-cache-ms",
-            status_cache_serialize.as_ms() as usize
-        );
-        Ok(())
-    }
-
     pub fn join(self) -> thread::Result<()> {
         self.t_snapshot_packager.join()
     }
@@ -206,11 +53,13 @@ impl SnapshotPackagerService {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use solana_ledger::snapshot_utils;
-    use solana_runtime::accounts_db::AccountStorageEntry;
+    use bincode::serialize_into;
+    use solana_ledger::{snapshot_package::SnapshotPackage, snapshot_utils};
+    use solana_runtime::{accounts_db::AccountStorageEntry, status_cache::SlotDelta};
+    use solana_sdk::transaction;
     use std::{
-        fs::{remove_dir_all, OpenOptions},
-        io::Write,
+        fs::{self, remove_dir_all, File, OpenOptions},
+        io::{BufWriter, Write},
         path::{Path, PathBuf},
     };
     use tempfile::TempDir;
@@ -278,7 +127,8 @@ mod tests {
         }
 
         // Create a packageable snapshot
-        let output_tar_path = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
+        let output_tar_path =
+            snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path);
         let snapshot_package = SnapshotPackage::new(
             5,
             vec![],
@@ -288,7 +138,7 @@ mod tests {
         );
 
         // Make tarball from packageable snapshot
-        SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
+        snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap();
 
         // before we compare, stick an empty status_cache in this dir so that the package comparision works
         // This is needed since the status_cache is added by the packager and is not collected from
@@ -299,7 +149,7 @@ mod tests {
         serialize_into(&mut status_cache_stream, &slot_deltas).unwrap();
         status_cache_stream.flush().unwrap();
 
-        // Check tarball is correct
-        snapshot_utils::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir);
+        // Check archive is correct
+        snapshot_utils::verify_snapshot_archive(output_tar_path, snapshots_dir, accounts_dir);
     }
 }
diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs
index 6cf849cbb0..61079ba3ee 100644
--- a/core/tests/bank_forks.rs
+++ b/core/tests/bank_forks.rs
@@ -86,7 +86,7 @@ mod tests {
                 .as_ref()
                 .unwrap()
                 .snapshot_path,
-            snapshot_utils::get_snapshot_tar_path(snapshot_package_output_path),
+            snapshot_utils::get_snapshot_archive_path(snapshot_package_output_path),
         )
         .unwrap();
 
@@ -143,12 +143,15 @@ mod tests {
             slot_snapshot_paths
                 .last()
                 .expect("no snapshots found in path"),
-            snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path),
+            snapshot_utils::get_snapshot_archive_path(
+                &snapshot_config.snapshot_package_output_path,
+            ),
             &snapshot_config.snapshot_path,
             &last_bank.src.roots(),
         )
         .unwrap();
-        SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
+
+        snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap();
 
         restore_from_snapshot(bank_forks, vec![accounts_dir.path().to_path_buf()]);
     }
@@ -324,7 +327,7 @@ mod tests {
         serialize_into(&mut status_cache_stream, &slot_deltas).unwrap();
         status_cache_stream.flush().unwrap();
 
-        snapshot_utils::verify_snapshot_tar(
+        snapshot_utils::verify_snapshot_archive(
             saved_tar,
             saved_snapshots_dir.path(),
             saved_accounts_dir
diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml
index e0f114df2d..fade508682 100644
--- a/ledger/Cargo.toml
+++ b/ledger/Cargo.toml
@@ -43,6 +43,7 @@ solana-sdk = { path = "../sdk", version = "0.22.4" }
 solana-stake-program = { path = "../programs/stake", version = "0.22.4" }
 solana-vote-program = { path = "../programs/vote", version = "0.22.4" }
 sys-info = "0.5.8"
+symlink = "0.1.0"
 tar = "0.4.26"
 thiserror = "1.0"
 tempfile = "3.1.0"
diff --git a/ledger/src/bank_forks.rs b/ledger/src/bank_forks.rs
index 9a7a3c61f8..9ceca079c9 100644
--- a/ledger/src/bank_forks.rs
+++ b/ledger/src/bank_forks.rs
@@ -192,7 +192,7 @@ impl BankForks {
                 root,
                 &root_bank.src.roots(),
                 snapshot_package_sender.as_ref().unwrap(),
-                snapshot_utils::get_snapshot_tar_path(&config.snapshot_package_output_path),
+                snapshot_utils::get_snapshot_archive_path(&config.snapshot_package_output_path),
             );
             if r.is_err() {
                 warn!("Error generating snapshot for bank: {}, err: {:?}", root, r);
diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs
index 89abac68fe..2241d7ab25 100644
--- a/ledger/src/bank_forks_utils.rs
+++ b/ledger/src/bank_forks_utils.rs
@@ -25,8 +25,9 @@ pub fn load(
         fs::create_dir_all(&snapshot_config.snapshot_path)
             .expect("Couldn't create snapshot directory");
 
-        let tar =
-            snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path);
+        let tar = snapshot_utils::get_snapshot_archive_path(
+            &snapshot_config.snapshot_package_output_path,
+        );
         if tar.exists() {
             info!("Loading snapshot package: {:?}", tar);
             // Fail hard here if snapshot fails to load, don't silently continue
diff --git a/ledger/src/snapshot_utils.rs b/ledger/src/snapshot_utils.rs
index d782c4b6cb..928069f752 100644
--- a/ledger/src/snapshot_utils.rs
+++ b/ledger/src/snapshot_utils.rs
@@ -11,12 +11,13 @@ use solana_runtime::{
 use solana_sdk::{clock::Slot, transaction};
 use std::{
     cmp::Ordering,
-    fs,
-    fs::File,
-    io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read},
+    fs::{self, File},
+    io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Write},
     path::{Path, PathBuf},
+    process::ExitStatus,
 };
 use tar::Archive;
+use tempfile::TempDir;
 use thiserror::Error;
 
 pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
@@ -40,6 +41,12 @@ pub enum SnapshotError {
 
     #[error("file system error")]
     FsExtra(#[from] fs_extra::error::Error),
+
+    #[error("archive generation failure {0}")]
+    ArchiveGenerationFailure(ExitStatus),
+
+ #[error("storage path symlink is invalid")] + StoragePathSymlinkInvalid, } pub type Result = std::result::Result; @@ -105,6 +112,138 @@ pub fn package_snapshot, Q: AsRef>( Ok(package) } +pub fn archive_snapshot_package(snapshot_package: &SnapshotPackage) -> Result<()> { + info!( + "Generating snapshot tarball for root {}", + snapshot_package.root + ); + + fn serialize_status_cache( + slot_deltas: &[SlotDelta>], + snapshot_links: &TempDir, + ) -> Result<()> { + // the status cache is stored as snapshot_path/status_cache + let snapshot_status_cache_file_path = + snapshot_links.path().join(SNAPSHOT_STATUS_CACHE_FILE_NAME); + + let status_cache = File::create(&snapshot_status_cache_file_path)?; + // status cache writer + let mut status_cache_stream = BufWriter::new(status_cache); + + let mut status_cache_serialize = Measure::start("status_cache_serialize-ms"); + // write the status cache + serialize_into(&mut status_cache_stream, slot_deltas) + .map_err(|_| get_io_error("serialize status cache error"))?; + status_cache_serialize.stop(); + inc_new_counter_info!( + "serialize-status-cache-ms", + status_cache_serialize.as_ms() as usize + ); + Ok(()) + } + + serialize_status_cache( + &snapshot_package.slot_deltas, + &snapshot_package.snapshot_links, + )?; + + let mut timer = Measure::start("snapshot_package-package_snapshots"); + let tar_dir = snapshot_package + .tar_output_file + .parent() + .expect("Tar output path is invalid"); + + fs::create_dir_all(tar_dir)?; + + // Create the staging directories + let staging_dir = TempDir::new()?; + let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR); + let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR); + let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE); + fs::create_dir_all(&staging_accounts_dir)?; + + // Add the snapshots to the staging directory + symlink::symlink_dir( + snapshot_package.snapshot_links.path(), + &staging_snapshots_dir, + )?; + + // Add the AppendVecs into the compressible list + for storage in &snapshot_package.storage_entries { + storage.flush()?; + let storage_path = storage.get_path(); + let output_path = staging_accounts_dir.join( + storage_path + .file_name() + .expect("Invalid AppendVec file path"), + ); + + // `storage_path` - The file path where the AppendVec itself is located + // `output_path` - The directory where the AppendVec will be placed in the staging directory. 
+        let storage_path =
+            fs::canonicalize(storage_path).expect("Could not get absolute path for accounts");
+        symlink::symlink_dir(storage_path, &output_path)?;
+        if !output_path.is_file() {
+            return Err(get_io_error(
+                "Error trying to generate snapshot archive: storage path symlink is invalid",
+            ));
+        }
+    }
+
+    // Write version file
+    {
+        let snapshot_version = format!("{}\n", env!("CARGO_PKG_VERSION"));
+        let mut f = std::fs::File::create(staging_version_file)?;
+        //f.write_all(&snapshot_version.to_string().into_bytes())?;
+        f.write_all(&snapshot_version.into_bytes())?;
+    }
+
+    // Tar the staging directory into the archive at `archive_path`
+    let archive_path = tar_dir.join("new_state.tar.bz2");
+    let args = vec![
+        "jcfhS",
+        archive_path.to_str().unwrap(),
+        "-C",
+        staging_dir.path().to_str().unwrap(),
+        TAR_ACCOUNTS_DIR,
+        TAR_SNAPSHOTS_DIR,
+        TAR_VERSION_FILE,
+    ];
+
+    let output = std::process::Command::new("tar").args(&args).output()?;
+    if !output.status.success() {
+        warn!("tar command failed with exit code: {}", output.status);
+        use std::str::from_utf8;
+        info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
+        info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
+
+        return Err(get_io_error(&format!(
+            "Error trying to generate snapshot archive: {}",
+            output.status
+        )));
+    }
+
+    // Once everything is successful, overwrite the previous tarball so that other validators
+    // can fetch this newly packaged snapshot
+    let metadata = fs::metadata(&archive_path)?;
+    fs::rename(&archive_path, &snapshot_package.tar_output_file)?;
+
+    timer.stop();
+    info!(
+        "Successfully created tarball. slot: {}, elapsed ms: {}, size={}",
+        snapshot_package.root,
+        timer.as_ms(),
+        metadata.len()
+    );
+    datapoint_info!(
+        "snapshot-package",
+        ("slot", snapshot_package.root, i64),
+        ("duration_ms", timer.as_ms(), i64),
+        ("size", metadata.len(), i64)
+    );
+    Ok(())
+}
+
 pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths>
 where
     P: std::fmt::Debug,
@@ -249,7 +388,7 @@ pub fn bank_from_archive<P: AsRef<Path>>(
     Ok(bank)
 }
 
-pub fn get_snapshot_tar_path<P: AsRef<Path>>(snapshot_output_dir: P) -> PathBuf {
+pub fn get_snapshot_archive_path<P: AsRef<Path>>(snapshot_output_dir: P) -> PathBuf {
     snapshot_output_dir.as_ref().join("snapshot.tar.bz2")
 }
 
@@ -337,8 +476,11 @@ fn get_io_error(error: &str) -> SnapshotError {
     SnapshotError::IO(IOError::new(ErrorKind::Other, error))
 }
 
-pub fn verify_snapshot_tar<P, Q, R>(snapshot_tar: P, snapshots_to_verify: Q, storages_to_verify: R)
-where
+pub fn verify_snapshot_archive<P, Q, R>(
+    snapshot_tar: P,
+    snapshots_to_verify: Q,
+    storages_to_verify: R,
+) where
     P: AsRef<Path>,
     Q: AsRef<Path>,
     R: AsRef<Path>,
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 0e0cb95724..e27306de4e 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -651,12 +651,13 @@ fn test_snapshot_restart_tower() {
         .as_ref()
         .unwrap()
         .snapshot_package_output_path;
-    let tar = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
+    let tar = snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path);
     wait_for_next_snapshot(&cluster, &tar);
 
     // Copy tar to validator's snapshot output directory
-    let validator_tar_path =
-        snapshot_utils::get_snapshot_tar_path(&validator_snapshot_test_config.snapshot_output_path);
+    let validator_tar_path = snapshot_utils::get_snapshot_archive_path(
+        &validator_snapshot_test_config.snapshot_output_path,
+    );
     fs::hard_link(tar, &validator_tar_path).unwrap();
 
     // Restart validator from snapshot, the validator's tower state in this snapshot
@@ -702,7 +703,7 @@ fn test_snapshots_blockstore_floor() {
 
     trace!("Waiting for snapshot tar to be generated with slot",);
 
-    let tar = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
+    let tar = snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path);
     loop {
         if tar.exists() {
             trace!("snapshot tar exists");
@@ -712,8 +713,9 @@ fn test_snapshots_blockstore_floor() {
     }
 
     // Copy tar to validator's snapshot output directory
-    let validator_tar_path =
-        snapshot_utils::get_snapshot_tar_path(&validator_snapshot_test_config.snapshot_output_path);
+    let validator_tar_path = snapshot_utils::get_snapshot_archive_path(
+        &validator_snapshot_test_config.snapshot_output_path,
+    );
     fs::hard_link(tar, &validator_tar_path).unwrap();
 
     let slot_floor = snapshot_utils::bank_slot_from_archive(&validator_tar_path).unwrap();
@@ -803,7 +805,7 @@ fn test_snapshots_restart_validity() {
 
         expected_balances.extend(new_balances);
 
-        let tar = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
+        let tar = snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path);
         wait_for_next_snapshot(&cluster, &tar);
 
         // Create new account paths since validator exit is not guaranteed to cleanup RPC threads,
diff --git a/validator/src/main.rs b/validator/src/main.rs
index df1f85e4c6..37dff95e29 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -323,7 +323,8 @@ fn download_ledger(
     download_tar_bz2(rpc_addr, "genesis.tar.bz2", ledger_path, false)?;
 
     if !no_snapshot_fetch {
-        let snapshot_package = solana_ledger::snapshot_utils::get_snapshot_tar_path(ledger_path);
+        let snapshot_package =
+            solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path);
         if snapshot_package.exists() {
             fs::remove_file(&snapshot_package)
                 .map_err(|err| format!("error removing {:?}: {}", snapshot_package, err))?;