diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs index 442891393d..0b314cfdc0 100644 --- a/runtime/src/bucket_map_holder.rs +++ b/runtime/src/bucket_map_holder.rs @@ -1,10 +1,17 @@ use crate::accounts_index::IndexValue; use crate::bucket_map_holder_stats::BucketMapHolderStats; +use crate::waitable_condvar::WaitableCondvar; use std::fmt::Debug; +use std::sync::Mutex; // will eventually hold the bucket map pub struct BucketMapHolder { pub stats: BucketMapHolderStats, + + // used by bg processing to know when any bucket has become dirty + pub wait_dirty_bucket: WaitableCondvar, + next_bucket_to_flush: Mutex, + bins: usize, _phantom: std::marker::PhantomData, } @@ -14,11 +21,57 @@ impl Debug for BucketMapHolder { } } +#[allow(clippy::mutex_atomic)] impl BucketMapHolder { - pub fn new(_bins: usize) -> Self { + pub fn new(bins: usize) -> Self { Self { stats: BucketMapHolderStats::default(), + wait_dirty_bucket: WaitableCondvar::default(), + next_bucket_to_flush: Mutex::new(0), + bins, _phantom: std::marker::PhantomData::::default(), } } + + // get the next bucket to flush, with the idea that the previous bucket + // is perhaps being flushed by another thread already. + pub fn next_bucket_to_flush(&self) -> usize { + // could be lock-free as an optimization + // wrapping is tricky + let mut lock = self.next_bucket_to_flush.lock().unwrap(); + let result = *lock; + *lock = (result + 1) % self.bins; + result + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use rayon::prelude::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[test] + fn test_next_bucket_to_flush() { + solana_logger::setup(); + let bins = 4; + let test = BucketMapHolder::::new(bins); + let visited = (0..bins) + .into_iter() + .map(|_| AtomicUsize::default()) + .collect::>(); + let iterations = bins * 30; + let threads = bins * 4; + let expected = threads * iterations / bins; + + (0..threads).into_par_iter().for_each(|_| { + (0..iterations).into_iter().for_each(|_| { + let bin = test.next_bucket_to_flush(); + visited[bin].fetch_add(1, Ordering::Relaxed); + }); + }); + visited.iter().enumerate().for_each(|(bin, visited)| { + assert_eq!(visited.load(Ordering::Relaxed), expected, "bin: {}", bin) + }); + } }