Use AtomicUsize for next_bucket_to_flush (#22095)
This commit is contained in:
		@@ -12,7 +12,7 @@ use {
 | 
				
			|||||||
        fmt::Debug,
 | 
					        fmt::Debug,
 | 
				
			||||||
        sync::{
 | 
					        sync::{
 | 
				
			||||||
            atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
 | 
					            atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
 | 
				
			||||||
            Arc, Mutex,
 | 
					            Arc,
 | 
				
			||||||
        },
 | 
					        },
 | 
				
			||||||
        time::Duration,
 | 
					        time::Duration,
 | 
				
			||||||
    },
 | 
					    },
 | 
				
			||||||
@@ -32,7 +32,7 @@ pub struct BucketMapHolder<T: IndexValue> {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    // used by bg processing to know when any bucket has become dirty
 | 
					    // used by bg processing to know when any bucket has become dirty
 | 
				
			||||||
    pub wait_dirty_or_aged: Arc<WaitableCondvar>,
 | 
					    pub wait_dirty_or_aged: Arc<WaitableCondvar>,
 | 
				
			||||||
    next_bucket_to_flush: Mutex<usize>,
 | 
					    next_bucket_to_flush: AtomicUsize,
 | 
				
			||||||
    bins: usize,
 | 
					    bins: usize,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub threads: usize,
 | 
					    pub threads: usize,
 | 
				
			||||||
@@ -163,7 +163,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
 | 
				
			|||||||
            age: AtomicU8::default(),
 | 
					            age: AtomicU8::default(),
 | 
				
			||||||
            stats: BucketMapHolderStats::new(bins),
 | 
					            stats: BucketMapHolderStats::new(bins),
 | 
				
			||||||
            wait_dirty_or_aged: Arc::default(),
 | 
					            wait_dirty_or_aged: Arc::default(),
 | 
				
			||||||
            next_bucket_to_flush: Mutex::new(0),
 | 
					            next_bucket_to_flush: AtomicUsize::new(0),
 | 
				
			||||||
            age_timer: AtomicInterval::default(),
 | 
					            age_timer: AtomicInterval::default(),
 | 
				
			||||||
            bins,
 | 
					            bins,
 | 
				
			||||||
            startup: AtomicBool::default(),
 | 
					            startup: AtomicBool::default(),
 | 
				
			||||||
@@ -175,12 +175,11 @@ impl<T: IndexValue> BucketMapHolder<T> {
 | 
				
			|||||||
    // get the next bucket to flush, with the idea that the previous bucket
 | 
					    // get the next bucket to flush, with the idea that the previous bucket
 | 
				
			||||||
    // is perhaps being flushed by another thread already.
 | 
					    // is perhaps being flushed by another thread already.
 | 
				
			||||||
    pub fn next_bucket_to_flush(&self) -> usize {
 | 
					    pub fn next_bucket_to_flush(&self) -> usize {
 | 
				
			||||||
        // could be lock-free as an optimization
 | 
					        self.next_bucket_to_flush
 | 
				
			||||||
        // wrapping is tricky
 | 
					            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |bucket| {
 | 
				
			||||||
        let mut lock = self.next_bucket_to_flush.lock().unwrap();
 | 
					                Some((bucket + 1) % self.bins)
 | 
				
			||||||
        let result = *lock;
 | 
					            })
 | 
				
			||||||
        *lock = (result + 1) % self.bins;
 | 
					            .unwrap()
 | 
				
			||||||
        result
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /// prepare for this to be dynamic if necessary
 | 
					    /// prepare for this to be dynamic if necessary
 | 
				
			||||||
@@ -299,14 +298,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
#[cfg(test)]
 | 
					#[cfg(test)]
 | 
				
			||||||
pub mod tests {
 | 
					pub mod tests {
 | 
				
			||||||
    use {
 | 
					    use {super::*, rayon::prelude::*, std::time::Instant};
 | 
				
			||||||
        super::*,
 | 
					 | 
				
			||||||
        rayon::prelude::*,
 | 
					 | 
				
			||||||
        std::{
 | 
					 | 
				
			||||||
            sync::atomic::{AtomicUsize, Ordering},
 | 
					 | 
				
			||||||
            time::Instant,
 | 
					 | 
				
			||||||
        },
 | 
					 | 
				
			||||||
    };
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[test]
 | 
					    #[test]
 | 
				
			||||||
    fn test_next_bucket_to_flush() {
 | 
					    fn test_next_bucket_to_flush() {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user