Refactor status cache and remove complex serialize/deserialize (#5335)
automerge
This commit is contained in:
@ -57,7 +57,7 @@ pub struct BankForks {
|
||||
working_bank: Arc<Bank>,
|
||||
root: u64,
|
||||
snapshot_config: Option<SnapshotConfig>,
|
||||
last_snapshot: u64,
|
||||
slots_since_snapshot: Vec<u64>,
|
||||
confidence: HashMap<u64, Confidence>,
|
||||
}
|
||||
|
||||
@ -110,7 +110,7 @@ impl BankForks {
|
||||
working_bank,
|
||||
root: 0,
|
||||
snapshot_config: None,
|
||||
last_snapshot: 0,
|
||||
slots_since_snapshot: vec![],
|
||||
confidence: HashMap::new(),
|
||||
}
|
||||
}
|
||||
@ -175,7 +175,7 @@ impl BankForks {
|
||||
banks,
|
||||
working_bank,
|
||||
snapshot_config: None,
|
||||
last_snapshot: 0,
|
||||
slots_since_snapshot: vec![],
|
||||
confidence: HashMap::new(),
|
||||
}
|
||||
}
|
||||
@ -208,26 +208,27 @@ impl BankForks {
|
||||
.unwrap_or(0);
|
||||
root_bank.squash();
|
||||
let new_tx_count = root_bank.transaction_count();
|
||||
self.prune_non_root(root);
|
||||
self.slots_since_snapshot.push(root);
|
||||
|
||||
// Generate a snapshot if snapshots are configured and it's been an appropriate number
|
||||
// of banks since the last snapshot
|
||||
if self.snapshot_config.is_some() {
|
||||
if self.snapshot_config.is_some() && snapshot_package_sender.is_some() {
|
||||
let config = self
|
||||
.snapshot_config
|
||||
.as_ref()
|
||||
.expect("Called package_snapshot without a snapshot configuration");
|
||||
if root - self.last_snapshot >= config.snapshot_interval_slots as u64 {
|
||||
if root - self.slots_since_snapshot[0] >= config.snapshot_interval_slots as u64 {
|
||||
let mut snapshot_time = Measure::start("total-snapshot-ms");
|
||||
let r = self.generate_snapshot(
|
||||
root,
|
||||
&self.slots_since_snapshot[1..],
|
||||
snapshot_package_sender.as_ref().unwrap(),
|
||||
snapshot_utils::get_snapshot_tar_path(&config.snapshot_package_output_path),
|
||||
);
|
||||
if r.is_err() {
|
||||
warn!("Error generating snapshot for bank: {}, err: {:?}", root, r);
|
||||
} else {
|
||||
self.last_snapshot = root;
|
||||
self.slots_since_snapshot = vec![root];
|
||||
}
|
||||
|
||||
// Cleanup outdated snapshots
|
||||
@ -237,6 +238,8 @@ impl BankForks {
|
||||
}
|
||||
}
|
||||
|
||||
self.prune_non_root(root);
|
||||
|
||||
inc_new_counter_info!(
|
||||
"bank-forks_set_root_ms",
|
||||
timing::duration_as_ms(&set_root_start.elapsed()) as usize
|
||||
@ -254,10 +257,10 @@ impl BankForks {
|
||||
fn purge_old_snapshots(&self) {
|
||||
// Remove outdated snapshots
|
||||
let config = self.snapshot_config.as_ref().unwrap();
|
||||
let names = snapshot_utils::get_snapshot_names(&config.snapshot_path);
|
||||
let num_to_remove = names.len().saturating_sub(MAX_CACHE_ENTRIES);
|
||||
for old_root in &names[..num_to_remove] {
|
||||
let r = snapshot_utils::remove_snapshot(*old_root, &config.snapshot_path);
|
||||
let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);
|
||||
let num_to_remove = slot_snapshot_paths.len().saturating_sub(MAX_CACHE_ENTRIES);
|
||||
for slot_files in &slot_snapshot_paths[..num_to_remove] {
|
||||
let r = snapshot_utils::remove_snapshot(slot_files.slot, &config.snapshot_path);
|
||||
if r.is_err() {
|
||||
warn!("Couldn't remove snapshot at: {:?}", config.snapshot_path);
|
||||
}
|
||||
@ -267,6 +270,7 @@ impl BankForks {
|
||||
fn generate_snapshot<P: AsRef<Path>>(
|
||||
&self,
|
||||
root: u64,
|
||||
slots_since_snapshot: &[u64],
|
||||
snapshot_package_sender: &SnapshotPackageSender,
|
||||
tar_output_file: P,
|
||||
) -> Result<()> {
|
||||
@ -277,19 +281,17 @@ impl BankForks {
|
||||
.get(root)
|
||||
.cloned()
|
||||
.expect("root must exist in BankForks");
|
||||
snapshot_utils::add_snapshot(&config.snapshot_path, &bank)?;
|
||||
|
||||
snapshot_utils::add_snapshot(&config.snapshot_path, &bank, slots_since_snapshot)?;
|
||||
// Package the relevant snapshots
|
||||
let names = snapshot_utils::get_snapshot_names(&config.snapshot_path);
|
||||
let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);
|
||||
|
||||
// We only care about the last MAX_CACHE_ENTRIES snapshots of roots because
|
||||
// the status cache of anything older is thrown away by the bank in
|
||||
// status_cache.prune_roots()
|
||||
let start = names.len().saturating_sub(MAX_CACHE_ENTRIES);
|
||||
let start = slot_snapshot_paths.len().saturating_sub(MAX_CACHE_ENTRIES);
|
||||
let package = snapshot_utils::package_snapshot(
|
||||
&bank,
|
||||
&names[start..],
|
||||
&config.snapshot_path,
|
||||
&slot_snapshot_paths[start..],
|
||||
tar_output_file,
|
||||
)?;
|
||||
|
||||
@ -362,9 +364,10 @@ impl BankForks {
|
||||
|
||||
let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
|
||||
let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR);
|
||||
let snapshot_paths = snapshot_utils::get_snapshot_paths(&unpacked_snapshots_dir);
|
||||
let bank = snapshot_utils::bank_from_snapshots(
|
||||
account_paths,
|
||||
&unpacked_snapshots_dir,
|
||||
&snapshot_paths,
|
||||
unpacked_accounts_dir,
|
||||
)?;
|
||||
|
||||
@ -378,34 +381,11 @@ impl BankForks {
|
||||
copy_options.overwrite = true;
|
||||
fs_extra::move_items(&paths, snapshot_config.snapshot_path(), ©_options)?;
|
||||
|
||||
/*let mut bank_maps = vec![];
|
||||
let status_cache_rc = StatusCacheRc::default();
|
||||
let id = (names[names.len() - 1] + 1) as usize;
|
||||
let mut bank0 =
|
||||
Bank::create_with_genesis(&genesis_block, account_paths.clone(), &status_cache_rc, id);
|
||||
bank0.freeze();
|
||||
let bank_root = snapshot_utils::load_snapshots(
|
||||
&names,
|
||||
&mut bank0,
|
||||
&mut bank_maps,
|
||||
&status_cache_rc,
|
||||
&snapshot_config.snapshot_path,
|
||||
);
|
||||
if bank_maps.is_empty() || bank_root.is_none() {
|
||||
return Err(Error::IO(IOError::new(
|
||||
ErrorKind::Other,
|
||||
"no snapshots loaded",
|
||||
)));
|
||||
}
|
||||
|
||||
let (banks, last_slot) = BankForks::setup_banks(&mut bank_maps, &bank.rc, &status_cache_rc);
|
||||
let working_bank = banks[&last_slot].clone();*/
|
||||
|
||||
let mut banks = HashMap::new();
|
||||
banks.insert(bank.slot(), bank.clone());
|
||||
let root = bank.slot();
|
||||
let names = snapshot_utils::get_snapshot_names(&snapshot_config.snapshot_path);
|
||||
if names.is_empty() {
|
||||
let snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path);
|
||||
if snapshot_paths.is_empty() {
|
||||
return Err(Error::IO(IOError::new(
|
||||
ErrorKind::Other,
|
||||
"no snapshots found",
|
||||
@ -416,9 +396,7 @@ impl BankForks {
|
||||
working_bank: bank,
|
||||
root,
|
||||
snapshot_config: None,
|
||||
last_snapshot: *names
|
||||
.last()
|
||||
.expect("untarred snapshot should have at least one snapshot"),
|
||||
slots_since_snapshot: vec![snapshot_paths.last().unwrap().slot],
|
||||
confidence: HashMap::new(),
|
||||
})
|
||||
}
|
||||
@ -432,7 +410,7 @@ mod tests {
|
||||
use crate::snapshot_package::SnapshotPackagerService;
|
||||
use fs_extra::dir::CopyOptions;
|
||||
use itertools::Itertools;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::hash::{hashv, Hash};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use solana_sdk::system_transaction;
|
||||
@ -576,8 +554,14 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bank_forks_snapshot_n() {
|
||||
// creates banks upto "last_slot" and runs the input function `f` on each bank created
|
||||
// also marks each bank as root and generates snapshots
|
||||
// finally tries to restore from the last bank's snapshot and compares the restored bank to the
|
||||
// `last_slot` bank
|
||||
fn run_bank_forks_snapshot_n<F>(last_slot: u64, f: F)
|
||||
where
|
||||
F: Fn(&mut Bank, &Keypair),
|
||||
{
|
||||
solana_logger::setup();
|
||||
let accounts_dir = TempDir::new().unwrap();
|
||||
let snapshot_dir = TempDir::new().unwrap();
|
||||
@ -587,55 +571,107 @@ mod tests {
|
||||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(10_000);
|
||||
for index in 0..4 {
|
||||
let bank0 = Bank::new_with_paths(
|
||||
&genesis_block,
|
||||
Some(accounts_dir.path().to_str().unwrap().to_string()),
|
||||
);
|
||||
bank0.freeze();
|
||||
let mut bank_forks = BankForks::new(0, bank0);
|
||||
let snapshot_config = SnapshotConfig::new(
|
||||
PathBuf::from(snapshot_dir.path()),
|
||||
PathBuf::from(snapshot_output_path.path()),
|
||||
100,
|
||||
);
|
||||
bank_forks.set_snapshot_config(snapshot_config.clone());
|
||||
let bank0 = bank_forks.get(0).unwrap();
|
||||
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap();
|
||||
for forks in 0..index {
|
||||
let bank = Bank::new_from_parent(&bank_forks[forks], &Pubkey::default(), forks + 1);
|
||||
let key1 = Keypair::new().pubkey();
|
||||
let tx = system_transaction::create_user_account(
|
||||
&mint_keypair,
|
||||
&key1,
|
||||
1,
|
||||
genesis_block.hash(),
|
||||
);
|
||||
assert_eq!(bank.process_transaction(&tx), Ok(()));
|
||||
bank.freeze();
|
||||
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, &bank).unwrap();
|
||||
bank_forks.insert(bank);
|
||||
}
|
||||
// Generate a snapshot package for last bank
|
||||
let last_bank = bank_forks.get(index.saturating_sub(1)).unwrap();
|
||||
let names: Vec<_> = (0..=index).collect();
|
||||
let snapshot_package = snapshot_utils::package_snapshot(
|
||||
last_bank,
|
||||
&names,
|
||||
let bank0 = Bank::new_with_paths(
|
||||
&genesis_block,
|
||||
Some(accounts_dir.path().to_str().unwrap().to_string()),
|
||||
);
|
||||
bank0.freeze();
|
||||
let mut bank_forks = BankForks::new(0, bank0);
|
||||
let snapshot_config = SnapshotConfig::new(
|
||||
PathBuf::from(snapshot_dir.path()),
|
||||
PathBuf::from(snapshot_output_path.path()),
|
||||
100,
|
||||
);
|
||||
bank_forks.set_snapshot_config(snapshot_config.clone());
|
||||
let bank0 = bank_forks.get(0).unwrap();
|
||||
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, &vec![]).unwrap();
|
||||
for slot in 0..last_slot {
|
||||
let mut bank = Bank::new_from_parent(&bank_forks[slot], &Pubkey::default(), slot + 1);
|
||||
f(&mut bank, &mint_keypair);
|
||||
let bank = bank_forks.insert(bank);
|
||||
// set root to make sure we don't end up with too many account storage entries
|
||||
// and to allow purging logic on status_cache to kick in
|
||||
bank_forks.set_root(bank.slot(), &None);
|
||||
snapshot_utils::add_snapshot(
|
||||
&snapshot_config.snapshot_path,
|
||||
snapshot_utils::get_snapshot_tar_path(
|
||||
&snapshot_config.snapshot_package_output_path,
|
||||
),
|
||||
&bank,
|
||||
&(vec![bank.slot()]),
|
||||
)
|
||||
.unwrap();
|
||||
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
|
||||
|
||||
restore_from_snapshot(
|
||||
bank_forks,
|
||||
accounts_dir.path().to_str().unwrap().to_string(),
|
||||
index,
|
||||
);
|
||||
}
|
||||
// Generate a snapshot package for last bank
|
||||
let last_bank = bank_forks.get(last_slot).unwrap();
|
||||
let slot_snapshot_paths =
|
||||
snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path);
|
||||
let snapshot_package = snapshot_utils::package_snapshot(
|
||||
last_bank,
|
||||
&slot_snapshot_paths,
|
||||
snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path),
|
||||
)
|
||||
.unwrap();
|
||||
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
|
||||
|
||||
restore_from_snapshot(
|
||||
bank_forks,
|
||||
accounts_dir.path().to_str().unwrap().to_string(),
|
||||
last_slot,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bank_forks_snapshot_n() {
|
||||
// create banks upto slot 4 and create 1 new account in each bank. test that bank 4 snapshots
|
||||
// and restores correctly
|
||||
run_bank_forks_snapshot_n(4, |bank, mint_keypair| {
|
||||
let key1 = Keypair::new().pubkey();
|
||||
let tx = system_transaction::create_user_account(
|
||||
&mint_keypair,
|
||||
&key1,
|
||||
1,
|
||||
bank.last_blockhash(),
|
||||
);
|
||||
assert_eq!(bank.process_transaction(&tx), Ok(()));
|
||||
bank.freeze();
|
||||
});
|
||||
}
|
||||
|
||||
fn goto_end_of_slot(bank: &mut Bank) {
|
||||
let mut tick_hash = bank.last_blockhash();
|
||||
loop {
|
||||
tick_hash = hashv(&[&tick_hash.as_ref(), &[42]]);
|
||||
bank.register_tick(&tick_hash);
|
||||
if tick_hash == bank.last_blockhash() {
|
||||
bank.freeze();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bank_forks_status_cache_snapshot_n() {
|
||||
// create banks upto slot 256 while transferring 1 lamport into 2 different accounts each time
|
||||
// this is done to ensure the AccountStorageEntries keep getting cleaned up as the root moves
|
||||
// ahead. Also tests the status_cache purge and status cache snapshotting.
|
||||
// Makes sure that the last bank is restored correctly
|
||||
let key1 = Keypair::new().pubkey();
|
||||
let key2 = Keypair::new().pubkey();
|
||||
run_bank_forks_snapshot_n(256, |bank, mint_keypair| {
|
||||
let tx = system_transaction::transfer(
|
||||
&mint_keypair,
|
||||
&key1,
|
||||
1,
|
||||
bank.parent().unwrap().last_blockhash(),
|
||||
);
|
||||
assert_eq!(bank.process_transaction(&tx), Ok(()));
|
||||
let tx = system_transaction::transfer(
|
||||
&mint_keypair,
|
||||
&key2,
|
||||
1,
|
||||
bank.parent().unwrap().last_blockhash(),
|
||||
);
|
||||
assert_eq!(bank.process_transaction(&tx), Ok(()));
|
||||
goto_end_of_slot(bank);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -668,7 +704,7 @@ mod tests {
|
||||
|
||||
// Take snapshot of zeroth bank
|
||||
let bank0 = bank_forks.get(0).unwrap();
|
||||
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap();
|
||||
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, &vec![]).unwrap();
|
||||
|
||||
// Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted
|
||||
// and the snapshot purging logic will run on every snapshot taken. This means the three
|
||||
@ -715,6 +751,7 @@ mod tests {
|
||||
bank_forks
|
||||
.generate_snapshot(
|
||||
slot,
|
||||
&vec![],
|
||||
&package_sender,
|
||||
snapshot_config
|
||||
.snapshot_package_output_path
|
||||
@ -732,10 +769,10 @@ mod tests {
|
||||
// Purge all the outdated snapshots, including the ones needed to generate the package
|
||||
// currently sitting in the channel
|
||||
bank_forks.purge_old_snapshots();
|
||||
let mut snapshot_names = snapshot_utils::get_snapshot_names(&snapshots_dir);
|
||||
snapshot_names.sort();
|
||||
let mut snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshots_dir);
|
||||
snapshot_paths.sort();
|
||||
assert_eq!(
|
||||
snapshot_names,
|
||||
snapshot_paths.iter().map(|path| path.slot).collect_vec(),
|
||||
(3..=MAX_CACHE_ENTRIES as u64 + 2).collect_vec()
|
||||
);
|
||||
|
||||
|
@ -3,16 +3,54 @@ use crate::snapshot_package::SnapshotPackage;
|
||||
use bincode::{deserialize_from, serialize_into};
|
||||
use flate2::read::GzDecoder;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_runtime::status_cache::SlotDelta;
|
||||
use solana_sdk::transaction;
|
||||
use std::cmp::Ordering;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tar::Archive;
|
||||
|
||||
pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
|
||||
|
||||
#[derive(PartialEq, Ord, Eq, Debug)]
|
||||
pub struct SlotSnapshotPaths {
|
||||
pub slot: u64,
|
||||
pub snapshot_file_path: PathBuf,
|
||||
pub snapshot_status_cache_path: PathBuf,
|
||||
}
|
||||
|
||||
impl PartialOrd for SlotSnapshotPaths {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.slot.cmp(&other.slot))
|
||||
}
|
||||
}
|
||||
|
||||
impl SlotSnapshotPaths {
|
||||
fn hardlink_snapshot_directory<P: AsRef<Path>>(&self, snapshot_hardlink_dir: P) -> Result<()> {
|
||||
// Create a new directory in snapshot_hardlink_dir
|
||||
let new_slot_hardlink_dir = snapshot_hardlink_dir.as_ref().join(self.slot.to_string());
|
||||
let _ = fs::remove_dir_all(&new_slot_hardlink_dir);
|
||||
fs::create_dir_all(&new_slot_hardlink_dir)?;
|
||||
|
||||
// Hardlink the snapshot
|
||||
fs::hard_link(
|
||||
&self.snapshot_file_path,
|
||||
&new_slot_hardlink_dir.join(self.slot.to_string()),
|
||||
)?;
|
||||
// Hardlink the status cache
|
||||
fs::hard_link(
|
||||
&self.snapshot_status_cache_path,
|
||||
&new_slot_hardlink_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn package_snapshot<Q: AsRef<Path>>(
|
||||
bank: &Bank,
|
||||
snapshot_names: &[u64],
|
||||
snapshot_dir: P,
|
||||
snapshot_files: &[SlotSnapshotPaths],
|
||||
snapshot_package_output_file: Q,
|
||||
) -> Result<SnapshotPackage> {
|
||||
let slot = bank.slot();
|
||||
@ -46,15 +84,15 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
|
||||
// Any errors from this point on will cause the above SnapshotPackage to drop, clearing
|
||||
// any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir)
|
||||
for name in snapshot_names {
|
||||
hardlink_snapshot_directory(&snapshot_dir, &snapshot_hard_links_dir, *name)?;
|
||||
for files in snapshot_files {
|
||||
files.hardlink_snapshot_directory(&snapshot_hard_links_dir)?;
|
||||
}
|
||||
|
||||
Ok(package)
|
||||
}
|
||||
|
||||
pub fn get_snapshot_names<P: AsRef<Path>>(snapshot_path: P) -> Vec<u64> {
|
||||
let paths = fs::read_dir(snapshot_path).expect("Invalid snapshot path");
|
||||
pub fn get_snapshot_paths<P: AsRef<Path>>(snapshot_path: P) -> Vec<SlotSnapshotPaths> {
|
||||
let paths = fs::read_dir(&snapshot_path).expect("Invalid snapshot path");
|
||||
let mut names = paths
|
||||
.filter_map(|entry| {
|
||||
entry.ok().and_then(|e| {
|
||||
@ -64,37 +102,64 @@ pub fn get_snapshot_names<P: AsRef<Path>>(snapshot_path: P) -> Vec<u64> {
|
||||
.unwrap_or(None)
|
||||
})
|
||||
})
|
||||
.collect::<Vec<u64>>();
|
||||
.map(|slot| {
|
||||
let snapshot_path = snapshot_path.as_ref().join(slot.to_string());
|
||||
SlotSnapshotPaths {
|
||||
slot,
|
||||
snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)),
|
||||
snapshot_status_cache_path: snapshot_path.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<SlotSnapshotPaths>>();
|
||||
|
||||
names.sort();
|
||||
names
|
||||
}
|
||||
|
||||
pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank) -> Result<()> {
|
||||
pub fn add_snapshot<P: AsRef<Path>>(
|
||||
snapshot_path: P,
|
||||
bank: &Bank,
|
||||
slots_since_snapshot: &[u64],
|
||||
) -> Result<()> {
|
||||
let slot = bank.slot();
|
||||
// snapshot_path/slot
|
||||
let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot);
|
||||
fs::create_dir_all(slot_snapshot_dir.clone()).map_err(Error::from)?;
|
||||
|
||||
// the snapshot is stored as snapshot_path/slot/slot
|
||||
let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot));
|
||||
// the status cache is stored as snapshot_path/slot/slot_satus_cache
|
||||
let snapshot_status_cache_file_path = slot_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
|
||||
info!(
|
||||
"creating snapshot {}, path: {:?}",
|
||||
"creating snapshot {}, path: {:?} status_cache: {:?}",
|
||||
bank.slot(),
|
||||
snapshot_file_path
|
||||
snapshot_file_path,
|
||||
snapshot_status_cache_file_path
|
||||
);
|
||||
let file = File::create(&snapshot_file_path)?;
|
||||
let mut stream = BufWriter::new(file);
|
||||
let snapshot_file = File::create(&snapshot_file_path)?;
|
||||
// snapshot writer
|
||||
let mut snapshot_stream = BufWriter::new(snapshot_file);
|
||||
let status_cache = File::create(&snapshot_status_cache_file_path)?;
|
||||
// status cache writer
|
||||
let mut status_cache_stream = BufWriter::new(status_cache);
|
||||
|
||||
// Create the snapshot
|
||||
serialize_into(&mut stream, &*bank).map_err(|e| get_io_error(&e.to_string()))?;
|
||||
serialize_into(&mut stream, &bank.rc).map_err(|e| get_io_error(&e.to_string()))?;
|
||||
// TODO: Add status cache serialization code
|
||||
/*serialize_into(&mut stream, &bank.src).map_err(|e| get_io_error(&e.to_string()))?;*/
|
||||
serialize_into(&mut snapshot_stream, &*bank).map_err(|e| get_io_error(&e.to_string()))?;
|
||||
serialize_into(&mut snapshot_stream, &bank.rc).map_err(|e| get_io_error(&e.to_string()))?;
|
||||
// write the status cache
|
||||
serialize_into(
|
||||
&mut status_cache_stream,
|
||||
&bank.src.slot_deltas(slots_since_snapshot),
|
||||
)
|
||||
.map_err(|_| get_io_error("serialize bank status cache error"))?;
|
||||
|
||||
info!(
|
||||
"successfully created snapshot {}, path: {:?}",
|
||||
"successfully created snapshot {}, path: {:?} status_cache: {:?}",
|
||||
bank.slot(),
|
||||
snapshot_file_path
|
||||
snapshot_file_path,
|
||||
snapshot_status_cache_file_path
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -105,25 +170,20 @@ pub fn remove_snapshot<P: AsRef<Path>>(slot: u64, snapshot_path: P) -> Result<()
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn bank_from_snapshots<P, Q>(
|
||||
pub fn bank_from_snapshots<P>(
|
||||
local_account_paths: String,
|
||||
snapshot_path: P,
|
||||
append_vecs_path: Q,
|
||||
snapshot_paths: &[SlotSnapshotPaths],
|
||||
append_vecs_path: P,
|
||||
) -> Result<Bank>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
Q: AsRef<Path>,
|
||||
{
|
||||
// Rebuild the last root bank
|
||||
let names = get_snapshot_names(&snapshot_path);
|
||||
let last_root = names
|
||||
let last_root_paths = snapshot_paths
|
||||
.last()
|
||||
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
|
||||
let snapshot_file_name = get_snapshot_file_name(*last_root);
|
||||
let snapshot_dir = get_bank_snapshot_dir(&snapshot_path, *last_root);
|
||||
let snapshot_file_path = snapshot_dir.join(&snapshot_file_name);
|
||||
info!("Load from {:?}", snapshot_file_path);
|
||||
let file = File::open(snapshot_file_path)?;
|
||||
info!("Load from {:?}", &last_root_paths.snapshot_file_path);
|
||||
let file = File::open(&last_root_paths.snapshot_file_path)?;
|
||||
let mut stream = BufReader::new(file);
|
||||
let bank: Bank = deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()))?;
|
||||
|
||||
@ -131,39 +191,15 @@ where
|
||||
bank.rc
|
||||
.accounts_from_stream(&mut stream, local_account_paths, append_vecs_path)?;
|
||||
|
||||
for bank_slot in names.iter().rev() {
|
||||
let snapshot_file_name = get_snapshot_file_name(*bank_slot);
|
||||
let snapshot_dir = get_bank_snapshot_dir(&snapshot_path, *bank_slot);
|
||||
let snapshot_file_path = snapshot_dir.join(snapshot_file_name.clone());
|
||||
let file = File::open(snapshot_file_path)?;
|
||||
let mut stream = BufReader::new(file);
|
||||
let _bank: Result<Bank> =
|
||||
deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()));
|
||||
// merge the status caches from all previous banks
|
||||
for slot_paths in snapshot_paths.iter().rev() {
|
||||
let status_cache = File::open(&slot_paths.snapshot_status_cache_path)?;
|
||||
let mut stream = BufReader::new(status_cache);
|
||||
let slot_deltas: Vec<SlotDelta<transaction::Result<()>>> = deserialize_from(&mut stream)
|
||||
.map_err(|_| get_io_error("deserialize root error"))
|
||||
.unwrap_or_default();
|
||||
|
||||
// TODO: Uncomment and deserialize status cache here
|
||||
|
||||
/*let status_cache: Result<StatusCacheRc> = deserialize_from(&mut stream)
|
||||
.map_err(|e| get_io_error(&e.to_string()));
|
||||
if bank_root.is_some() {
|
||||
match bank {
|
||||
Ok(v) => {
|
||||
if status_cache.is_ok() {
|
||||
let status_cache = status_cache?;
|
||||
status_cache_rc.append(&status_cache);
|
||||
// On the last snapshot, purge all outdated status cache
|
||||
// entries
|
||||
if i == names.len() - 1 {
|
||||
status_cache_rc.purge_roots();
|
||||
}
|
||||
}
|
||||
|
||||
bank_maps.push((*bank_slot, parent_slot, v));
|
||||
}
|
||||
Err(_) => warn!("Load snapshot failed for {}", bank_slot),
|
||||
}
|
||||
} else {
|
||||
warn!("Load snapshot rc failed for {}", bank_slot);
|
||||
}*/
|
||||
bank.src.append(&slot_deltas);
|
||||
}
|
||||
|
||||
Ok(bank)
|
||||
@ -184,28 +220,6 @@ pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn hardlink_snapshot_directory<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
snapshot_dir: P,
|
||||
snapshot_hardlink_dir: Q,
|
||||
slot: u64,
|
||||
) -> Result<()> {
|
||||
// Create a new directory in snapshot_hardlink_dir
|
||||
let new_slot_hardlink_dir = snapshot_hardlink_dir.as_ref().join(slot.to_string());
|
||||
let _ = fs::remove_dir_all(&new_slot_hardlink_dir);
|
||||
fs::create_dir_all(&new_slot_hardlink_dir)?;
|
||||
|
||||
// Hardlink the contents of the directory
|
||||
let snapshot_file = snapshot_dir
|
||||
.as_ref()
|
||||
.join(slot.to_string())
|
||||
.join(slot.to_string());
|
||||
fs::hard_link(
|
||||
&snapshot_file,
|
||||
&new_slot_hardlink_dir.join(slot.to_string()),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_snapshot_file_name(slot: u64) -> String {
|
||||
slot.to_string()
|
||||
}
|
||||
|
Reference in New Issue
Block a user