Unpack snapshot AppendVecs directly into account paths

This commit is contained in:
Michael Vines
2021-03-10 09:49:10 -08:00
parent c078e01fa9
commit 1061d021c9
8 changed files with 210 additions and 167 deletions

View File

@@ -1,42 +1,43 @@
use crate::{
accounts_db::AccountsDb,
accounts_index::AccountIndex,
bank::{Bank, BankSlotDelta, Builtins},
bank_forks::ArchiveFormat,
hardened_unpack::{unpack_snapshot, UnpackError},
serde_snapshot::{
bank_from_stream, bank_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages,
use {
crate::{
accounts_db::AccountsDb,
accounts_index::AccountIndex,
bank::{Bank, BankSlotDelta, Builtins},
bank_forks::ArchiveFormat,
hardened_unpack::{unpack_snapshot, UnpackError, UnpackedAppendVecMap},
serde_snapshot::{
bank_from_stream, bank_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages,
},
snapshot_package::{
AccountsPackage, AccountsPackagePre, AccountsPackageSendError, AccountsPackageSender,
},
},
snapshot_package::{
AccountsPackage, AccountsPackagePre, AccountsPackageSendError, AccountsPackageSender,
bincode::{config::Options, serialize_into},
bzip2::bufread::BzDecoder,
flate2::read::GzDecoder,
log::*,
rayon::ThreadPool,
regex::Regex,
solana_measure::measure::Measure,
solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey},
std::{
cmp::Ordering,
collections::HashSet,
fmt,
fs::{self, File},
io::{
self, BufReader, BufWriter, Error as IoError, ErrorKind, Read, Seek, SeekFrom, Write,
},
path::{Path, PathBuf},
process::{self, ExitStatus},
str::FromStr,
sync::Arc,
},
tar::Archive,
thiserror::Error,
};
use bincode::{config::Options, serialize_into};
use bzip2::bufread::BzDecoder;
use flate2::read::GzDecoder;
use log::*;
use rayon::ThreadPool;
use regex::Regex;
use solana_measure::measure::Measure;
use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey};
use std::collections::HashSet;
use std::sync::Arc;
use std::{
cmp::Ordering,
fmt,
fs::{self, File},
io::{self, BufReader, BufWriter, Error as IoError, ErrorKind, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
process::{self, ExitStatus},
str::FromStr,
};
use tar::Archive;
use thiserror::Error;
pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
pub const TAR_SNAPSHOTS_DIR: &str = "snapshots";
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
pub const TAR_VERSION_FILE: &str = "version";
pub const MAX_SNAPSHOTS: usize = 8; // Save some snapshots but not too many
const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
@@ -256,9 +257,9 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
))
.tempdir_in(tar_dir)?;
let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR);
let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR);
let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE);
let staging_accounts_dir = staging_dir.path().join("accounts");
let staging_snapshots_dir = staging_dir.path().join("snapshots");
let staging_version_file = staging_dir.path().join("version");
fs::create_dir_all(&staging_accounts_dir)?;
// Add the snapshots to the staging directory
@@ -307,9 +308,9 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
"chS",
"-C",
staging_dir.path().to_str().unwrap(),
TAR_ACCOUNTS_DIR,
TAR_SNAPSHOTS_DIR,
TAR_VERSION_FILE,
"accounts",
"snapshots",
"version",
])
.stdin(process::Stdio::null())
.stdout(process::Stdio::piped())
@@ -595,26 +596,30 @@ pub fn bank_from_archive<P: AsRef<Path>>(
account_indexes: HashSet<AccountIndex>,
accounts_db_caching_enabled: bool,
) -> Result<Bank> {
// Untar the snapshot into a temporary directory
let unpack_dir = tempfile::Builder::new()
.prefix(TMP_SNAPSHOT_PREFIX)
.tempdir_in(snapshot_path)?;
untar_snapshot_in(&snapshot_tar, &unpack_dir, archive_format)?;
let unpacked_append_vec_map = untar_snapshot_in(
&snapshot_tar,
&unpack_dir.as_ref(),
account_paths,
archive_format,
)?;
let mut measure = Measure::start("bank rebuild from snapshot");
let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR);
let unpacked_version_file = unpack_dir.as_ref().join(TAR_VERSION_FILE);
let unpacked_snapshots_dir = unpack_dir.as_ref().join("snapshots");
let unpacked_version_file = unpack_dir.as_ref().join("version");
let mut snapshot_version = String::new();
File::open(unpacked_version_file).and_then(|mut f| f.read_to_string(&mut snapshot_version))?;
let bank = rebuild_bank_from_snapshots(
snapshot_version.trim(),
account_paths,
frozen_account_pubkeys,
&unpacked_snapshots_dir,
unpacked_accounts_dir,
account_paths,
unpacked_append_vec_map,
genesis_config,
debug_keys,
additional_builtins,
@@ -722,56 +727,54 @@ pub fn purge_old_snapshot_archives<P: AsRef<Path>>(snapshot_output_dir: P) {
}
}
pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
fn untar_snapshot_in<P: AsRef<Path>>(
snapshot_tar: P,
unpack_dir: Q,
unpack_dir: &Path,
account_paths: &[PathBuf],
archive_format: ArchiveFormat,
) -> Result<()> {
) -> Result<UnpackedAppendVecMap> {
let mut measure = Measure::start("snapshot untar");
let tar_name = File::open(&snapshot_tar)?;
match archive_format {
let account_paths_map = match archive_format {
ArchiveFormat::TarBzip2 => {
let tar = BzDecoder::new(BufReader::new(tar_name));
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir)?;
unpack_snapshot(&mut archive, unpack_dir, account_paths)?
}
ArchiveFormat::TarGzip => {
let tar = GzDecoder::new(BufReader::new(tar_name));
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir)?;
unpack_snapshot(&mut archive, unpack_dir, account_paths)?
}
ArchiveFormat::TarZstd => {
let tar = zstd::stream::read::Decoder::new(BufReader::new(tar_name))?;
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir)?;
unpack_snapshot(&mut archive, unpack_dir, account_paths)?
}
ArchiveFormat::Tar => {
let tar = BufReader::new(tar_name);
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir)?;
unpack_snapshot(&mut archive, unpack_dir, account_paths)?
}
};
measure.stop();
info!("{}", measure);
Ok(())
Ok(account_paths_map)
}
#[allow(clippy::too_many_arguments)]
fn rebuild_bank_from_snapshots<P>(
fn rebuild_bank_from_snapshots(
snapshot_version: &str,
account_paths: &[PathBuf],
frozen_account_pubkeys: &[Pubkey],
unpacked_snapshots_dir: &Path,
append_vecs_path: P,
account_paths: &[PathBuf],
unpacked_append_vec_map: UnpackedAppendVecMap,
genesis_config: &GenesisConfig,
debug_keys: Option<Arc<HashSet<Pubkey>>>,
additional_builtins: Option<&Builtins>,
account_indexes: HashSet<AccountIndex>,
accounts_db_caching_enabled: bool,
) -> Result<Bank>
where
P: AsRef<Path>,
{
) -> Result<Bank> {
info!("snapshot version: {}", snapshot_version);
let snapshot_version_enum =
@@ -798,8 +801,8 @@ where
SnapshotVersion::V1_2_0 => bank_from_stream(
SerdeStyle::Newer,
&mut stream,
&append_vecs_path,
account_paths,
unpacked_append_vec_map,
genesis_config,
frozen_account_pubkeys,
debug_keys,
@@ -855,14 +858,20 @@ pub fn verify_snapshot_archive<P, Q, R>(
{
let temp_dir = tempfile::TempDir::new().unwrap();
let unpack_dir = temp_dir.path();
untar_snapshot_in(snapshot_archive, &unpack_dir, archive_format).unwrap();
untar_snapshot_in(
snapshot_archive,
&unpack_dir,
&[unpack_dir.to_path_buf()],
archive_format,
)
.unwrap();
// Check snapshots are the same
let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR);
let unpacked_snapshots = unpack_dir.join("snapshots");
assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());
// Check the account entries are the same
let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR);
let unpacked_accounts = unpack_dir.join("accounts");
assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap());
}