AcctIdx: age adds time component (#20024)
This commit is contained in:
committed by
GitHub
parent
3b92210f5c
commit
e9ee90a121
@ -3,11 +3,16 @@ use crate::bucket_map_holder_stats::BucketMapHolderStats;
|
|||||||
use crate::in_mem_accounts_index::SlotT;
|
use crate::in_mem_accounts_index::SlotT;
|
||||||
use crate::waitable_condvar::WaitableCondvar;
|
use crate::waitable_condvar::WaitableCondvar;
|
||||||
use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
|
use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
|
||||||
|
use solana_sdk::clock::SLOT_MS;
|
||||||
|
use solana_sdk::timing::AtomicInterval;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
pub type Age = u8;
|
pub type Age = u8;
|
||||||
|
|
||||||
|
const AGE_MS: u64 = SLOT_MS; // match one age per slot time
|
||||||
|
|
||||||
|
// will eventually hold the bucket map
|
||||||
pub struct BucketMapHolder<T: IndexValue> {
|
pub struct BucketMapHolder<T: IndexValue> {
|
||||||
pub disk: Option<BucketMap<SlotT<T>>>,
|
pub disk: Option<BucketMap<SlotT<T>>>,
|
||||||
|
|
||||||
@ -15,6 +20,8 @@ pub struct BucketMapHolder<T: IndexValue> {
|
|||||||
pub age: AtomicU8,
|
pub age: AtomicU8,
|
||||||
pub stats: BucketMapHolderStats,
|
pub stats: BucketMapHolderStats,
|
||||||
|
|
||||||
|
age_timer: AtomicInterval,
|
||||||
|
|
||||||
// used by bg processing to know when any bucket has become dirty
|
// used by bg processing to know when any bucket has become dirty
|
||||||
pub wait_dirty_or_aged: WaitableCondvar,
|
pub wait_dirty_or_aged: WaitableCondvar,
|
||||||
next_bucket_to_flush: Mutex<usize>,
|
next_bucket_to_flush: Mutex<usize>,
|
||||||
@ -54,6 +61,12 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||||||
pub fn future_age_to_flush(&self) -> Age {
|
pub fn future_age_to_flush(&self) -> Age {
|
||||||
self.current_age().wrapping_add(self.ages_to_stay_in_cache)
|
self.current_age().wrapping_add(self.ages_to_stay_in_cache)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn has_age_interval_elapsed(&self) -> bool {
|
||||||
|
// note that when this returns true, state of age_timer is modified
|
||||||
|
self.age_timer.should_update(AGE_MS)
|
||||||
|
}
|
||||||
|
|
||||||
/// used by bg processes to determine # active threads and how aggressively to flush
|
/// used by bg processes to determine # active threads and how aggressively to flush
|
||||||
pub fn get_startup(&self) -> bool {
|
pub fn get_startup(&self) -> bool {
|
||||||
self.startup.load(Ordering::Relaxed)
|
self.startup.load(Ordering::Relaxed)
|
||||||
@ -83,6 +96,16 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||||||
self.count_ages_flushed.load(Ordering::Relaxed) >= self.bins
|
self.count_ages_flushed.load(Ordering::Relaxed) >= self.bins
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn maybe_advance_age(&self) -> bool {
|
||||||
|
// check has_age_interval_elapsed last as calling it modifies state on success
|
||||||
|
if self.all_buckets_flushed_at_current_age() && self.has_age_interval_elapsed() {
|
||||||
|
self.increment_age();
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
|
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
|
||||||
const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
|
const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
|
||||||
let ages_to_stay_in_cache = config
|
let ages_to_stay_in_cache = config
|
||||||
@ -104,6 +127,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||||||
stats: BucketMapHolderStats::default(),
|
stats: BucketMapHolderStats::default(),
|
||||||
wait_dirty_or_aged: WaitableCondvar::default(),
|
wait_dirty_or_aged: WaitableCondvar::default(),
|
||||||
next_bucket_to_flush: Mutex::new(0),
|
next_bucket_to_flush: Mutex::new(0),
|
||||||
|
age_timer: AtomicInterval::default(),
|
||||||
bins,
|
bins,
|
||||||
startup: AtomicBool::default(),
|
startup: AtomicBool::default(),
|
||||||
mem_budget_mb,
|
mem_budget_mb,
|
||||||
@ -128,6 +152,7 @@ pub mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_next_bucket_to_flush() {
|
fn test_next_bucket_to_flush() {
|
||||||
@ -171,6 +196,26 @@ pub mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_age_time() {
|
||||||
|
solana_logger::setup();
|
||||||
|
let bins = 1;
|
||||||
|
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
|
||||||
|
let threads = 2;
|
||||||
|
let time = AGE_MS * 5 / 2;
|
||||||
|
let expected = (time / AGE_MS) as Age;
|
||||||
|
let now = Instant::now();
|
||||||
|
test.bucket_flushed_at_current_age(); // done with age 0
|
||||||
|
(0..threads).into_par_iter().for_each(|_| {
|
||||||
|
while now.elapsed().as_millis() < (time as u128) {
|
||||||
|
if test.maybe_advance_age() {
|
||||||
|
test.bucket_flushed_at_current_age();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert_eq!(test.current_age(), expected);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_age_broad() {
|
fn test_age_broad() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
|
Reference in New Issue
Block a user