AcctIdx: factor 'scan' out of flush_internal (#23777)
This commit is contained in:
committed by
GitHub
parent
f34434f96b
commit
258db77100
@ -64,6 +64,12 @@ pub enum InsertNewEntryResults {
|
|||||||
ExistedNewEntryNonZeroLamports,
|
ExistedNewEntryNonZeroLamports,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct FlushScanResult<T> {
|
||||||
|
evictions: Vec<Pubkey>,
|
||||||
|
evictions_random: Vec<Pubkey>,
|
||||||
|
dirty_items: Vec<(Pubkey, AccountMapEntry<T>)>,
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(dead_code)] // temporary during staging
|
#[allow(dead_code)] // temporary during staging
|
||||||
impl<T: IndexValue> InMemAccountsIndex<T> {
|
impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
pub fn new(storage: &Arc<BucketMapHolder<T>>, bin: usize) -> Self {
|
pub fn new(storage: &Arc<BucketMapHolder<T>>, bin: usize) -> Self {
|
||||||
@ -927,41 +933,21 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn flush_internal(&self, _flush_guard: &FlushGuard) {
|
/// scan loop
|
||||||
let current_age = self.storage.current_age();
|
/// holds read lock
|
||||||
let iterate_for_age = self.get_should_age(current_age);
|
/// identifies items which are dirty and items to evict
|
||||||
let startup = self.storage.get_startup();
|
fn flush_scan(
|
||||||
if !iterate_for_age && !startup {
|
&self,
|
||||||
// no need to age, so no need to flush this bucket
|
current_age: Age,
|
||||||
// but, at startup we want to evict from buckets as fast as possible if any items exist
|
startup: bool,
|
||||||
return;
|
_flush_guard: &FlushGuard,
|
||||||
}
|
) -> FlushScanResult<T> {
|
||||||
|
let exceeds_budget = self.get_exceeds_budget();
|
||||||
let in_mem_count = self.stats().count_in_mem.load(Ordering::Relaxed);
|
|
||||||
let limit = self.storage.mem_budget_mb;
|
|
||||||
let estimate_mem = in_mem_count * Self::approx_size_of_one_entry();
|
|
||||||
let exceeds_budget = limit
|
|
||||||
.map(|limit| estimate_mem >= limit * 1024 * 1024)
|
|
||||||
.unwrap_or_default();
|
|
||||||
self.stats()
|
|
||||||
.estimate_mem
|
|
||||||
.store(estimate_mem as u64, Ordering::Relaxed);
|
|
||||||
|
|
||||||
// may have to loop if disk has to grow and we have to restart
|
|
||||||
{
|
|
||||||
let mut dirty_items;
|
|
||||||
let mut evictions;
|
|
||||||
let mut evictions_random = Vec::default();
|
|
||||||
let disk = self.bucket.as_ref().unwrap();
|
|
||||||
|
|
||||||
let mut flush_entries_updated_on_disk = 0;
|
|
||||||
let mut flush_should_evict_us = 0;
|
|
||||||
// scan loop
|
|
||||||
// holds read lock
|
|
||||||
{
|
|
||||||
let map = self.map().read().unwrap();
|
let map = self.map().read().unwrap();
|
||||||
evictions = Vec::with_capacity(map.len());
|
let mut evictions = Vec::with_capacity(map.len());
|
||||||
dirty_items = Vec::with_capacity(map.len());
|
let mut evictions_random = Vec::default();
|
||||||
|
let mut dirty_items = Vec::with_capacity(map.len());
|
||||||
|
let mut flush_should_evict_us = 0;
|
||||||
let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
|
let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
|
||||||
for (k, v) in map.iter() {
|
for (k, v) in map.iter() {
|
||||||
let mut mse = Measure::start("flush_should_evict");
|
let mut mse = Measure::start("flush_should_evict");
|
||||||
@ -994,7 +980,36 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Self::update_time_stat(&self.stats().flush_scan_us, m);
|
Self::update_time_stat(&self.stats().flush_scan_us, m);
|
||||||
|
Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us);
|
||||||
|
|
||||||
|
FlushScanResult {
|
||||||
|
evictions,
|
||||||
|
evictions_random,
|
||||||
|
dirty_items,
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// synchronize the in-mem index with the disk index
|
||||||
|
fn flush_internal(&self, flush_guard: &FlushGuard) {
|
||||||
|
let current_age = self.storage.current_age();
|
||||||
|
let iterate_for_age = self.get_should_age(current_age);
|
||||||
|
let startup = self.storage.get_startup();
|
||||||
|
if !iterate_for_age && !startup {
|
||||||
|
// no need to age, so no need to flush this bucket
|
||||||
|
// but, at startup we want to evict from buckets as fast as possible if any items exist
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// may have to loop if disk has to grow and we have to restart
|
||||||
|
{
|
||||||
|
let disk = self.bucket.as_ref().unwrap();
|
||||||
|
|
||||||
|
let mut flush_entries_updated_on_disk = 0;
|
||||||
|
let FlushScanResult {
|
||||||
|
evictions,
|
||||||
|
evictions_random,
|
||||||
|
dirty_items,
|
||||||
|
} = self.flush_scan(current_age, startup, flush_guard);
|
||||||
{
|
{
|
||||||
// write to disk outside giant read lock
|
// write to disk outside giant read lock
|
||||||
let m = Measure::start("flush_update"); // we don't care about lock time in this metric - bg threads can wait
|
let m = Measure::start("flush_update"); // we don't care about lock time in this metric - bg threads can wait
|
||||||
@ -1024,7 +1039,6 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||||||
}
|
}
|
||||||
Self::update_time_stat(&self.stats().flush_update_us, m);
|
Self::update_time_stat(&self.stats().flush_update_us, m);
|
||||||
}
|
}
|
||||||
Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us);
|
|
||||||
|
|
||||||
Self::update_stat(
|
Self::update_stat(
|
||||||
&self.stats().flush_entries_updated_on_disk,
|
&self.stats().flush_entries_updated_on_disk,
|
||||||
@ -1044,6 +1058,21 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// calculate the estimated size of the in-mem index
|
||||||
|
/// return whether the size exceeds the specified budget
|
||||||
|
fn get_exceeds_budget(&self) -> bool {
|
||||||
|
let in_mem_count = self.stats().count_in_mem.load(Ordering::Relaxed);
|
||||||
|
let limit = self.storage.mem_budget_mb;
|
||||||
|
let estimate_mem = in_mem_count * Self::approx_size_of_one_entry();
|
||||||
|
let exceeds_budget = limit
|
||||||
|
.map(|limit| estimate_mem >= limit * 1024 * 1024)
|
||||||
|
.unwrap_or_default();
|
||||||
|
self.stats()
|
||||||
|
.estimate_mem
|
||||||
|
.store(estimate_mem as u64, Ordering::Relaxed);
|
||||||
|
exceeds_budget
|
||||||
|
}
|
||||||
|
|
||||||
/// for each key in 'keys', look up in map, set age to the future
|
/// for each key in 'keys', look up in map, set age to the future
|
||||||
fn move_ages_to_future(&self, next_age: Age, current_age: Age, keys: &[Pubkey]) {
|
fn move_ages_to_future(&self, next_age: Age, current_age: Age, keys: &[Pubkey]) {
|
||||||
let map = self.map().read().unwrap();
|
let map = self.map().read().unwrap();
|
||||||
@ -1120,7 +1149,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// all conditions for removing succeeded, so really evict item from in-mem cache
|
// all conditions for eviction succeeded, so really evict item from in-mem cache
|
||||||
evicted += 1;
|
evicted += 1;
|
||||||
occupied.remove();
|
occupied.remove();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user