Throw more threads at hash_internal_state (#5023)

commit b505a0df22 (parent acf096c5f7)
Author: sakridge
Date: 2019-07-16 16:58:30 -07:00
Committed by: GitHub

3 changed files with 82 additions and 13 deletions


@@ -40,7 +40,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 use sys_info;
 
-const ACCOUNT_DATA_FILE_SIZE: u64 = 16 * 1024 * 1024;
+const ACCOUNT_DATA_FILE_SIZE: u64 = 4 * 1024 * 1024;
 const ACCOUNT_DATA_FILE: &str = "data";
 pub const NUM_THREADS: u32 = 10;
@@ -78,8 +78,11 @@ pub type InstructionAccounts = Vec<Account>;
 pub type InstructionCredits = Vec<LamportCredit>;
 pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
 
+// Each fork has a set of storage entries.
+type ForkStores = HashMap<usize, Arc<AccountStorageEntry>>;
+
 #[derive(Default, Debug)]
-pub struct AccountStorage(HashMap<Fork, HashMap<usize, Arc<AccountStorageEntry>>>);
+pub struct AccountStorage(HashMap<Fork, ForkStores>);
 
 struct AccountStorageVisitor;
@@ -275,7 +278,9 @@ pub struct AccountsDB {
     file_size: u64,
 
     /// Thread pool used for par_iter
-    thread_pool: ThreadPool,
+    pub thread_pool: ThreadPool,
+
+    min_num_stores: usize,
 }
 
 pub fn get_paths_vec(paths: &str) -> Vec<String> {
@@ -295,11 +300,18 @@ impl Default for AccountsDB {
                 .num_threads(2)
                 .build()
                 .unwrap(),
+            min_num_stores: 0,
         }
     }
 }
 
 impl AccountsDB {
+    pub fn new_with_num_stores(paths: &str, min_num_stores: usize) -> Self {
+        let mut new = Self::new(paths);
+        new.min_num_stores = min_num_stores;
+        new
+    }
+
     pub fn new_with_file_size(paths: &str, file_size: u64) -> Self {
         let paths = get_paths_vec(&paths);
         AccountsDB {
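
Aside: new_with_num_stores is the knob the rest of the commit builds on. A minimal usage sketch (the path and the count of 16 are illustrative, not taken from the diff):

// Hypothetical usage: ask for at least 16 stores per fork so a later
// parallel scan has roughly one unit of work per logical CPU.
let db = AccountsDB::new_with_num_stores("/tmp/accounts", 16);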
@@ -313,6 +325,7 @@ impl AccountsDB {
                 .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
                 .build()
                 .unwrap(),
+            min_num_stores: 0,
         }
     }
@@ -359,12 +372,12 @@ impl AccountsDB {
         Ok(())
     }
 
-    fn new_storage_entry(&self, fork_id: Fork, path: &str) -> AccountStorageEntry {
+    fn new_storage_entry(&self, fork_id: Fork, path: &str, size: u64) -> AccountStorageEntry {
         AccountStorageEntry::new(
             path,
             fork_id,
             self.next_id.fetch_add(1, Ordering::Relaxed),
-            self.file_size,
+            size,
         )
     }
@@ -491,11 +504,31 @@ impl AccountsDB {
         }
         drop(stores);
 
-        let mut stores = self.storage.write().unwrap();
-        let path_index = thread_rng().gen_range(0, self.paths.len());
-        let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new);
-        let store = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index]));
+        let store = self.create_and_insert_store(fork_id, self.file_size);
         store.try_available();
         store
     }
 
+    fn create_and_insert_store(&self, fork_id: Fork, size: u64) -> Arc<AccountStorageEntry> {
+        let mut stores = self.storage.write().unwrap();
+        let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new);
+        // Create more stores so that when scanning the storage all CPUs have work
+        while fork_storage.len() < self.min_num_stores {
+            self.create_store(fork_id, fork_storage, self.file_size);
+        }
+        self.create_store(fork_id, fork_storage, size)
+    }
+
+    fn create_store(
+        &self,
+        fork_id: Fork,
+        fork_storage: &mut ForkStores,
+        size: u64,
+    ) -> Arc<AccountStorageEntry> {
+        let path_index = thread_rng().gen_range(0, self.paths.len());
+        let store = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index], size));
+        fork_storage.insert(store.id, store.clone());
+        store
+    }
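
The comment inside create_and_insert_store captures the point of the commit: scanning storage uses Rayon's par_iter, so parallelism is capped by the number of storage entries per fork. A self-contained sketch of that relationship, with stand-in types (Store and hash_store are placeholders, not this crate's API):

use rayon::prelude::*;

struct Store; // simplified stand-in for AccountStorageEntry

fn hash_store(_s: &Store) -> u64 {
    0 // placeholder for hashing one store's accounts
}

// With a single large store, par_iter has one work item and one busy
// CPU; with min_num_stores entries, up to that many CPUs hash at once.
fn hash_all(stores: &[Store]) -> u64 {
    stores.par_iter().map(hash_store).reduce(|| 0, |a, b| a ^ b)
}

The 16 MiB to 4 MiB ACCOUNT_DATA_FILE_SIZE change at the top of the diff serves the same goal: smaller append vecs mean more stores for the same data, hence more parallel work units.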
@@ -537,6 +570,12 @@ impl AccountsDB {
             let rvs = storage.accounts.append_accounts(&with_meta[infos.len()..]);
             if rvs.is_empty() {
                 storage.set_status(AccountStorageStatus::Full);
+
+                // See if an account overflows the default append vec size.
+                let data_len = (with_meta[infos.len()].1.data.len() + 4096) as u64;
+                if data_len > self.file_size {
+                    self.create_and_insert_store(fork_id, data_len * 2);
+                }
                 continue;
             }
             for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) {
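
The overflow branch above amounts to a small sizing rule: an account too big for the default append vec gets a dedicated store at twice its padded length. A hedged restatement as a helper (choose_store_size and this reading of the 4096 pad are mine, not the crate's):

// Mirrors the diff's logic: an account of `account_data_len` bytes that
// cannot fit in `file_size` gets its own store sized
// (account_data_len + 4096) * 2, leaving headroom for metadata.
fn choose_store_size(account_data_len: usize, file_size: u64) -> Option<u64> {
    let padded = (account_data_len + 4096) as u64;
    if padded > file_size {
        Some(padded * 2) // e.g. a 5 MiB account gets a ~10 MiB store
    } else {
        None // the default-sized store is large enough
    }
}

With the new 4 MiB default, the test_store_large_account case below stores a 4 MiB + 7 byte account, which exercises exactly this path.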
@@ -1374,4 +1413,21 @@ mod tests {
         });
         assert_eq!(accounts.len(), 2);
     }
+
+    #[test]
+    fn test_store_large_account() {
+        solana_logger::setup();
+        let paths = get_tmp_accounts_path!();
+        let db = AccountsDB::new(&paths.paths);
+        let key = Pubkey::default();
+        let data_len = ACCOUNT_DATA_FILE_SIZE as usize + 7;
+        let account = Account::new(1, data_len, &key);
+        db.store(0, &hashmap!(&key => &account));
+        let ancestors = vec![(0, 0)].into_iter().collect();
+        let ret = db.load_slow(&ancestors, &key).unwrap();
+        assert_eq!(ret.0.data.len(), data_len);
+    }
 }
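
The other two changed files are not shown above, but the commit title and the newly public thread_pool field point at the intended call site: hash_internal_state can install the AccountsDB-owned pool and scan stores in parallel. A sketch under that assumption (the free-function signature is illustrative, not the actual call site):

// Hypothetical caller: run the scan on the AccountsDB-owned Rayon pool
// rather than the global pool, so CPU use is bounded by this pool.
fn hash_internal_state(db: &AccountsDB) -> u64 {
    db.thread_pool.install(|| {
        // ... par_iter over the fork's storage entries here ...
        0
    })
}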