AcctIdx: store # threads in BucketHolder. Used later. (#20151)

This commit is contained in:
Jeff Washington (jwash)
2021-09-23 19:56:44 -05:00
committed by GitHub
parent b3bb079d9f
commit 1d13594c1c
3 changed files with 16 additions and 15 deletions

View File

@ -44,19 +44,19 @@ impl<T: IndexValue> Drop for AccountsIndexStorage<T> {
impl<T: IndexValue> AccountsIndexStorage<T> { impl<T: IndexValue> AccountsIndexStorage<T> {
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> AccountsIndexStorage<T> { pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> AccountsIndexStorage<T> {
let storage = Arc::new(BucketMapHolder::new(bins, config));
let in_mem = (0..bins)
.into_iter()
.map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
.collect::<Vec<_>>();
const DEFAULT_THREADS: usize = 1; // soon, this will be a cpu calculation const DEFAULT_THREADS: usize = 1; // soon, this will be a cpu calculation
let threads = config let threads = config
.as_ref() .as_ref()
.and_then(|config| config.flush_threads) .and_then(|config| config.flush_threads)
.unwrap_or(DEFAULT_THREADS); .unwrap_or(DEFAULT_THREADS);
let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
let in_mem = (0..bins)
.into_iter()
.map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
.collect::<Vec<_>>();
let exit = Arc::new(AtomicBool::default()); let exit = Arc::new(AtomicBool::default());
let handles = Some( let handles = Some(
(0..threads) (0..threads)

View File

@ -28,6 +28,8 @@ pub struct BucketMapHolder<T: IndexValue> {
next_bucket_to_flush: Mutex<usize>, next_bucket_to_flush: Mutex<usize>,
bins: usize, bins: usize,
_threads: usize,
// how much mb are we allowed to keep in the in-mem index? // how much mb are we allowed to keep in the in-mem index?
// Rest goes to disk. // Rest goes to disk.
pub mem_budget_mb: Option<usize>, pub mem_budget_mb: Option<usize>,
@ -38,7 +40,6 @@ pub struct BucketMapHolder<T: IndexValue> {
/// and writing to disk in parallel are. /// and writing to disk in parallel are.
/// Note startup is an optimization and is not required for correctness. /// Note startup is an optimization and is not required for correctness.
startup: AtomicBool, startup: AtomicBool,
_phantom: std::marker::PhantomData<T>,
} }
impl<T: IndexValue> Debug for BucketMapHolder<T> { impl<T: IndexValue> Debug for BucketMapHolder<T> {
@ -113,7 +114,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
} }
} }
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self { pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5; const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
let ages_to_stay_in_cache = config let ages_to_stay_in_cache = config
.as_ref() .as_ref()
@ -125,7 +126,6 @@ impl<T: IndexValue> BucketMapHolder<T> {
let mem_budget_mb = config.as_ref().and_then(|config| config.index_limit_mb); let mem_budget_mb = config.as_ref().and_then(|config| config.index_limit_mb);
// only allocate if mem_budget_mb is Some // only allocate if mem_budget_mb is Some
let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config)); let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config));
Self { Self {
disk, disk,
ages_to_stay_in_cache, ages_to_stay_in_cache,
@ -138,7 +138,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
bins, bins,
startup: AtomicBool::default(), startup: AtomicBool::default(),
mem_budget_mb, mem_budget_mb,
_phantom: std::marker::PhantomData::<T>::default(), _threads: threads,
} }
} }
@ -195,7 +195,7 @@ pub mod tests {
fn test_next_bucket_to_flush() { fn test_next_bucket_to_flush() {
solana_logger::setup(); solana_logger::setup();
let bins = 4; let bins = 4;
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default())); let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
let visited = (0..bins) let visited = (0..bins)
.into_iter() .into_iter()
.map(|_| AtomicUsize::default()) .map(|_| AtomicUsize::default())
@ -219,7 +219,7 @@ pub mod tests {
fn test_age_increment() { fn test_age_increment() {
solana_logger::setup(); solana_logger::setup();
let bins = 4; let bins = 4;
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default())); let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
for age in 0..513 { for age in 0..513 {
assert_eq!(test.current_age(), (age % 256) as Age); assert_eq!(test.current_age(), (age % 256) as Age);
@ -239,7 +239,7 @@ pub mod tests {
fn test_age_time() { fn test_age_time() {
solana_logger::setup(); solana_logger::setup();
let bins = 1; let bins = 1;
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default())); let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
let threads = 2; let threads = 2;
let time = AGE_MS * 5 / 2; let time = AGE_MS * 5 / 2;
let expected = (time / AGE_MS) as Age; let expected = (time / AGE_MS) as Age;
@ -259,7 +259,7 @@ pub mod tests {
fn test_age_broad() { fn test_age_broad() {
solana_logger::setup(); solana_logger::setup();
let bins = 4; let bins = 4;
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default())); let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
assert_eq!(test.current_age(), 0); assert_eq!(test.current_age(), 0);
for _ in 0..bins { for _ in 0..bins {
assert!(!test.all_buckets_flushed_at_current_age()); assert!(!test.all_buckets_flushed_at_current_age());

View File

@ -717,6 +717,7 @@ mod tests {
let holder = Arc::new(BucketMapHolder::new( let holder = Arc::new(BucketMapHolder::new(
BINS_FOR_TESTING, BINS_FOR_TESTING,
&Some(AccountsIndexConfig::default()), &Some(AccountsIndexConfig::default()),
1,
)); ));
let bin = 0; let bin = 0;
InMemAccountsIndex::new(&holder, bin) InMemAccountsIndex::new(&holder, bin)