AcctIdx: cleanup bg threads (#20731)
This commit is contained in:
		
				
					committed by
					
						
						GitHub
					
				
			
			
				
	
			
			
			
						parent
						
							254ad45878
						
					
				
				
					commit
					47de4f31b6
				
			@@ -1,27 +1,25 @@
 | 
			
		||||
use crate::accounts_index::{AccountsIndexConfig, IndexValue};
 | 
			
		||||
use crate::bucket_map_holder::BucketMapHolder;
 | 
			
		||||
use crate::in_mem_accounts_index::InMemAccountsIndex;
 | 
			
		||||
use crate::waitable_condvar::WaitableCondvar;
 | 
			
		||||
use std::fmt::Debug;
 | 
			
		||||
use std::{
 | 
			
		||||
    sync::{
 | 
			
		||||
        atomic::{AtomicBool, Ordering},
 | 
			
		||||
        Arc,
 | 
			
		||||
        Arc, Mutex,
 | 
			
		||||
    },
 | 
			
		||||
    thread::{Builder, JoinHandle},
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
// eventually hold the bucket map
 | 
			
		||||
// Also manages the lifetime of the background processing threads.
 | 
			
		||||
//  When this instance is dropped, it will drop the bucket map and cleanup
 | 
			
		||||
//  and it will stop all the background threads and join them.
 | 
			
		||||
/// Manages the lifetime of the background processing threads.
 | 
			
		||||
pub struct AccountsIndexStorage<T: IndexValue> {
 | 
			
		||||
    // for managing the bg threads
 | 
			
		||||
    exit: Arc<AtomicBool>,
 | 
			
		||||
    handles: Option<Vec<JoinHandle<()>>>,
 | 
			
		||||
    _bg_threads: BgThreads,
 | 
			
		||||
 | 
			
		||||
    // eventually the backing storage
 | 
			
		||||
    pub storage: Arc<BucketMapHolder<T>>,
 | 
			
		||||
    pub in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
 | 
			
		||||
 | 
			
		||||
    /// set_startup(true) creates bg threads which are kept alive until set_startup(false)
 | 
			
		||||
    startup_worker_threads: Mutex<Option<BgThreads>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
 | 
			
		||||
@@ -30,10 +28,17 @@ impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: IndexValue> Drop for AccountsIndexStorage<T> {
 | 
			
		||||
/// low-level managing the bg threads
 | 
			
		||||
struct BgThreads {
 | 
			
		||||
    exit: Arc<AtomicBool>,
 | 
			
		||||
    handles: Option<Vec<JoinHandle<()>>>,
 | 
			
		||||
    wait: Arc<WaitableCondvar>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Drop for BgThreads {
 | 
			
		||||
    fn drop(&mut self) {
 | 
			
		||||
        self.exit.store(true, Ordering::Relaxed);
 | 
			
		||||
        self.storage.wait_dirty_or_aged.notify_all();
 | 
			
		||||
        self.wait.notify_all();
 | 
			
		||||
        if let Some(handles) = self.handles.take() {
 | 
			
		||||
            handles
 | 
			
		||||
                .into_iter()
 | 
			
		||||
@@ -42,34 +47,23 @@ impl<T: IndexValue> Drop for AccountsIndexStorage<T> {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: IndexValue> AccountsIndexStorage<T> {
 | 
			
		||||
    pub fn add_worker_threads(existing: &Self, threads: usize) -> Self {
 | 
			
		||||
        Self::allocate(
 | 
			
		||||
            Arc::clone(&existing.storage),
 | 
			
		||||
            existing.in_mem.clone(),
 | 
			
		||||
            threads,
 | 
			
		||||
        )
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn set_startup(&self, value: bool) {
 | 
			
		||||
        self.storage.set_startup(self, value);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn allocate(
 | 
			
		||||
        storage: Arc<BucketMapHolder<T>>,
 | 
			
		||||
        in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
 | 
			
		||||
impl BgThreads {
 | 
			
		||||
    fn new<T: IndexValue>(
 | 
			
		||||
        storage: &Arc<BucketMapHolder<T>>,
 | 
			
		||||
        in_mem: &[Arc<InMemAccountsIndex<T>>],
 | 
			
		||||
        threads: usize,
 | 
			
		||||
    ) -> Self {
 | 
			
		||||
        // stop signal used for THIS batch of bg threads
 | 
			
		||||
        let exit = Arc::new(AtomicBool::default());
 | 
			
		||||
        let handles = Some(
 | 
			
		||||
            (0..threads)
 | 
			
		||||
                .into_iter()
 | 
			
		||||
                .map(|_| {
 | 
			
		||||
                    let storage_ = Arc::clone(&storage);
 | 
			
		||||
                    let storage_ = Arc::clone(storage);
 | 
			
		||||
                    let exit_ = Arc::clone(&exit);
 | 
			
		||||
                    let in_mem_ = in_mem.clone();
 | 
			
		||||
                    let in_mem_ = in_mem.to_vec();
 | 
			
		||||
 | 
			
		||||
                    // note that rayon use here causes us to exhaust # rayon threads and many tests running in parallel deadlock
 | 
			
		||||
                    // note that using rayon here causes us to exhaust # rayon threads and many tests running in parallel deadlock
 | 
			
		||||
                    Builder::new()
 | 
			
		||||
                        .name("solana-idx-flusher".to_string())
 | 
			
		||||
                        .spawn(move || {
 | 
			
		||||
@@ -80,28 +74,58 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
 | 
			
		||||
                .collect(),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        Self {
 | 
			
		||||
        BgThreads {
 | 
			
		||||
            exit,
 | 
			
		||||
            handles,
 | 
			
		||||
            storage,
 | 
			
		||||
            in_mem,
 | 
			
		||||
            wait: Arc::clone(&storage.wait_dirty_or_aged),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: IndexValue> AccountsIndexStorage<T> {
 | 
			
		||||
    /// startup=true causes:
 | 
			
		||||
    ///      in mem to act in a way that flushes to disk asap
 | 
			
		||||
    ///      also creates some additional bg threads to facilitate flushing to disk asap
 | 
			
		||||
    /// startup=false is 'normal' operation
 | 
			
		||||
    pub fn set_startup(&self, value: bool) {
 | 
			
		||||
        if value {
 | 
			
		||||
            // create some additional bg threads to help get things to the disk index asap
 | 
			
		||||
            *self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
 | 
			
		||||
                &self.storage,
 | 
			
		||||
                &self.in_mem,
 | 
			
		||||
                Self::num_threads(),
 | 
			
		||||
            ));
 | 
			
		||||
        }
 | 
			
		||||
        self.storage.set_startup(value);
 | 
			
		||||
        if !value {
 | 
			
		||||
            // shutdown the bg threads
 | 
			
		||||
            *self.startup_worker_threads.lock().unwrap() = None;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn num_threads() -> usize {
 | 
			
		||||
        std::cmp::max(2, num_cpus::get() / 4)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// allocate BucketMapHolder and InMemAccountsIndex[]
 | 
			
		||||
    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
 | 
			
		||||
        let num_threads = std::cmp::max(2, num_cpus::get() / 4);
 | 
			
		||||
        let threads = config
 | 
			
		||||
            .as_ref()
 | 
			
		||||
            .and_then(|config| config.flush_threads)
 | 
			
		||||
            .unwrap_or(num_threads);
 | 
			
		||||
            .unwrap_or_else(Self::num_threads);
 | 
			
		||||
 | 
			
		||||
        let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
 | 
			
		||||
        let storage = Arc::new(BucketMapHolder::new(bins, config));
 | 
			
		||||
 | 
			
		||||
        let in_mem = (0..bins)
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
 | 
			
		||||
            .collect::<Vec<_>>();
 | 
			
		||||
 | 
			
		||||
        Self::allocate(storage, in_mem, threads)
 | 
			
		||||
        Self {
 | 
			
		||||
            _bg_threads: BgThreads::new(&storage, &in_mem, threads),
 | 
			
		||||
            storage,
 | 
			
		||||
            in_mem,
 | 
			
		||||
            startup_worker_threads: Mutex::default(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,5 +1,4 @@
 | 
			
		||||
use crate::accounts_index::{AccountsIndexConfig, IndexValue};
 | 
			
		||||
use crate::accounts_index_storage::AccountsIndexStorage;
 | 
			
		||||
use crate::bucket_map_holder_stats::BucketMapHolderStats;
 | 
			
		||||
use crate::in_mem_accounts_index::{InMemAccountsIndex, SlotT};
 | 
			
		||||
use crate::waitable_condvar::WaitableCondvar;
 | 
			
		||||
@@ -25,12 +24,10 @@ pub struct BucketMapHolder<T: IndexValue> {
 | 
			
		||||
    age_timer: AtomicInterval,
 | 
			
		||||
 | 
			
		||||
    // used by bg processing to know when any bucket has become dirty
 | 
			
		||||
    pub wait_dirty_or_aged: WaitableCondvar,
 | 
			
		||||
    pub wait_dirty_or_aged: Arc<WaitableCondvar>,
 | 
			
		||||
    next_bucket_to_flush: Mutex<usize>,
 | 
			
		||||
    bins: usize,
 | 
			
		||||
 | 
			
		||||
    _threads: usize,
 | 
			
		||||
 | 
			
		||||
    // how much mb are we allowed to keep in the in-mem index?
 | 
			
		||||
    // Rest goes to disk.
 | 
			
		||||
    pub mem_budget_mb: Option<usize>,
 | 
			
		||||
@@ -41,8 +38,6 @@ pub struct BucketMapHolder<T: IndexValue> {
 | 
			
		||||
    /// and writing to disk in parallel are.
 | 
			
		||||
    /// Note startup is an optimization and is not required for correctness.
 | 
			
		||||
    startup: AtomicBool,
 | 
			
		||||
 | 
			
		||||
    startup_worker_threads: Mutex<Option<AccountsIndexStorage<T>>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: IndexValue> Debug for BucketMapHolder<T> {
 | 
			
		||||
@@ -78,26 +73,25 @@ impl<T: IndexValue> BucketMapHolder<T> {
 | 
			
		||||
        self.startup.load(Ordering::Relaxed)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn set_startup(&self, storage: &AccountsIndexStorage<T>, value: bool) {
 | 
			
		||||
        if value {
 | 
			
		||||
            let num_threads = std::cmp::max(2, num_cpus::get() / 4);
 | 
			
		||||
            *self.startup_worker_threads.lock().unwrap() = Some(
 | 
			
		||||
                AccountsIndexStorage::add_worker_threads(storage, num_threads),
 | 
			
		||||
            );
 | 
			
		||||
        } else {
 | 
			
		||||
    /// startup=true causes:
 | 
			
		||||
    ///      in mem to act in a way that flushes to disk asap
 | 
			
		||||
    /// startup=false is 'normal' operation
 | 
			
		||||
    pub fn set_startup(&self, value: bool) {
 | 
			
		||||
        if !value {
 | 
			
		||||
            self.wait_for_idle();
 | 
			
		||||
            *self.startup_worker_threads.lock().unwrap() = None;
 | 
			
		||||
        }
 | 
			
		||||
        self.startup.store(value, Ordering::Relaxed)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// return when the bg threads have reached an 'idle' state
 | 
			
		||||
    pub(crate) fn wait_for_idle(&self) {
 | 
			
		||||
        assert!(self.get_startup());
 | 
			
		||||
        if self.disk.is_none() {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // when age has incremented twice, we know that we have made it through scanning all bins, so we are 'idle'
 | 
			
		||||
        // when age has incremented twice, we know that we have made it through scanning all bins since we started waiting,
 | 
			
		||||
        //  so we are then 'idle'
 | 
			
		||||
        let end_age = self.current_age().wrapping_add(2);
 | 
			
		||||
        loop {
 | 
			
		||||
            self.wait_dirty_or_aged
 | 
			
		||||
@@ -117,7 +111,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
 | 
			
		||||
        self.maybe_advance_age();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // have all buckets been flushed at the current age?
 | 
			
		||||
    /// have all buckets been flushed at the current age?
 | 
			
		||||
    pub fn all_buckets_flushed_at_current_age(&self) -> bool {
 | 
			
		||||
        self.count_ages_flushed() >= self.bins
 | 
			
		||||
    }
 | 
			
		||||
@@ -136,7 +130,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
 | 
			
		||||
    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
 | 
			
		||||
        const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
 | 
			
		||||
        let ages_to_stay_in_cache = config
 | 
			
		||||
            .as_ref()
 | 
			
		||||
@@ -154,14 +148,12 @@ impl<T: IndexValue> BucketMapHolder<T> {
 | 
			
		||||
            count_ages_flushed: AtomicUsize::default(),
 | 
			
		||||
            age: AtomicU8::default(),
 | 
			
		||||
            stats: BucketMapHolderStats::new(bins),
 | 
			
		||||
            wait_dirty_or_aged: WaitableCondvar::default(),
 | 
			
		||||
            wait_dirty_or_aged: Arc::default(),
 | 
			
		||||
            next_bucket_to_flush: Mutex::new(0),
 | 
			
		||||
            age_timer: AtomicInterval::default(),
 | 
			
		||||
            bins,
 | 
			
		||||
            startup: AtomicBool::default(),
 | 
			
		||||
            mem_budget_mb,
 | 
			
		||||
            startup_worker_threads: Mutex::default(),
 | 
			
		||||
            _threads: threads,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -301,7 +293,7 @@ pub mod tests {
 | 
			
		||||
    fn test_next_bucket_to_flush() {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
        let bins = 4;
 | 
			
		||||
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
 | 
			
		||||
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
 | 
			
		||||
        let visited = (0..bins)
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .map(|_| AtomicUsize::default())
 | 
			
		||||
@@ -325,7 +317,7 @@ pub mod tests {
 | 
			
		||||
    fn test_age_increment() {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
        let bins = 4;
 | 
			
		||||
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
 | 
			
		||||
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
 | 
			
		||||
        for age in 0..513 {
 | 
			
		||||
            assert_eq!(test.current_age(), (age % 256) as Age);
 | 
			
		||||
 | 
			
		||||
@@ -345,7 +337,7 @@ pub mod tests {
 | 
			
		||||
    fn test_throttle() {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
        let bins = 100;
 | 
			
		||||
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
 | 
			
		||||
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
 | 
			
		||||
        let bins = test.bins as u64;
 | 
			
		||||
        let interval_ms = test.age_interval_ms();
 | 
			
		||||
        // 90% of time elapsed, all but 1 bins flushed, should not wait since we'll end up right on time
 | 
			
		||||
@@ -374,7 +366,7 @@ pub mod tests {
 | 
			
		||||
    fn test_age_time() {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
        let bins = 1;
 | 
			
		||||
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
 | 
			
		||||
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
 | 
			
		||||
        let threads = 2;
 | 
			
		||||
        let time = AGE_MS * 5 / 2;
 | 
			
		||||
        let expected = (time / AGE_MS) as Age;
 | 
			
		||||
@@ -394,7 +386,7 @@ pub mod tests {
 | 
			
		||||
    fn test_age_broad() {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
        let bins = 4;
 | 
			
		||||
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
 | 
			
		||||
        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
 | 
			
		||||
        assert_eq!(test.current_age(), 0);
 | 
			
		||||
        for _ in 0..bins {
 | 
			
		||||
            assert!(!test.all_buckets_flushed_at_current_age());
 | 
			
		||||
 
 | 
			
		||||
@@ -871,7 +871,6 @@ mod tests {
 | 
			
		||||
        let holder = Arc::new(BucketMapHolder::new(
 | 
			
		||||
            BINS_FOR_TESTING,
 | 
			
		||||
            &Some(AccountsIndexConfig::default()),
 | 
			
		||||
            1,
 | 
			
		||||
        ));
 | 
			
		||||
        let bin = 0;
 | 
			
		||||
        InMemAccountsIndex::new(&holder, bin)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user