From e6934e724713338c31afda2f9d850ffa26abc33a Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com>
Date: Mon, 20 Sep 2021 09:58:20 -0500
Subject: [PATCH] AcctIdx: consolidate to correct CondVar (#20017)

---
 runtime/src/accounts_index_storage.rs | 14 +++++---------
 runtime/src/bucket_map_holder.rs      |  5 +++--
 runtime/src/in_mem_accounts_index.rs  |  2 ++
 3 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/runtime/src/accounts_index_storage.rs b/runtime/src/accounts_index_storage.rs
index 0f31d3ef8c..9fe7756701 100644
--- a/runtime/src/accounts_index_storage.rs
+++ b/runtime/src/accounts_index_storage.rs
@@ -1,7 +1,6 @@
 use crate::accounts_index::{AccountsIndexConfig, IndexValue};
 use crate::bucket_map_holder::BucketMapHolder;
 use crate::in_mem_accounts_index::InMemAccountsIndex;
-use crate::waitable_condvar::WaitableCondvar;
 use std::fmt::Debug;
 use std::time::Duration;
 use std::{
@@ -19,7 +18,6 @@ use std::{
 pub struct AccountsIndexStorage<T: IndexValue> {
     // for managing the bg threads
     exit: Arc<AtomicBool>,
-    wait: Arc<WaitableCondvar>,
     handles: Option<Vec<JoinHandle<()>>>,

     // eventually the backing storage
@@ -36,7 +34,7 @@ impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
 impl<T: IndexValue> Drop for AccountsIndexStorage<T> {
     fn drop(&mut self) {
         self.exit.store(true, Ordering::Relaxed);
-        self.wait.notify_all();
+        self.storage.wait_dirty_or_aged.notify_all();
         if let Some(handles) = self.handles.take() {
             handles
                 .into_iter()
@@ -61,21 +59,19 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
             .unwrap_or(DEFAULT_THREADS);

         let exit = Arc::new(AtomicBool::default());
-        let wait = Arc::new(WaitableCondvar::default());
         let handles = Some(
             (0..threads)
                 .into_iter()
                 .map(|_| {
                     let storage_ = Arc::clone(&storage);
                     let exit_ = Arc::clone(&exit);
-                    let wait_ = Arc::clone(&wait);
                     let in_mem_ = in_mem.clone();
                     // note that rayon use here causes us to exhaust # rayon threads and many tests running in parallel deadlock
                     Builder::new()
                         .name("solana-idx-flusher".to_string())
                         .spawn(move || {
-                            Self::background(storage_, exit_, wait_, in_mem_);
+                            Self::background(storage_, exit_, in_mem_);
                         })
                         .unwrap()
                 })
@@ -84,7 +80,6 @@ impl<T: IndexValue> AccountsIndexStorage<T> {

         Self {
             exit,
-            wait,
             handles,
             storage,
             in_mem,
@@ -99,12 +94,13 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
     pub fn background(
         storage: Arc<BucketMapHolder<T>>,
         exit: Arc<AtomicBool>,
-        wait: Arc<WaitableCondvar>,
         in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
     ) {
         loop {
             // this will transition to waits and thread throttling
-            wait.wait_timeout(Duration::from_millis(10000));
+            storage
+                .wait_dirty_or_aged
+                .wait_timeout(Duration::from_millis(10000));
             if exit.load(Ordering::Relaxed) {
                 break;
             }
diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs
index 8f3d38ceca..79256646a4 100644
--- a/runtime/src/bucket_map_holder.rs
+++ b/runtime/src/bucket_map_holder.rs
@@ -16,7 +16,7 @@ pub struct BucketMapHolder<T: IndexValue> {
     pub stats: BucketMapHolderStats,

     // used by bg processing to know when any bucket has become dirty
-    pub wait_dirty_bucket: WaitableCondvar,
+    pub wait_dirty_or_aged: WaitableCondvar,
     next_bucket_to_flush: Mutex<usize>,
     bins: usize,

@@ -48,6 +48,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
         // since we changed age, there are now 0 buckets that have been flushed at this age
         let previous = self.count_ages_flushed.swap(0, Ordering::Relaxed);
         assert!(previous >= self.bins); // we should not have increased age before previous age was fully flushed
+        self.wait_dirty_or_aged.notify_all(); // notify all because we can age scan in parallel
     }

     pub fn future_age_to_flush(&self) -> Age {
@@ -101,7 +102,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
             count_ages_flushed: AtomicUsize::default(),
             age: AtomicU8::default(),
             stats: BucketMapHolderStats::default(),
-            wait_dirty_bucket: WaitableCondvar::default(),
+            wait_dirty_or_aged: WaitableCondvar::default(),
             next_bucket_to_flush: Mutex::new(0),
             bins,
             startup: AtomicBool::default(),
diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs
index de577e5da9..789e2a16d5 100644
--- a/runtime/src/in_mem_accounts_index.rs
+++ b/runtime/src/in_mem_accounts_index.rs
@@ -397,6 +397,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {

     pub fn set_bin_dirty(&self) {
         self.bin_dirty.store(true, Ordering::Release);
+        // 1 bin dirty, so only need 1 thread to wake up if many could be waiting
+        self.storage.wait_dirty_or_aged.notify_one();
     }

     fn flush_internal(&self) {
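
The patch routes every wake-up through the single wait_dirty_or_aged condvar owned by BucketMapHolder: set_bin_dirty calls notify_one (one dirty bin needs only one flusher awake), increment_age calls notify_all (every flusher thread can age-scan in parallel), and the background threads block on wait_timeout. For readers who have not seen WaitableCondvar, below is a minimal sketch of such a wrapper; the struct name WaitableCondvarSketch, the Mutex<()>/Condvar pairing, and the method bodies are assumptions for illustration, not the actual runtime/src/waitable_condvar.rs implementation.

use std::sync::{Condvar, Mutex};
use std::time::Duration;

// Hypothetical stand-in for WaitableCondvar: a Mutex/Condvar pair that lets
// background flusher threads sleep until a timeout elapses or a producer
// signals work (a dirty bin or an age bump).
#[derive(Default)]
pub struct WaitableCondvarSketch {
    mutex: Mutex<()>,
    event: Condvar,
}

impl WaitableCondvarSketch {
    // Block the caller until notified or until `timeout` elapses.
    // Returns true if the wait timed out without a notification.
    pub fn wait_timeout(&self, timeout: Duration) -> bool {
        let guard = self.mutex.lock().unwrap();
        let (_guard, result) = self.event.wait_timeout(guard, timeout).unwrap();
        result.timed_out()
    }

    // Wake a single waiter: sufficient when one bin became dirty.
    pub fn notify_one(&self) {
        self.event.notify_one();
    }

    // Wake every waiter: used when the age advances and all background
    // threads can scan buckets in parallel.
    pub fn notify_all(&self) {
        self.event.notify_all();
    }
}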