AcctIdx: add some bucket flush helpers (#19965)
This commit is contained in:
committed by
GitHub
parent
9eb98adf97
commit
376b20c3d2
@ -1,10 +1,17 @@
|
|||||||
use crate::accounts_index::IndexValue;
use crate::bucket_map_holder_stats::BucketMapHolderStats;
use crate::waitable_condvar::WaitableCondvar;
use std::fmt::Debug;
use std::sync::Mutex;
||||||
// will eventually hold the bucket map
|
// will eventually hold the bucket map
|
||||||
pub struct BucketMapHolder<T: IndexValue> {
|
pub struct BucketMapHolder<T: IndexValue> {
|
||||||
pub stats: BucketMapHolderStats,
|
pub stats: BucketMapHolderStats,
|
||||||
|
|
||||||
|
// used by bg processing to know when any bucket has become dirty
|
||||||
|
pub wait_dirty_bucket: WaitableCondvar,
|
||||||
|
next_bucket_to_flush: Mutex<usize>,
|
||||||
|
bins: usize,
|
||||||
_phantom: std::marker::PhantomData<T>,
|
_phantom: std::marker::PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -14,11 +21,57 @@ impl<T: IndexValue> Debug for BucketMapHolder<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::mutex_atomic)]
|
||||||
impl<T: IndexValue> BucketMapHolder<T> {
|
impl<T: IndexValue> BucketMapHolder<T> {
|
||||||
pub fn new(_bins: usize) -> Self {
|
pub fn new(bins: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
stats: BucketMapHolderStats::default(),
|
stats: BucketMapHolderStats::default(),
|
||||||
|
wait_dirty_bucket: WaitableCondvar::default(),
|
||||||
|
next_bucket_to_flush: Mutex::new(0),
|
||||||
|
bins,
|
||||||
_phantom: std::marker::PhantomData::<T>::default(),
|
_phantom: std::marker::PhantomData::<T>::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get the next bucket to flush, with the idea that the previous bucket
|
||||||
|
// is perhaps being flushed by another thread already.
|
||||||
|
pub fn next_bucket_to_flush(&self) -> usize {
|
||||||
|
// could be lock-free as an optimization
|
||||||
|
// wrapping is tricky
|
||||||
|
let mut lock = self.next_bucket_to_flush.lock().unwrap();
|
||||||
|
let result = *lock;
|
||||||
|
*lock = (result + 1) % self.bins;
|
||||||
|
result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
pub mod tests {
    use super::*;
    use rayon::prelude::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Hammer next_bucket_to_flush from many rayon threads and verify the
    /// round-robin cursor distributes hits exactly evenly across all bins.
    #[test]
    fn test_next_bucket_to_flush() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64>::new(bins);
        // one atomic hit-counter per bin
        let visited = (0..bins)
            .into_iter()
            .map(|_| AtomicUsize::default())
            .collect::<Vec<_>>();
        let iterations = bins * 30;
        let threads = bins * 4;
        // total calls = threads * iterations; an even split gives this per bin
        let expected = threads * iterations / bins;

        (0..threads).into_par_iter().for_each(|_| {
            (0..iterations).into_iter().for_each(|_| {
                let bin = test.next_bucket_to_flush();
                visited[bin].fetch_add(1, Ordering::Relaxed);
            });
        });
        visited.iter().enumerate().for_each(|(bin, visited)| {
            assert_eq!(visited.load(Ordering::Relaxed), expected, "bin: {}", bin)
        });
    }
}
|
||||||
|
Reference in New Issue
Block a user