add worker threads during startup (#20700)

This commit is contained in:
Jeff Washington (jwash) 2021-10-15 10:40:36 -05:00 committed by GitHub
parent e225ed7ab0
commit b80fd7566a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 15 deletions

View File

@ -1397,7 +1397,7 @@ impl<T: IndexValue> AccountsIndex<T> {
}
pub fn set_startup(&self, value: bool) {
self.storage.storage.set_startup(value);
self.storage.set_startup(value);
}
/// Get an account

View File

@ -43,20 +43,23 @@ impl<T: IndexValue> Drop for AccountsIndexStorage<T> {
}
impl<T: IndexValue> AccountsIndexStorage<T> {
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> AccountsIndexStorage<T> {
let num_threads = std::cmp::max(2, num_cpus::get() / 4);
let threads = config
.as_ref()
.and_then(|config| config.flush_threads)
.unwrap_or(num_threads);
pub fn add_worker_threads(existing: &Self, threads: usize) -> Self {
Self::allocate(
Arc::clone(&existing.storage),
existing.in_mem.clone(),
threads,
)
}
let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
let in_mem = (0..bins)
.into_iter()
.map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
.collect::<Vec<_>>();
pub fn set_startup(&self, value: bool) {
self.storage.set_startup(self, value);
}
fn allocate(
storage: Arc<BucketMapHolder<T>>,
in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
threads: usize,
) -> Self {
let exit = Arc::new(AtomicBool::default());
let handles = Some(
(0..threads)
@ -84,4 +87,21 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
in_mem,
}
}
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
let num_threads = std::cmp::max(2, num_cpus::get() / 4);
let threads = config
.as_ref()
.and_then(|config| config.flush_threads)
.unwrap_or(num_threads);
let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
let in_mem = (0..bins)
.into_iter()
.map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
.collect::<Vec<_>>();
Self::allocate(storage, in_mem, threads)
}
}

View File

@ -1,4 +1,5 @@
use crate::accounts_index::{AccountsIndexConfig, IndexValue};
use crate::accounts_index_storage::AccountsIndexStorage;
use crate::bucket_map_holder_stats::BucketMapHolderStats;
use crate::in_mem_accounts_index::{InMemAccountsIndex, SlotT};
use crate::waitable_condvar::WaitableCondvar;
@ -40,6 +41,8 @@ pub struct BucketMapHolder<T: IndexValue> {
/// and writing to disk in parallel are.
/// Note startup is an optimization and is not required for correctness.
startup: AtomicBool,
startup_worker_threads: Mutex<Option<AccountsIndexStorage<T>>>,
}
impl<T: IndexValue> Debug for BucketMapHolder<T> {
@ -75,9 +78,15 @@ impl<T: IndexValue> BucketMapHolder<T> {
self.startup.load(Ordering::Relaxed)
}
pub fn set_startup(&self, value: bool) {
if !value {
pub fn set_startup(&self, storage: &AccountsIndexStorage<T>, value: bool) {
if value {
let num_threads = std::cmp::max(2, num_cpus::get() / 4);
*self.startup_worker_threads.lock().unwrap() = Some(
AccountsIndexStorage::add_worker_threads(storage, num_threads),
);
} else {
self.wait_for_idle();
*self.startup_worker_threads.lock().unwrap() = None;
}
self.startup.store(value, Ordering::Relaxed)
}
@ -151,6 +160,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
bins,
startup: AtomicBool::default(),
mem_budget_mb,
startup_worker_threads: Mutex::default(),
_threads: threads,
}
}