diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index b058e194d2..89b86dbf81 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1397,7 +1397,7 @@ impl AccountsIndex { } pub fn set_startup(&self, value: bool) { - self.storage.storage.set_startup(value); + self.storage.set_startup(value); } /// Get an account diff --git a/runtime/src/accounts_index_storage.rs b/runtime/src/accounts_index_storage.rs index 74ab683b69..96700bd535 100644 --- a/runtime/src/accounts_index_storage.rs +++ b/runtime/src/accounts_index_storage.rs @@ -43,20 +43,23 @@ impl Drop for AccountsIndexStorage { } impl AccountsIndexStorage { - pub fn new(bins: usize, config: &Option) -> AccountsIndexStorage { - let num_threads = std::cmp::max(2, num_cpus::get() / 4); - let threads = config - .as_ref() - .and_then(|config| config.flush_threads) - .unwrap_or(num_threads); + pub fn add_worker_threads(existing: &Self, threads: usize) -> Self { + Self::allocate( + Arc::clone(&existing.storage), + existing.in_mem.clone(), + threads, + ) + } - let storage = Arc::new(BucketMapHolder::new(bins, config, threads)); - - let in_mem = (0..bins) - .into_iter() - .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin))) - .collect::>(); + pub fn set_startup(&self, value: bool) { + self.storage.set_startup(self, value); + } + fn allocate( + storage: Arc>, + in_mem: Vec>>, + threads: usize, + ) -> Self { let exit = Arc::new(AtomicBool::default()); let handles = Some( (0..threads) @@ -84,4 +87,21 @@ impl AccountsIndexStorage { in_mem, } } + + pub fn new(bins: usize, config: &Option) -> Self { + let num_threads = std::cmp::max(2, num_cpus::get() / 4); + let threads = config + .as_ref() + .and_then(|config| config.flush_threads) + .unwrap_or(num_threads); + + let storage = Arc::new(BucketMapHolder::new(bins, config, threads)); + + let in_mem = (0..bins) + .into_iter() + .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin))) + .collect::>(); + + Self::allocate(storage, in_mem, threads) + } } diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs index a28e79e70b..34ccbfaa25 100644 --- a/runtime/src/bucket_map_holder.rs +++ b/runtime/src/bucket_map_holder.rs @@ -1,4 +1,5 @@ use crate::accounts_index::{AccountsIndexConfig, IndexValue}; +use crate::accounts_index_storage::AccountsIndexStorage; use crate::bucket_map_holder_stats::BucketMapHolderStats; use crate::in_mem_accounts_index::{InMemAccountsIndex, SlotT}; use crate::waitable_condvar::WaitableCondvar; @@ -40,6 +41,8 @@ pub struct BucketMapHolder { /// and writing to disk in parallel are. /// Note startup is an optimization and is not required for correctness. startup: AtomicBool, + + startup_worker_threads: Mutex>>, } impl Debug for BucketMapHolder { @@ -75,9 +78,15 @@ impl BucketMapHolder { self.startup.load(Ordering::Relaxed) } - pub fn set_startup(&self, value: bool) { - if !value { + pub fn set_startup(&self, storage: &AccountsIndexStorage, value: bool) { + if value { + let num_threads = std::cmp::max(2, num_cpus::get() / 4); + *self.startup_worker_threads.lock().unwrap() = Some( + AccountsIndexStorage::add_worker_threads(storage, num_threads), + ); + } else { self.wait_for_idle(); + *self.startup_worker_threads.lock().unwrap() = None; } self.startup.store(value, Ordering::Relaxed) } @@ -151,6 +160,7 @@ impl BucketMapHolder { bins, startup: AtomicBool::default(), mem_budget_mb, + startup_worker_threads: Mutex::default(), _threads: threads, } }