AcctIdx: configurable flush threads (#19983)

This commit is contained in:
Jeff Washington (jwash)
2021-09-17 17:02:43 -05:00
committed by GitHub
parent 073c5359b0
commit 24b136a993
2 changed files with 10 additions and 2 deletions

View File

@ -34,11 +34,14 @@ pub const ITER_BATCH_SIZE: usize = 1000;
pub const BINS_DEFAULT: usize = 8192; pub const BINS_DEFAULT: usize = 8192;
pub const BINS_FOR_TESTING: usize = BINS_DEFAULT; pub const BINS_FOR_TESTING: usize = BINS_DEFAULT;
pub const BINS_FOR_BENCHMARKS: usize = BINS_DEFAULT; pub const BINS_FOR_BENCHMARKS: usize = BINS_DEFAULT;
pub const FLUSH_THREADS_TESTING: usize = 1;
pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndexConfig { pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndexConfig {
bins: Some(BINS_FOR_TESTING), bins: Some(BINS_FOR_TESTING),
flush_threads: Some(FLUSH_THREADS_TESTING),
}; };
pub const ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS: AccountsIndexConfig = AccountsIndexConfig { pub const ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS: AccountsIndexConfig = AccountsIndexConfig {
bins: Some(BINS_FOR_BENCHMARKS), bins: Some(BINS_FOR_BENCHMARKS),
flush_threads: Some(FLUSH_THREADS_TESTING),
}; };
pub type ScanResult<T> = Result<T, ScanError>; pub type ScanResult<T> = Result<T, ScanError>;
pub type SlotList<T> = Vec<(Slot, T)>; pub type SlotList<T> = Vec<(Slot, T)>;
@ -93,6 +96,7 @@ pub struct AccountSecondaryIndexesIncludeExclude {
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
pub struct AccountsIndexConfig { pub struct AccountsIndexConfig {
pub bins: Option<usize>, pub bins: Option<usize>,
pub flush_threads: Option<usize>,
} }
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]

View File

@ -46,7 +46,7 @@ impl<T: IndexValue> Drop for AccountsIndexStorage<T> {
} }
impl<T: IndexValue> AccountsIndexStorage<T> { impl<T: IndexValue> AccountsIndexStorage<T> {
pub fn new(bins: usize, _config: &Option<AccountsIndexConfig>) -> AccountsIndexStorage<T> { pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> AccountsIndexStorage<T> {
let storage = Arc::new(BucketMapHolder::new(bins)); let storage = Arc::new(BucketMapHolder::new(bins));
let in_mem = (0..bins) let in_mem = (0..bins)
@ -55,7 +55,11 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
.collect(); .collect();
const DEFAULT_THREADS: usize = 1; // soon, this will be a cpu calculation const DEFAULT_THREADS: usize = 1; // soon, this will be a cpu calculation
let threads = DEFAULT_THREADS; let threads = config
.as_ref()
.and_then(|config| config.flush_threads)
.unwrap_or(DEFAULT_THREADS);
let exit = Arc::new(AtomicBool::default()); let exit = Arc::new(AtomicBool::default());
let wait = Arc::new(WaitableCondvar::default()); let wait = Arc::new(WaitableCondvar::default());
let handles = Some( let handles = Some(