diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 5834bb3f5d..4cacae406b 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -8,6 +8,7 @@ use crate::blockhash_queue::BlockhashQueue; use crate::message_processor::has_duplicates; use bincode::serialize; use log::*; +use rayon::slice::ParallelSliceMut; use solana_metrics::inc_new_counter_error; use solana_sdk::account::Account; use solana_sdk::hash::{Hash, Hasher}; @@ -100,12 +101,16 @@ impl Accounts { } pub fn new(in_paths: Option) -> Self { + Self::new_with_num_stores(in_paths, 0) + } + + pub fn new_with_num_stores(in_paths: Option, min_num_stores: usize) -> Self { let (paths, own_paths) = if in_paths.is_none() { (Self::make_default_paths(), true) } else { (in_paths.unwrap(), false) }; - let accounts_db = Arc::new(AccountsDB::new(&paths)); + let accounts_db = Arc::new(AccountsDB::new_with_num_stores(&paths, min_num_stores)); Accounts { accounts_db, account_locks: Mutex::new(HashSet::new()), @@ -332,7 +337,9 @@ impl Accounts { ); let mut versions: Vec<(Pubkey, u64, B)> = accumulator.into_iter().flat_map(|x| x).collect(); - versions.sort_by_key(|s| (s.0, s.1)); + self.accounts_db.thread_pool.install(|| { + versions.par_sort_by_key(|s| (s.0, s.1)); + }); versions.dedup_by_key(|s| s.0); versions .into_iter() diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index a3624418c8..de046707bb 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -40,7 +40,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use sys_info; -const ACCOUNT_DATA_FILE_SIZE: u64 = 16 * 1024 * 1024; +const ACCOUNT_DATA_FILE_SIZE: u64 = 4 * 1024 * 1024; const ACCOUNT_DATA_FILE: &str = "data"; pub const NUM_THREADS: u32 = 10; @@ -78,8 +78,11 @@ pub type InstructionAccounts = Vec; pub type InstructionCredits = Vec; pub type InstructionLoaders = Vec>; +// Each fork has a set of storage entries. +type ForkStores = HashMap>; + #[derive(Default, Debug)] -pub struct AccountStorage(HashMap>>); +pub struct AccountStorage(HashMap); struct AccountStorageVisitor; @@ -275,7 +278,9 @@ pub struct AccountsDB { file_size: u64, /// Thread pool used for par_iter - thread_pool: ThreadPool, + pub thread_pool: ThreadPool, + + min_num_stores: usize, } pub fn get_paths_vec(paths: &str) -> Vec { @@ -295,11 +300,18 @@ impl Default for AccountsDB { .num_threads(2) .build() .unwrap(), + min_num_stores: 0, } } } impl AccountsDB { + pub fn new_with_num_stores(paths: &str, min_num_stores: usize) -> Self { + let mut new = Self::new(paths); + new.min_num_stores = min_num_stores; + new + } + pub fn new_with_file_size(paths: &str, file_size: u64) -> Self { let paths = get_paths_vec(&paths); AccountsDB { @@ -313,6 +325,7 @@ impl AccountsDB { .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) .build() .unwrap(), + min_num_stores: 0, } } @@ -359,12 +372,12 @@ impl AccountsDB { Ok(()) } - fn new_storage_entry(&self, fork_id: Fork, path: &str) -> AccountStorageEntry { + fn new_storage_entry(&self, fork_id: Fork, path: &str, size: u64) -> AccountStorageEntry { AccountStorageEntry::new( path, fork_id, self.next_id.fetch_add(1, Ordering::Relaxed), - self.file_size, + size, ) } @@ -491,11 +504,31 @@ impl AccountsDB { } drop(stores); - let mut stores = self.storage.write().unwrap(); - let path_index = thread_rng().gen_range(0, self.paths.len()); - let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new); - let store = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index])); + let store = self.create_and_insert_store(fork_id, self.file_size); store.try_available(); + store + } + + fn create_and_insert_store(&self, fork_id: Fork, size: u64) -> Arc { + let mut stores = self.storage.write().unwrap(); + let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new); + + // Create more stores so that when scanning the storage all CPUs have work + while fork_storage.len() < self.min_num_stores { + self.create_store(fork_id, fork_storage, self.file_size); + } + + self.create_store(fork_id, fork_storage, size) + } + + fn create_store( + &self, + fork_id: Fork, + fork_storage: &mut ForkStores, + size: u64, + ) -> Arc { + let path_index = thread_rng().gen_range(0, self.paths.len()); + let store = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index], size)); fork_storage.insert(store.id, store.clone()); store } @@ -537,6 +570,12 @@ impl AccountsDB { let rvs = storage.accounts.append_accounts(&with_meta[infos.len()..]); if rvs.is_empty() { storage.set_status(AccountStorageStatus::Full); + + // See if an account overflows the default append vec size. + let data_len = (with_meta[infos.len()].1.data.len() + 4096) as u64; + if data_len > self.file_size { + self.create_and_insert_store(fork_id, data_len * 2); + } continue; } for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) { @@ -1374,4 +1413,21 @@ mod tests { }); assert_eq!(accounts.len(), 2); } + + #[test] + fn test_store_large_account() { + solana_logger::setup(); + let paths = get_tmp_accounts_path!(); + let db = AccountsDB::new(&paths.paths); + + let key = Pubkey::default(); + let data_len = ACCOUNT_DATA_FILE_SIZE as usize + 7; + let account = Account::new(1, data_len, &key); + + db.store(0, &hashmap!(&key => &account)); + + let ancestors = vec![(0, 0)].into_iter().collect(); + let ret = db.load_slow(&ancestors, &key).unwrap(); + assert_eq!(ret.0.data.len(), data_len); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d7a3cc51b0..c66f902f19 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -50,6 +50,12 @@ use std::sync::{Arc, RwLock, RwLockReadGuard}; pub const SECONDS_PER_YEAR: f64 = (365.0 * 24.0 * 60.0 * 60.0); +// Create many append vecs to increase parallelism in scan_ functions. +fn default_num_stores() -> usize { + const DEFAULT_NUM_STORES: u32 = 8; + sys_info::cpu_num().unwrap_or(DEFAULT_NUM_STORES) as usize +} + type BankStatusCache = StatusCache>; #[derive(Default)] @@ -63,7 +69,7 @@ pub struct BankRc { impl BankRc { pub fn new(account_paths: Option, id: AppendVecId) -> Self { - let accounts = Accounts::new(account_paths); + let accounts = Accounts::new_with_num_stores(account_paths, default_num_stores()); accounts .accounts_db .next_id @@ -280,7 +286,7 @@ impl Bank { pub fn new_with_paths(genesis_block: &GenesisBlock, paths: Option) -> Self { let mut bank = Self::default(); bank.ancestors.insert(bank.slot(), 0); - bank.rc.accounts = Arc::new(Accounts::new(paths)); + bank.rc.accounts = Arc::new(Accounts::new_with_num_stores(paths, default_num_stores())); bank.process_genesis_block(genesis_block); // genesis needs stakes for all epochs up to the epoch implied by // slot = 0 and genesis configuration