AcctIdx: bg loop sleeps when all buckets flushed at current age (#20168)
This commit is contained in:
committed by
GitHub
parent
88fd9670df
commit
122206dbb1
@ -158,14 +158,15 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||||||
let bins = in_mem.len();
|
let bins = in_mem.len();
|
||||||
let flush = self.disk.is_some();
|
let flush = self.disk.is_some();
|
||||||
loop {
|
loop {
|
||||||
let mut m = Measure::start("wait");
|
if self.all_buckets_flushed_at_current_age() {
|
||||||
// this will transition to waits and thread throttling
|
let mut m = Measure::start("wait");
|
||||||
self.wait_dirty_or_aged
|
self.wait_dirty_or_aged
|
||||||
.wait_timeout(Duration::from_millis(AGE_MS));
|
.wait_timeout(Duration::from_millis(AGE_MS));
|
||||||
m.stop();
|
m.stop();
|
||||||
self.stats
|
self.stats
|
||||||
.bg_waiting_us
|
.bg_waiting_us
|
||||||
.fetch_add(m.as_us(), Ordering::Relaxed);
|
.fetch_add(m.as_us(), Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
@ -178,6 +179,9 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||||||
in_mem[index].flush();
|
in_mem[index].flush();
|
||||||
}
|
}
|
||||||
self.stats.report_stats(self);
|
self.stats.report_stats(self);
|
||||||
|
if self.all_buckets_flushed_at_current_age() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
|
self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user