diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs index 79256646a4..f8208cd177 100644 --- a/runtime/src/bucket_map_holder.rs +++ b/runtime/src/bucket_map_holder.rs @@ -3,11 +3,16 @@ use crate::bucket_map_holder_stats::BucketMapHolderStats; use crate::in_mem_accounts_index::SlotT; use crate::waitable_condvar::WaitableCondvar; use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig}; +use solana_sdk::clock::SLOT_MS; +use solana_sdk::timing::AtomicInterval; use std::fmt::Debug; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering}; use std::sync::Mutex; pub type Age = u8; +const AGE_MS: u64 = SLOT_MS; // match one age per slot time + +// will eventually hold the bucket map pub struct BucketMapHolder { pub disk: Option>>, @@ -15,6 +20,8 @@ pub struct BucketMapHolder { pub age: AtomicU8, pub stats: BucketMapHolderStats, + age_timer: AtomicInterval, + // used by bg processing to know when any bucket has become dirty pub wait_dirty_or_aged: WaitableCondvar, next_bucket_to_flush: Mutex, @@ -54,6 +61,12 @@ impl BucketMapHolder { pub fn future_age_to_flush(&self) -> Age { self.current_age().wrapping_add(self.ages_to_stay_in_cache) } + + fn has_age_interval_elapsed(&self) -> bool { + // note that when this returns true, state of age_timer is modified + self.age_timer.should_update(AGE_MS) + } + /// used by bg processes to determine # active threads and how aggressively to flush pub fn get_startup(&self) -> bool { self.startup.load(Ordering::Relaxed) @@ -83,6 +96,16 @@ impl BucketMapHolder { self.count_ages_flushed.load(Ordering::Relaxed) >= self.bins } + pub fn maybe_advance_age(&self) -> bool { + // check has_age_interval_elapsed last as calling it modifies state on success + if self.all_buckets_flushed_at_current_age() && self.has_age_interval_elapsed() { + self.increment_age(); + true + } else { + false + } + } + pub fn new(bins: usize, config: &Option) -> Self { const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5; let ages_to_stay_in_cache = config @@ -104,6 +127,7 @@ impl BucketMapHolder { stats: BucketMapHolderStats::default(), wait_dirty_or_aged: WaitableCondvar::default(), next_bucket_to_flush: Mutex::new(0), + age_timer: AtomicInterval::default(), bins, startup: AtomicBool::default(), mem_budget_mb, @@ -128,6 +152,7 @@ pub mod tests { use super::*; use rayon::prelude::*; use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Instant; #[test] fn test_next_bucket_to_flush() { @@ -171,6 +196,26 @@ pub mod tests { } } + #[test] + fn test_age_time() { + solana_logger::setup(); + let bins = 1; + let test = BucketMapHolder::::new(bins, &Some(AccountsIndexConfig::default())); + let threads = 2; + let time = AGE_MS * 5 / 2; + let expected = (time / AGE_MS) as Age; + let now = Instant::now(); + test.bucket_flushed_at_current_age(); // done with age 0 + (0..threads).into_par_iter().for_each(|_| { + while now.elapsed().as_millis() < (time as u128) { + if test.maybe_advance_age() { + test.bucket_flushed_at_current_age(); + } + } + }); + assert_eq!(test.current_age(), expected); + } + #[test] fn test_age_broad() { solana_logger::setup();