cache account hash info (#19426)

* cache account hash info

* ledger_path -> accounts_hash_cache_path
commit b57e86abf2
parent dca49a614f
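In brief: `calculate_accounts_hash_without_index` now persists each full chunk's binned intermediate hash results through a new `CacheHashData` mmap cache, keyed by slot range, bin range, and the path/mtime of the underlying append-vec files, so unchanged chunks are reloaded from disk instead of rescanned. A minimal sketch of that key derivation, mirroring `scan_account_storage_no_bank` in the diff below (names and the hoisted bin-range hashing are illustrative, not the exact code):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::ops::Range;
use std::path::Path;
use std::time::UNIX_EPOCH;

/// Derive the cache file name for one chunk the way the diff below does:
/// any change to a storage file's path or mtime changes the name, so stale
/// cache entries are simply never looked up again (and are deleted on drop).
fn cache_file_name(
    slots: &Range<u64>,        // chunk's slot range, aligned to MAX_ITEMS_PER_CHUNK
    bins: &Range<usize>,       // pubkey bin range for this scan pass
    storages: &[(u64, &Path)], // (slot, append-vec path) pairs in the chunk
) -> std::io::Result<String> {
    let mut hasher = DefaultHasher::new();
    bins.start.hash(&mut hasher);
    bins.end.hash(&mut hasher);
    for &(slot, file) in storages {
        slot.hash(&mut hasher);
        file.hash(&mut hasher);
        let mtime = std::fs::metadata(file)?
            .modified()?
            .duration_since(UNIX_EPOCH)
            .expect("file mtime before unix epoch")
            .as_secs();
        mtime.hash(&mut hasher); // mtime makes the key self-invalidating
    }
    Ok(format!(
        "{}.{}.{}.{}.{}",
        slots.start, slots.end, bins.start, bins.end, hasher.finish()
    ))
}
```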
core/src/accounts_hash_verifier.rs
@@ -20,6 +20,7 @@ use solana_runtime::{
 use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
 use std::collections::{HashMap, HashSet};
 use std::{
+    path::{Path, PathBuf},
     sync::{
         atomic::{AtomicBool, Ordering},
         mpsc::RecvTimeoutError,
@@ -43,6 +44,7 @@ impl AccountsHashVerifier {
         halt_on_trusted_validators_accounts_hash_mismatch: bool,
         fault_injection_rate_slots: u64,
         snapshot_config: Option<SnapshotConfig>,
+        ledger_path: PathBuf,
     ) -> Self {
         let exit = exit.clone();
         let cluster_info = cluster_info.clone();
@@ -74,6 +76,7 @@ impl AccountsHashVerifier {
                             fault_injection_rate_slots,
                             snapshot_config.as_ref(),
                             thread_pool.as_ref(),
+                            &ledger_path,
                         );
                     }
                     Err(RecvTimeoutError::Disconnected) => break,
@@ -99,8 +102,9 @@ impl AccountsHashVerifier {
         fault_injection_rate_slots: u64,
         snapshot_config: Option<&SnapshotConfig>,
         thread_pool: Option<&ThreadPool>,
+        ledger_path: &Path,
     ) {
-        Self::verify_accounts_package_hash(&accounts_package, thread_pool);
+        Self::verify_accounts_package_hash(&accounts_package, thread_pool, ledger_path);

         Self::push_accounts_hashes_to_cluster(
             &accounts_package,
@@ -118,11 +122,13 @@ impl AccountsHashVerifier {
     fn verify_accounts_package_hash(
         accounts_package: &AccountsPackage,
         thread_pool: Option<&ThreadPool>,
+        ledger_path: &Path,
     ) {
         let mut measure_hash = Measure::start("hash");
         if let Some(expected_hash) = accounts_package.hash_for_testing {
             let sorted_storages = SortedStorages::new(&accounts_package.snapshot_storages);
             let (hash, lamports) = AccountsDb::calculate_accounts_hash_without_index(
+                ledger_path,
                 &sorted_storages,
                 thread_pool,
                 HashStats::default(),
@@ -357,6 +363,8 @@ mod tests {
             snapshot_type: None,
         };

+        let ledger_path = TempDir::new().unwrap();
+
         AccountsHashVerifier::process_accounts_package(
             accounts_package,
             &cluster_info,
@@ -368,6 +376,7 @@ mod tests {
             0,
             Some(&snapshot_config),
             None,
+            ledger_path.path(),
         );

         // sleep for 1ms to create a newer timestmap for gossip entry
core/src/tvu.rs
@@ -224,6 +224,7 @@ impl Tvu {
             tvu_config.halt_on_trusted_validators_accounts_hash_mismatch,
             tvu_config.accounts_hash_fault_injection_slots,
             snapshot_config.clone(),
+            blockstore.ledger_path().to_path_buf(),
         );

         let (snapshot_request_sender, snapshot_request_handler) = match snapshot_config {
core/tests/snapshots.rs
@@ -928,6 +928,7 @@ mod tests {
             snapshot_test_config.snapshot_config.clone(),
         );

+        let tmpdir = TempDir::new().unwrap();
         let accounts_hash_verifier = AccountsHashVerifier::new(
             accounts_package_receiver,
             Some(pending_snapshot_package),
@@ -937,6 +938,7 @@ mod tests {
             false,
             0,
             Some(snapshot_test_config.snapshot_config.clone()),
+            tmpdir.path().to_path_buf(),
         );

         let accounts_background_service = AccountsBackgroundService::new(
ledger-tool/src/main.rs
@@ -1893,8 +1893,10 @@ fn main() {
                 .ok()
                 .map(|bins| AccountsIndexConfig { bins: Some(bins) });

-            let accounts_db_config =
-                accounts_index_config.map(|x| AccountsDbConfig { index: Some(x) });
+            let accounts_db_config = Some(AccountsDbConfig {
+                index: accounts_index_config,
+                accounts_hash_cache_path: Some(ledger_path.clone()),
+            });

             let process_options = ProcessOptions {
                 dev_halt_at_slot: value_t!(arg_matches, "halt_at_slot", Slot).ok(),
runtime/src/accounts_db.rs
@@ -29,6 +29,7 @@ use crate::{
     },
     ancestors::Ancestors,
     append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion},
+    cache_hash_data::CacheHashData,
     contains::Contains,
     pubkey_bins::PubkeyBinCalculator16,
     read_only_accounts_cache::ReadOnlyAccountsCache,
@@ -61,6 +62,7 @@ use std::{
     boxed::Box,
     collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
     convert::TryFrom,
+    hash::{Hash as StdHash, Hasher as StdHasher},
    io::{Error as IoError, Result as IoResult},
     ops::{Range, RangeBounds},
     path::{Path, PathBuf},
@@ -101,7 +103,7 @@ pub const BINS_PER_PASS: usize = PUBKEY_BINS_FOR_CALCULATING_HASHES / NUM_SCAN_P
 // If this is too big, we don't get enough parallelism of scanning storages.
 // If this is too small, then we produce too many output vectors to iterate.
 // Metrics indicate a sweet spot in the 2.5k-5k range for mnb.
-const MAX_ITEMS_PER_CHUNK: Slot = 5_000;
+const MAX_ITEMS_PER_CHUNK: Slot = 2_500;

 // A specially reserved storage id just for entries in the cache, so that
 // operations that take a storage entry can maintain a common interface
@@ -124,9 +126,11 @@ const CACHE_VIRTUAL_STORED_SIZE: usize = 0;

 pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
     index: Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
+    accounts_hash_cache_path: None,
 };
 pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig {
     index: Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS),
+    accounts_hash_cache_path: None,
 };

 pub type BinnedHashData = Vec<Vec<CalculateHashIntermediate>>;
@@ -134,6 +138,7 @@ pub type BinnedHashData = Vec<Vec<CalculateHashIntermediate>>;
 #[derive(Debug, Default, Clone)]
 pub struct AccountsDbConfig {
     pub index: Option<AccountsIndexConfig>,
+    pub accounts_hash_cache_path: Option<PathBuf>,
 }

 struct FoundStoredAccount<'a> {
@@ -956,6 +961,12 @@ pub struct AccountsDb {
     /// Set of storage paths to pick from
     pub(crate) paths: Vec<PathBuf>,

+    accounts_hash_cache_path: PathBuf,
+
+    // used by tests
+    // holds this until we are dropped
+    temp_accounts_hash_cache_path: Option<TempDir>,
+
     pub shrink_paths: RwLock<Option<Vec<PathBuf>>>,

     /// Directory of paths this accounts_db needs to hold/remove
@@ -1423,13 +1434,26 @@ type GenerateIndexAccountsMap<'a> = HashMap<Pubkey, IndexAccountMapEntry<'a>>;

 impl AccountsDb {
     pub fn default_for_tests() -> Self {
-        Self::default_with_accounts_index(AccountInfoAccountsIndex::default_for_tests())
+        Self::default_with_accounts_index(AccountInfoAccountsIndex::default_for_tests(), None)
     }

-    fn default_with_accounts_index(accounts_index: AccountInfoAccountsIndex) -> Self {
+    fn default_with_accounts_index(
+        accounts_index: AccountInfoAccountsIndex,
+        accounts_hash_cache_path: Option<PathBuf>,
+    ) -> Self {
         let num_threads = get_thread_count();
         const MAX_READ_ONLY_CACHE_DATA_SIZE: usize = 200_000_000;

+        let mut temp_accounts_hash_cache_path = None;
+        let accounts_hash_cache_path = accounts_hash_cache_path.unwrap_or_else(|| {
+            temp_accounts_hash_cache_path = Some(TempDir::new().unwrap());
+            temp_accounts_hash_cache_path
+                .as_ref()
+                .unwrap()
+                .path()
+                .to_path_buf()
+        });
+
         let mut bank_hashes = HashMap::new();
         bank_hashes.insert(0, BankHashInfo::default());
         AccountsDb {
@@ -1445,6 +1469,8 @@ impl AccountsDb {
             shrink_candidate_slots: Mutex::new(HashMap::new()),
             write_version: AtomicU64::new(0),
             paths: vec![],
+            accounts_hash_cache_path,
+            temp_accounts_hash_cache_path,
             shrink_paths: RwLock::new(None),
             temp_paths: None,
             file_size: DEFAULT_FILE_SIZE,
@@ -1495,7 +1521,11 @@ impl AccountsDb {
         shrink_ratio: AccountShrinkThreshold,
         accounts_db_config: Option<AccountsDbConfig>,
     ) -> Self {
-        let accounts_index = AccountsIndex::new(accounts_db_config.and_then(|x| x.index));
+        let accounts_index =
+            AccountsIndex::new(accounts_db_config.as_ref().and_then(|x| x.index.clone()));
+        let accounts_hash_cache_path = accounts_db_config
+            .as_ref()
+            .and_then(|x| x.accounts_hash_cache_path.clone());
         let mut new = if !paths.is_empty() {
             Self {
                 paths,
@@ -1504,7 +1534,7 @@ impl AccountsDb {
                 account_indexes,
                 caching_enabled,
                 shrink_ratio,
-                ..Self::default_with_accounts_index(accounts_index)
+                ..Self::default_with_accounts_index(accounts_index, accounts_hash_cache_path)
             }
         } else {
             // Create a temporary set of accounts directories, used primarily
@@ -1517,7 +1547,7 @@ impl AccountsDb {
                 account_indexes,
                 caching_enabled,
                 shrink_ratio,
-                ..Self::default_with_accounts_index(accounts_index)
+                ..Self::default_with_accounts_index(accounts_index, accounts_hash_cache_path)
             }
         };

@@ -4939,7 +4969,8 @@ impl AccountsDb {
     }

     /// Scan through all the account storage in parallel
-    fn scan_account_storage_no_bank<F, F2, B, C>(
+    fn scan_account_storage_no_bank<F, F2>(
+        cache_hash_data: &CacheHashData,
         accounts_cache_and_ancestors: Option<(
             &AccountsCache,
             &Ancestors,
@@ -4948,32 +4979,113 @@ impl AccountsDb {
         snapshot_storages: &SortedStorages,
         scan_func: F,
         after_func: F2,
-    ) -> Vec<C>
+        bin_range: &Range<usize>,
+        bin_calculator: &PubkeyBinCalculator16,
+    ) -> Vec<BinnedHashData>
     where
-        F: Fn(LoadedAccount, &mut B, Slot) + Send + Sync,
-        F2: Fn(B) -> C + Send + Sync,
-        B: Send + Default,
-        C: Send + Default,
+        F: Fn(LoadedAccount, &mut BinnedHashData, Slot) + Send + Sync,
+        F2: Fn(BinnedHashData) -> BinnedHashData + Send + Sync,
     {
-        let chunks = 1 + (snapshot_storages.range_width() as Slot / MAX_ITEMS_PER_CHUNK);
+        let start_bin_index = bin_range.start;
+
+        let width = snapshot_storages.range_width();
+        // 2 is for 2 special chunks - unaligned slots at the beginning and end
+        let chunks = 2 + (width as Slot / MAX_ITEMS_PER_CHUNK);
+        let range = snapshot_storages.range();
+        let slot0 = range.start;
+        let first_boundary =
+            ((slot0 + MAX_ITEMS_PER_CHUNK) / MAX_ITEMS_PER_CHUNK) * MAX_ITEMS_PER_CHUNK;
         (0..chunks)
             .into_par_iter()
             .map(|chunk| {
-                let mut retval = B::default();
-                let start = snapshot_storages.range().start + chunk * MAX_ITEMS_PER_CHUNK;
-                let end = std::cmp::min(start + MAX_ITEMS_PER_CHUNK, snapshot_storages.range().end);
+                let mut retval = vec![];
+                // calculate start, end
+                let (start, mut end) = if chunk == 0 {
+                    if slot0 == first_boundary {
+                        return after_func(retval); // if we evenly divide, nothing for special chunk 0 to do
+                    }
+                    // otherwise first chunk is not 'full'
+                    (slot0, first_boundary)
+                } else {
+                    // normal chunk in the middle or at the end
+                    let start = first_boundary + MAX_ITEMS_PER_CHUNK * (chunk - 1);
+                    let end = start + MAX_ITEMS_PER_CHUNK;
+                    (start, end)
+                };
+                end = std::cmp::min(end, range.end);
+                if start == end {
+                    return after_func(retval);
+                }
+
+                let mut file_name = String::default();
+                if accounts_cache_and_ancestors.is_none()
+                    && end.saturating_sub(start) == MAX_ITEMS_PER_CHUNK
+                {
+                    let mut load_from_cache = true;
+                    let mut hasher = std::collections::hash_map::DefaultHasher::new(); // wrong one?
+
+                    for slot in start..end {
+                        let sub_storages = snapshot_storages.get(slot);
+                        bin_range.start.hash(&mut hasher);
+                        bin_range.end.hash(&mut hasher);
+                        if let Some(sub_storages) = sub_storages {
+                            if sub_storages.len() > 1 {
+                                load_from_cache = false;
+                                break;
+                            }
+                            let storage_file = sub_storages.first().unwrap().accounts.get_path();
+                            slot.hash(&mut hasher);
+                            storage_file.hash(&mut hasher);
+                            // check alive_bytes, etc. here?
+                            let amod = std::fs::metadata(storage_file);
+                            if amod.is_err() {
+                                load_from_cache = false;
+                                break;
+                            }
+                            let amod = amod.unwrap().modified();
+                            if amod.is_err() {
+                                load_from_cache = false;
+                                break;
+                            }
+                            let amod = amod
+                                .unwrap()
+                                .duration_since(std::time::UNIX_EPOCH)
+                                .unwrap()
+                                .as_secs();
+                            amod.hash(&mut hasher);
+                        }
+                    }
+                    if load_from_cache {
+                        // we have a hash value for all the storages in this slot
+                        // so, build a file name:
+                        let hash = hasher.finish();
+                        file_name = format!(
+                            "{}.{}.{}.{}.{}",
+                            start, end, bin_range.start, bin_range.end, hash
+                        );
+                        if retval.is_empty() {
+                            let range = bin_range.end - bin_range.start;
+                            retval.append(&mut vec![Vec::new(); range]);
+                        }
+                        if cache_hash_data
+                            .load(
+                                &Path::new(&file_name),
+                                &mut retval,
+                                start_bin_index,
+                                bin_calculator,
+                            )
+                            .is_ok()
+                        {
+                            return retval;
+                        }

+                        // fall through and load normally - we failed to load
+                    }
+                }
+
                 for slot in start..end {
                     let sub_storages = snapshot_storages.get(slot);
-                    let mut valid_slot = false;
-                    if let Some(sub_storages) = sub_storages {
-                        valid_slot = true;
-                        Self::scan_multiple_account_storages_one_slot(
-                            sub_storages,
-                            &scan_func,
-                            slot,
-                            &mut retval,
-                        );
-                    }
+                    let valid_slot = sub_storages.is_some();
                     if let Some((cache, ancestors, accounts_index)) = accounts_cache_and_ancestors {
                         if let Some(slot_cache) = cache.slot_cache(slot) {
                             if valid_slot
@@ -4994,9 +5106,30 @@ impl AccountsDb {
                             }
                         }
                     }
+
+                    if let Some(sub_storages) = sub_storages {
+                        Self::scan_multiple_account_storages_one_slot(
+                            sub_storages,
+                            &scan_func,
+                            slot,
+                            &mut retval,
+                        );
+                    }
                 }
-                after_func(retval)
+                let r = after_func(retval);
+                if !file_name.is_empty() {
+                    let result = cache_hash_data.save(Path::new(&file_name), &r);
+
+                    if result.is_err() {
+                        info!(
+                            "FAILED_TO_SAVE: {}-{}, {}, first_boundary: {}, {:?}",
+                            range.start, range.end, width, first_boundary, file_name,
+                        );
+                    }
+                }
+                r
             })
+            .filter(|x| !x.is_empty())
             .collect()
     }

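The chunk arithmetic above is what makes the cache reusable across runs: every interior chunk starts on a multiple of `MAX_ITEMS_PER_CHUNK`, so its slot range, and therefore its cache file name, does not shift as the overall storage range grows; only the unaligned leading and trailing chunks (and any chunk short of full width, which the `end - start == MAX_ITEMS_PER_CHUNK` check excludes) are rescanned without caching. A standalone sketch of the same bounds computation:

```rust
const MAX_ITEMS_PER_CHUNK: u64 = 2_500;

/// Slot range covered by `chunk`, mirroring scan_account_storage_no_bank:
/// chunk 0 is the unaligned prefix, later chunks are boundary-aligned.
fn chunk_bounds(chunk: u64, slot0: u64, range_end: u64) -> Option<(u64, u64)> {
    // First multiple of MAX_ITEMS_PER_CHUNK above slot0.
    let first_boundary =
        ((slot0 + MAX_ITEMS_PER_CHUNK) / MAX_ITEMS_PER_CHUNK) * MAX_ITEMS_PER_CHUNK;
    let (start, end) = if chunk == 0 {
        (slot0, first_boundary) // partial leading chunk
    } else {
        let start = first_boundary + MAX_ITEMS_PER_CHUNK * (chunk - 1);
        (start, start + MAX_ITEMS_PER_CHUNK)
    };
    let end = end.min(range_end);
    (start < end).then(|| (start, end))
}

// With slot0 = 6_000 and range_end = 11_000:
//   chunk 0 -> (6_000, 7_500)   partial, rescanned every time
//   chunk 1 -> (7_500, 10_000)  full width, eligible for the cache
//   chunk 2 -> (10_000, 11_000) partial tail, rescanned every time
```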
@@ -5057,6 +5190,7 @@ impl AccountsDb {
         };

         Self::calculate_accounts_hash_without_index(
+            &self.accounts_hash_cache_path,
             &storages,
             Some(&self.thread_pool_clean),
             timings,
@@ -5136,6 +5270,7 @@ impl AccountsDb {
     }

     fn scan_snapshot_stores_with_cache(
+        cache_hash_data: &CacheHashData,
         storage: &SortedStorages,
         mut stats: &mut crate::accounts_hash::HashStats,
         bins: usize,
@@ -5156,6 +5291,7 @@ impl AccountsDb {
         let sort_time = AtomicU64::new(0);

         let result: Vec<BinnedHashData> = Self::scan_account_storage_no_bank(
+            cache_hash_data,
             accounts_cache_and_ancestors,
             storage,
             |loaded_account: LoadedAccount, accum: &mut BinnedHashData, slot: Slot| {
@@ -5189,10 +5325,8 @@ impl AccountsDb {
                         mismatch_found.fetch_add(1, Ordering::Relaxed);
                     }
                 }
-                let max = accum.len();
-                if max == 0 {
-                    accum.extend(vec![Vec::new(); range]);
+                if accum.is_empty() {
+                    accum.append(&mut vec![Vec::new(); range]);
                 }
                 accum[pubkey_to_bin_index].push(source_item);
             },
@@ -5201,6 +5335,8 @@ impl AccountsDb {
                 sort_time.fetch_add(timing, Ordering::Relaxed);
                 result
             },
+            bin_range,
+            &bin_calculator,
         );

         stats.sort_time_total_us += sort_time.load(Ordering::Relaxed);
@@ -5242,6 +5378,7 @@ impl AccountsDb {
     // modeled after get_accounts_delta_hash
     // intended to be faster than calculate_accounts_hash
     pub fn calculate_accounts_hash_without_index(
+        accounts_hash_cache_path: &Path,
         storages: &SortedStorages,
         thread_pool: Option<&ThreadPool>,
         mut stats: HashStats,
@@ -5260,6 +5397,8 @@ impl AccountsDb {
         let mut previous_pass = PreviousPass::default();
         let mut final_result = (Hash::default(), 0);

+        let cache_hash_data = CacheHashData::new(&accounts_hash_cache_path);
+
         for pass in 0..NUM_SCAN_PASSES {
             let bounds = Range {
                 start: pass * BINS_PER_PASS,
@@ -5267,6 +5406,7 @@ impl AccountsDb {
             };

             let result = Self::scan_snapshot_stores_with_cache(
+                &cache_hash_data,
                 storages,
                 &mut stats,
                 PUBKEY_BINS_FOR_CALCULATING_HASHES,
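For context on the pass loop above: the pubkey space is split into `PUBKEY_BINS_FOR_CALCULATING_HASHES` bins scanned over `NUM_SCAN_PASSES` passes, and because the bin range is part of the cache file name, each pass reads and writes its own set of cache files. A worked sketch of the bounds (the constant values here are placeholders, not the ones defined in accounts_db.rs):

```rust
use std::ops::Range;

// Placeholder values for illustration only.
const PUBKEY_BINS_FOR_CALCULATING_HASHES: usize = 65_536;
const NUM_SCAN_PASSES: usize = 2;
const BINS_PER_PASS: usize = PUBKEY_BINS_FOR_CALCULATING_HASHES / NUM_SCAN_PASSES;

/// Bin range scanned by `pass`, as in calculate_accounts_hash_without_index:
/// with the values above, pass 0 -> 0..32_768 and pass 1 -> 32_768..65_536.
fn pass_bounds(pass: usize) -> Range<usize> {
    Range {
        start: pass * BINS_PER_PASS,
        end: (pass + 1) * BINS_PER_PASS,
    }
}
```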
@@ -6753,7 +6893,17 @@ pub mod tests {
         bin_range: &Range<usize>,
         check_hash: bool,
     ) -> Result<Vec<BinnedHashData>, BankHashVerificationError> {
-        Self::scan_snapshot_stores_with_cache(storage, stats, bins, bin_range, check_hash, None)
+        let temp_dir = TempDir::new().unwrap();
+        let accounts_hash_cache_path = temp_dir.path();
+        Self::scan_snapshot_stores_with_cache(
+            &CacheHashData::new(&accounts_hash_cache_path),
+            storage,
+            stats,
+            bins,
+            bin_range,
+            check_hash,
+            None,
+        )
     }
 }

@@ -6960,7 +7110,8 @@ pub mod tests {
         )
         .unwrap();
         assert_eq!(result.len(), 2); // 2 chunks
-        assert_eq!(result[0].len(), 0); // nothing found in first slots
+        assert_eq!(result[0].len(), bins);
+        assert_eq!(0, result[0].iter().map(|x| x.len()).sum::<usize>()); // nothing found in bin 0
         assert_eq!(result[1].len(), bins);
         assert_eq!(result[1], vec![raw_expected]);
     }
@@ -7029,6 +7180,7 @@ pub mod tests {

         let bins = 256;
         let bin_locations = vec![0, 127, 128, 255];
+        let range = 1;
         for bin in 0..bins {
             let result = AccountsDb::scan_snapshot_stores(
                 &get_storage_refs(&storages),
@@ -7036,17 +7188,17 @@ pub mod tests {
                 bins,
                 &Range {
                     start: bin,
-                    end: bin + 1,
+                    end: bin + range,
                 },
                 false,
             )
             .unwrap();
             let mut expected = vec![];
             if let Some(index) = bin_locations.iter().position(|&r| r == bin) {
-                expected = vec![Vec::new(); 1];
-                expected[0].push(raw_expected[index].clone());
+                expected = vec![vec![Vec::new(); range]];
+                expected[0][0].push(raw_expected[index].clone());
             }
-            assert_eq!(result, vec![expected]);
+            assert_eq!(result, expected);
         }
     }

@@ -7063,20 +7215,23 @@ pub mod tests {
             SortedStorages::new_debug(&storage_data[..], 0, MAX_ITEMS_PER_CHUNK as usize + 1);

         let mut stats = HashStats::default();
+        let range = 1;
+        let start = 127;
         let result = AccountsDb::scan_snapshot_stores(
             &sorted_storages,
             &mut stats,
             bins,
             &Range {
-                start: 127,
-                end: 128,
+                start,
+                end: start + range,
             },
             false,
         )
         .unwrap();
         assert_eq!(result.len(), 2); // 2 chunks
-        assert_eq!(result[0].len(), 0); // nothing found in first slots
-        let mut expected = vec![Vec::new(); 1];
+        assert_eq!(result[0].len(), range);
+        assert_eq!(0, result[0].iter().map(|x| x.len()).sum::<usize>()); // nothing found in bin 0
+        let mut expected = vec![Vec::new(); range];
         expected[0].push(raw_expected[1].clone());
         assert_eq!(result[1].len(), 1);
         assert_eq!(result[1], expected);
@@ -7088,6 +7243,7 @@ pub mod tests {

         let (storages, _size, _slot_expected) = sample_storage();
         let result = AccountsDb::calculate_accounts_hash_without_index(
+            TempDir::new().unwrap().path(),
             &get_storage_refs(&storages),
             None,
             HashStats::default(),
@@ -7110,6 +7266,7 @@ pub mod tests {
         });
         let sum = raw_expected.iter().map(|item| item.lamports).sum();
         let result = AccountsDb::calculate_accounts_hash_without_index(
+            TempDir::new().unwrap().path(),
             &get_storage_refs(&storages),
             None,
             HashStats::default(),
@@ -7161,19 +7318,35 @@ pub mod tests {
             .append_accounts(&[(sm, Some(&acc))], &[&Hash::default()]);

         let calls = AtomicU64::new(0);
+        let temp_dir = TempDir::new().unwrap();
+        let accounts_hash_cache_path = temp_dir.path();
         let result = AccountsDb::scan_account_storage_no_bank(
+            &CacheHashData::new(&accounts_hash_cache_path),
             None,
             &get_storage_refs(&storages),
-            |loaded_account: LoadedAccount, accum: &mut Vec<u64>, slot: Slot| {
+            |loaded_account: LoadedAccount, accum: &mut BinnedHashData, slot: Slot| {
                 calls.fetch_add(1, Ordering::Relaxed);
                 assert_eq!(loaded_account.pubkey(), &pubkey);
                 assert_eq!(slot_expected, slot);
-                accum.push(expected);
+                accum.push(vec![CalculateHashIntermediate::new(
+                    Hash::default(),
+                    expected,
+                    pubkey,
+                )]);
             },
             |a| a,
+            &Range { start: 0, end: 1 },
+            &PubkeyBinCalculator16::new(1),
         );
         assert_eq!(calls.load(Ordering::Relaxed), 1);
-        assert_eq!(result, vec![vec![expected]]);
+        assert_eq!(
+            result,
+            vec![vec![vec![CalculateHashIntermediate::new(
+                Hash::default(),
+                expected,
+                pubkey
+            )]]]
+        );
     }

     #[test]
runtime/src/cache_hash_data.rs (new file, 437 lines)
@@ -0,0 +1,437 @@
+//! Cached data for hashing accounts
+use crate::accounts_hash::CalculateHashIntermediate;
+use crate::cache_hash_data_stats::CacheHashDataStats;
+use crate::pubkey_bins::PubkeyBinCalculator16;
+use log::*;
+use memmap2::MmapMut;
+use solana_measure::measure::Measure;
+use std::collections::HashSet;
+use std::fs::{self};
+use std::fs::{remove_file, OpenOptions};
+use std::io::Seek;
+use std::io::SeekFrom;
+use std::io::Write;
+use std::path::Path;
+use std::path::PathBuf;
+use std::sync::{Arc, Mutex};
+
+pub type EntryType = CalculateHashIntermediate;
+pub type SavedType = Vec<Vec<EntryType>>;
+pub type SavedTypeSlice = [Vec<EntryType>];
+
+#[repr(C)]
+pub struct Header {
+    count: usize,
+}
+
+struct CacheHashDataFile {
+    cell_size: u64,
+    mmap: MmapMut,
+    capacity: u64,
+}
+
+impl CacheHashDataFile {
+    fn get_mut<T: Sized>(&mut self, ix: u64) -> &mut T {
+        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
+        let end = start + std::mem::size_of::<T>();
+        assert!(
+            end <= self.capacity as usize,
+            "end: {}, capacity: {}, ix: {}, cell size: {}",
+            end,
+            self.capacity,
+            ix,
+            self.cell_size
+        );
+        let item_slice: &[u8] = &self.mmap[start..end];
+        unsafe {
+            let item = item_slice.as_ptr() as *mut T;
+            &mut *item
+        }
+    }
+
+    fn get_header_mut(&mut self) -> &mut Header {
+        let start = 0_usize;
+        let end = start + std::mem::size_of::<Header>();
+        let item_slice: &[u8] = &self.mmap[start..end];
+        unsafe {
+            let item = item_slice.as_ptr() as *mut Header;
+            &mut *item
+        }
+    }
+
+    fn new_map(file: &Path, capacity: u64) -> Result<MmapMut, std::io::Error> {
+        let mut data = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .open(file)?;
+
+        // Theoretical performance optimization: write a zero to the end of
+        // the file so that we won't have to resize it later, which may be
+        // expensive.
+        data.seek(SeekFrom::Start(capacity - 1)).unwrap();
+        data.write_all(&[0]).unwrap();
+        data.seek(SeekFrom::Start(0)).unwrap();
+        data.flush().unwrap();
+        Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
+    }
+
+    fn load_map(file: &Path) -> Result<MmapMut, std::io::Error> {
+        let data = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(false)
+            .open(file)?;
+
+        Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
+    }
+}
+
+pub type PreExistingCacheFiles = HashSet<String>;
+pub struct CacheHashData {
+    cache_folder: PathBuf,
+    pre_existing_cache_files: Arc<Mutex<PreExistingCacheFiles>>,
+    pub stats: Arc<Mutex<CacheHashDataStats>>,
+}
+
+impl Drop for CacheHashData {
+    fn drop(&mut self) {
+        self.delete_old_cache_files();
+        self.stats.lock().unwrap().report();
+    }
+}
+
+impl CacheHashData {
+    pub fn new<P: AsRef<Path> + std::fmt::Debug>(parent_folder: &P) -> CacheHashData {
+        let cache_folder = Self::get_cache_root_path(parent_folder);
+
+        std::fs::create_dir_all(cache_folder.clone())
+            .unwrap_or_else(|_| panic!("error creating cache dir: {:?}", cache_folder));
+
+        let result = CacheHashData {
+            cache_folder,
+            pre_existing_cache_files: Arc::new(Mutex::new(PreExistingCacheFiles::default())),
+            stats: Arc::new(Mutex::new(CacheHashDataStats::default())),
+        };
+
+        result.get_cache_files();
+        result
+    }
+    fn delete_old_cache_files(&self) {
+        let pre_existing_cache_files = self.pre_existing_cache_files.lock().unwrap();
+        if !pre_existing_cache_files.is_empty() {
+            self.stats.lock().unwrap().unused_cache_files += pre_existing_cache_files.len();
+            for file_name in pre_existing_cache_files.iter() {
+                let result = self.cache_folder.join(file_name);
+                let _ = fs::remove_file(result);
+            }
+        }
+    }
+    fn get_cache_files(&self) {
+        if self.cache_folder.is_dir() {
+            let dir = fs::read_dir(self.cache_folder.clone());
+            if let Ok(dir) = dir {
+                let mut pre_existing = self.pre_existing_cache_files.lock().unwrap();
+                for entry in dir.flatten() {
+                    if let Some(name) = entry.path().file_name() {
+                        pre_existing.insert(name.to_str().unwrap().to_string());
+                    }
+                }
+                self.stats.lock().unwrap().cache_file_count += pre_existing.len();
+            }
+        }
+    }
+
+    fn get_cache_root_path<P: AsRef<Path>>(parent_folder: &P) -> PathBuf {
+        parent_folder.as_ref().join("calculate_accounts_hash_cache")
+    }
+
+    pub fn load<P: AsRef<Path> + std::fmt::Debug>(
+        &self,
+        file_name: &P,
+        accumulator: &mut SavedType,
+        start_bin_index: usize,
+        bin_calculator: &PubkeyBinCalculator16,
+    ) -> Result<(), std::io::Error> {
+        let mut stats = CacheHashDataStats::default();
+        let result = self.load_internal(
+            file_name,
+            accumulator,
+            start_bin_index,
+            bin_calculator,
+            &mut stats,
+        );
+        self.stats.lock().unwrap().merge(&stats);
+        result
+    }
+
+    fn load_internal<P: AsRef<Path> + std::fmt::Debug>(
+        &self,
+        file_name: &P,
+        accumulator: &mut SavedType,
+        start_bin_index: usize,
+        bin_calculator: &PubkeyBinCalculator16,
+        stats: &mut CacheHashDataStats,
+    ) -> Result<(), std::io::Error> {
+        let mut m = Measure::start("overall");
+        let path = self.cache_folder.join(file_name);
+        let file_len = std::fs::metadata(path.clone())?.len();
+        let mut m1 = Measure::start("read_file");
+        let mmap = CacheHashDataFile::load_map(&path)?;
+        m1.stop();
+        stats.read_us = m1.as_us();
+
+        let cell_size = std::mem::size_of::<EntryType>() as u64;
+        let mut cache_file = CacheHashDataFile {
+            mmap,
+            cell_size,
+            capacity: 0,
+        };
+        let header = cache_file.get_header_mut();
+        let entries = header.count;
+
+        let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
+        cache_file.capacity = capacity;
+        assert_eq!(
+            capacity, file_len,
+            "expected: {}, len on disk: {} {:?}, entries: {}, cell_size: {}",
+            capacity, file_len, path, entries, cell_size
+        );
+
+        stats.total_entries = entries;
+        stats.cache_file_size += capacity as usize;
+
+        let file_name_lookup = file_name.as_ref().to_str().unwrap().to_string();
+        let found = self
+            .pre_existing_cache_files
+            .lock()
+            .unwrap()
+            .remove(&file_name_lookup);
+        if !found {
+            info!(
+                "tried to mark {:?} as used, but it wasn't in the set, one example: {:?}",
+                file_name_lookup,
+                self.pre_existing_cache_files.lock().unwrap().iter().next()
+            );
+        }
+
+        stats.loaded_from_cache += 1;
+        stats.entries_loaded_from_cache += entries;
+        let mut m2 = Measure::start("decode");
+        for i in 0..entries {
+            let d = cache_file.get_mut::<EntryType>(i as u64);
+            let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
+            assert!(
+                pubkey_to_bin_index >= start_bin_index,
+                "{}, {}",
+                pubkey_to_bin_index,
+                start_bin_index
+            ); // this would indicate we put a pubkey in too high of a bin
+            pubkey_to_bin_index -= start_bin_index;
+            accumulator[pubkey_to_bin_index].push(d.clone()); // may want to avoid clone here
+        }
+
+        m2.stop();
+        stats.decode_us += m2.as_us();
+        m.stop();
+        stats.load_us += m.as_us();
+        Ok(())
+    }
+
+    pub fn save(&self, file_name: &Path, data: &SavedTypeSlice) -> Result<(), std::io::Error> {
+        let mut stats = CacheHashDataStats::default();
+        let result = self.save_internal(file_name, data, &mut stats);
+        self.stats.lock().unwrap().merge(&stats);
+        result
+    }
+
+    pub fn save_internal(
+        &self,
+        file_name: &Path,
+        data: &SavedTypeSlice,
+        stats: &mut CacheHashDataStats,
+    ) -> Result<(), std::io::Error> {
+        let mut m = Measure::start("save");
+        let cache_path = self.cache_folder.join(file_name);
+        let create = true;
+        if create {
+            let _ignored = remove_file(&cache_path);
+        }
+        let cell_size = std::mem::size_of::<EntryType>() as u64;
+        let mut m1 = Measure::start("create save");
+        let entries = data
+            .iter()
+            .map(|x: &Vec<EntryType>| x.len())
+            .collect::<Vec<_>>();
+        let entries = entries.iter().sum::<usize>();
+        let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
+
+        let mmap = CacheHashDataFile::new_map(&cache_path, capacity)?;
+        m1.stop();
+        stats.create_save_us += m1.as_us();
+        let mut cache_file = CacheHashDataFile {
+            mmap,
+            cell_size,
+            capacity,
+        };
+
+        let mut header = cache_file.get_header_mut();
+        header.count = entries;
+
+        stats.cache_file_size = capacity as usize;
+        stats.total_entries = entries;
+
+        let mut m2 = Measure::start("write_to_mmap");
+        let mut i = 0;
+        data.iter().for_each(|x| {
+            x.iter().for_each(|item| {
+                let d = cache_file.get_mut::<EntryType>(i as u64);
+                i += 1;
+                *d = item.clone();
+            })
+        });
+        assert_eq!(i, entries);
+        m2.stop();
+        stats.write_to_mmap_us += m2.as_us();
+        m.stop();
+        stats.save_us += m.as_us();
+        stats.saved_to_cache += 1;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+pub mod tests {
+    use super::*;
+    use rand::Rng;
+
+    #[test]
+    fn test_read_write() {
+        // generate sample data
+        // write to file
+        // read
+        // compare
+        use tempfile::TempDir;
+        let tmpdir = TempDir::new().unwrap();
+        std::fs::create_dir_all(&tmpdir).unwrap();
+
+        for bins in [1, 2, 4] {
+            let bin_calculator = PubkeyBinCalculator16::new(bins);
+            let num_points = 5;
+            let (data, _total_points) = generate_test_data(num_points, bins, &bin_calculator);
+            for passes in [1, 2] {
+                let bins_per_pass = bins / passes;
+                if bins_per_pass == 0 {
+                    continue; // illegal test case
+                }
+                for pass in 0..passes {
+                    for flatten_data in [true, false] {
+                        let mut data_this_pass = if flatten_data {
+                            vec![vec![], vec![]]
+                        } else {
+                            vec![]
+                        };
+                        let start_bin_this_pass = pass * bins_per_pass;
+                        for bin in 0..bins_per_pass {
+                            let mut this_bin_data = data[bin + start_bin_this_pass].clone();
+                            if flatten_data {
+                                data_this_pass[0].append(&mut this_bin_data);
+                            } else {
+                                data_this_pass.push(this_bin_data);
+                            }
+                        }
+                        let cache = CacheHashData::new(&tmpdir);
+                        let file_name = "test";
+                        let file = Path::new(file_name).to_path_buf();
+                        cache.save(&file, &data_this_pass).unwrap();
+                        cache.get_cache_files();
+                        assert_eq!(
+                            cache
+                                .pre_existing_cache_files
+                                .lock()
+                                .unwrap()
+                                .iter()
+                                .collect::<Vec<_>>(),
+                            vec![file_name]
+                        );
+                        let mut accum = (0..bins_per_pass).into_iter().map(|_| vec![]).collect();
+                        cache
+                            .load(&file, &mut accum, start_bin_this_pass, &bin_calculator)
+                            .unwrap();
+                        if flatten_data {
+                            bin_data(
+                                &mut data_this_pass,
+                                &bin_calculator,
+                                bins_per_pass,
+                                start_bin_this_pass,
+                            );
+                        }
+                        assert_eq!(
+                            accum, data_this_pass,
+                            "bins: {}, start_bin_this_pass: {}, pass: {}, flatten: {}, passes: {}",
+                            bins, start_bin_this_pass, pass, flatten_data, passes
+                        );
+                    }
+                }
+            }
+        }
+    }
+
+    fn bin_data(
+        data: &mut SavedType,
+        bin_calculator: &PubkeyBinCalculator16,
+        bins: usize,
+        start_bin: usize,
+    ) {
+        let mut accum: SavedType = (0..bins).into_iter().map(|_| vec![]).collect();
+        data.drain(..).into_iter().for_each(|mut x| {
+            x.drain(..).into_iter().for_each(|item| {
+                let bin = bin_calculator.bin_from_pubkey(&item.pubkey);
+                accum[bin - start_bin].push(item);
+            })
+        });
+        *data = accum;
+    }
+
+    fn generate_test_data(
+        count: usize,
+        bins: usize,
+        binner: &PubkeyBinCalculator16,
+    ) -> (SavedType, usize) {
+        let mut rng = rand::thread_rng();
+        let mut ct = 0;
+        (
+            (0..bins)
+                .into_iter()
+                .map(|bin| {
+                    let rnd = rng.gen::<u64>() % (bins as u64);
+                    if rnd < count as u64 {
+                        (0..std::cmp::max(1, count / bins))
+                            .into_iter()
+                            .map(|_| {
+                                ct += 1;
+                                let mut pk;
+                                loop {
+                                    // expensive, but small numbers and for tests, so ok
+                                    pk = solana_sdk::pubkey::new_rand();
+                                    if binner.bin_from_pubkey(&pk) == bin {
+                                        break;
+                                    }
+                                }
+
+                                CalculateHashIntermediate::new(
+                                    solana_sdk::hash::new_rand(&mut rng),
+                                    ct as u64,
+                                    pk,
+                                )
+                            })
+                            .collect::<Vec<_>>()
+                    } else {
+                        vec![]
+                    }
+                })
+                .collect::<Vec<_>>(),
+            ct,
+        )
+    }
+}
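The on-disk format above is deliberately simple: a `Header { count }` followed by `count` fixed-size `CalculateHashIntermediate` cells in one mmap, so the loader can sanity-check `size_of::<Header>() + count * size_of::<EntryType>()` against the file length before trusting the contents. A toy version of the offset arithmetic, with a generic cell type standing in for `CalculateHashIntermediate`:

```rust
use std::mem::size_of;

#[repr(C)]
struct Header {
    count: usize,
}

/// Byte offset of cell `ix`, as computed by CacheHashDataFile::get_mut.
fn cell_offset<T>(ix: u64) -> usize {
    size_of::<Header>() + (ix as usize) * size_of::<T>()
}

/// Expected file length for `count` cells; load_internal asserts this
/// matches the length reported by fs::metadata before decoding.
fn expected_capacity<T>(count: usize) -> u64 {
    (size_of::<Header>() + count * size_of::<T>()) as u64
}
```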
runtime/src/cache_hash_data_stats.rs (new file, 59 lines)
@@ -0,0 +1,59 @@
+//! Cached data for hashing accounts
+#[derive(Default, Debug)]
+pub struct CacheHashDataStats {
+    pub cache_file_size: usize,
+    pub cache_file_count: usize,
+    pub total_entries: usize,
+    pub loaded_from_cache: usize,
+    pub entries_loaded_from_cache: usize,
+    pub save_us: u64,
+    pub saved_to_cache: usize,
+    pub write_to_mmap_us: u64,
+    pub create_save_us: u64,
+    pub load_us: u64,
+    pub read_us: u64,
+    pub decode_us: u64,
+    pub merge_us: u64,
+    pub unused_cache_files: usize,
+}
+
+impl CacheHashDataStats {
+    pub fn merge(&mut self, other: &CacheHashDataStats) {
+        self.cache_file_size += other.cache_file_size;
+        self.total_entries += other.total_entries;
+        self.loaded_from_cache += other.loaded_from_cache;
+        self.entries_loaded_from_cache += other.entries_loaded_from_cache;
+        self.load_us += other.load_us;
+        self.read_us += other.read_us;
+        self.decode_us += other.decode_us;
+        self.save_us += other.save_us;
+        self.saved_to_cache += other.saved_to_cache;
+        self.create_save_us += other.create_save_us;
+        self.cache_file_count += other.cache_file_count;
+        self.write_to_mmap_us += other.write_to_mmap_us;
+        self.unused_cache_files += other.unused_cache_files;
+    }
+
+    pub fn report(&self) {
+        datapoint_info!(
+            "cache_hash_data_stats",
+            ("cache_file_size", self.cache_file_size, i64),
+            ("cache_file_count", self.cache_file_count, i64),
+            ("total_entries", self.total_entries, i64),
+            ("loaded_from_cache", self.loaded_from_cache, i64),
+            ("saved_to_cache", self.saved_to_cache, i64),
+            (
+                "entries_loaded_from_cache",
+                self.entries_loaded_from_cache,
+                i64
+            ),
+            ("save_us", self.save_us, i64),
+            ("write_to_mmap_us", self.write_to_mmap_us, i64),
+            ("create_save_us", self.create_save_us, i64),
+            ("load_us", self.load_us, i64),
+            ("read_us", self.read_us, i64),
+            ("decode_us", self.decode_us, i64),
+            ("unused_cache_files", self.unused_cache_files, i64),
+        );
+    }
+}
runtime/src/lib.rs
@@ -18,6 +18,8 @@ pub mod bloom;
 pub mod bucket_map_holder;
 pub mod bucket_map_holder_stats;
 pub mod builtins;
+pub mod cache_hash_data;
+pub mod cache_hash_data_stats;
 pub mod commitment;
 pub mod contains;
 pub mod epoch_stakes;
validator/src/main.rs
@@ -2488,7 +2488,11 @@ pub fn main() {
     let accounts_index_config = value_t!(matches, "accounts_index_bins", usize)
         .ok()
         .map(|bins| AccountsIndexConfig { bins: Some(bins) });
-    let accounts_db_config = accounts_index_config.map(|x| AccountsDbConfig { index: Some(x) });
+
+    let accounts_db_config = Some(AccountsDbConfig {
+        index: accounts_index_config,
+        accounts_hash_cache_path: Some(ledger_path.clone()),
+    });

     let accountsdb_repl_service_config = if matches.is_present("enable_accountsdb_repl") {
         let accountsdb_repl_bind_address = if matches.is_present("accountsdb_repl_bind_address") {
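Callers opt in by filling the new config field; `None` falls back to a per-instance `TempDir` (see `default_with_accounts_index` above). A self-contained sketch with local stand-ins for the two config structs, mirroring how the validator and ledger-tool hunks wire the ledger directory through:

```rust
use std::path::PathBuf;

// Local stand-ins with the same shape as the real structs in accounts_db.rs.
#[derive(Debug, Default, Clone)]
struct AccountsIndexConfig {
    bins: Option<usize>,
}

#[derive(Debug, Default, Clone)]
struct AccountsDbConfig {
    index: Option<AccountsIndexConfig>,
    accounts_hash_cache_path: Option<PathBuf>,
}

fn main() {
    let ledger_path = PathBuf::from("/path/to/ledger"); // hypothetical path
    // Point the hash cache at the ledger directory so cached chunks survive
    // restarts; the cache itself lives in a "calculate_accounts_hash_cache"
    // subdirectory managed by CacheHashData.
    let accounts_db_config = Some(AccountsDbConfig {
        index: Some(AccountsIndexConfig { bins: Some(16) }),
        accounts_hash_cache_path: Some(ledger_path.clone()),
    });
    println!("{:?}", accounts_db_config);
}
```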