AcctIdx: hold_range_in_memory (#19955)

This commit is contained in:
Jeff Washington (jwash)
2021-09-17 17:19:29 -05:00
committed by GitHub
parent 24b136a993
commit 4dc2f08198
5 changed files with 257 additions and 2 deletions

View File

@@ -6,12 +6,14 @@ use crate::bucket_map_holder_stats::BucketMapHolderStats;
use solana_measure::measure::Measure;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::collections::{hash_map::Entry, HashMap};
use std::ops::{Bound, RangeBounds, RangeInclusive};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::fmt::Debug;
use std::ops::RangeBounds;
type K = Pubkey;
type CacheRangesHeld = RwLock<Vec<Option<RangeInclusive<Pubkey>>>>;
pub type SlotT<T> = (Slot, T);
// one instance of this represents one bin of the accounts index.
pub struct InMemAccountsIndex<T: IndexValue> {
@@ -19,6 +21,11 @@ pub struct InMemAccountsIndex<T: IndexValue> {
map_internal: RwLock<HashMap<Pubkey, AccountMapEntry<T>>>,
storage: Arc<BucketMapHolder<T>>,
bin: usize,
// pubkey ranges that this bin must hold in the cache while the range is present in this vec
pub(crate) cache_ranges_held: CacheRangesHeld,
// true while ranges are being manipulated. Used to keep an async flush from removing things while a range is being held.
stop_flush: AtomicU64,
}
impl<T: IndexValue> Debug for InMemAccountsIndex<T> {
@@ -33,6 +40,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
map_internal: RwLock::default(),
storage: Arc::clone(storage),
bin,
cache_ranges_held: CacheRangesHeld::default(),
stop_flush: AtomicU64::default(),
}
}
@@ -264,6 +273,91 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
result
}
/// Record (`start_holding == true`) or forget (`start_holding == false`) a pubkey
/// `range` in `cache_ranges_held`. Only bookkeeping: this does NOT load anything
/// into the cache; `hold_range_in_memory` does that before calling here.
pub fn just_set_hold_range_in_memory<R>(&self, range: &R, start_holding: bool)
where
    R: RangeBounds<Pubkey>,
{
    // Collapse the caller's bounds to concrete endpoints, saturating unbounded
    // ends at the minimum/maximum possible pubkey values.
    let start = match range.start_bound() {
        Bound::Included(bound) | Bound::Excluded(bound) => *bound,
        Bound::Unbounded => Pubkey::new(&[0; 32]),
    };
    let end = match range.end_bound() {
        Bound::Included(bound) | Bound::Excluded(bound) => *bound,
        Bound::Unbounded => Pubkey::new(&[0xff; 32]),
    };
    // this becomes inclusive - that is ok - we are just roughly holding a range of items.
    // inclusive is bigger than exclusive so we may hold 1 extra item worst case
    let inclusive_range = Some(start..=end);
    let mut ranges = self.cache_ranges_held.write().unwrap();
    if start_holding {
        ranges.push(inclusive_range);
    } else {
        // find the first held entry equivalent to this range and delete it since
        // we don't want to hold it anymore
        let looking_for_none = inclusive_range.is_none();
        let found = ranges.iter().position(|held| {
            if held.is_none() != looking_for_none {
                // one is None and the other is not - cannot match
                return false;
            }
            if looking_for_none {
                // both are None - nothing further to compare
                return true;
            }
            // neither is None, so compare the inclusive endpoints
            match held
                .as_ref()
                .map(|held| (held.start_bound(), held.end_bound()))
                .unwrap()
            {
                (Bound::Included(start_found), Bound::Included(end_found)) => {
                    start_found == &start && end_found == &end
                }
                // inclusive ranges always report Included bounds; any other shape
                // is treated as a match, mirroring the original fall-through
                _ => true,
            }
        });
        // There may be dups, that's ok, we expect another call to remove the dup.
        if let Some(i) = found {
            ranges.remove(i);
        }
    }
}
/// Adjust the `stop_flush` guard count: `stop == true` registers one more caller
/// that needs flushing suppressed, `stop == false` releases one such caller.
fn start_stop_flush(&self, stop: bool) {
    if stop {
        // one more caller requires the cache contents to stay put
        self.stop_flush.fetch_add(1, Ordering::Acquire);
        return;
    }
    // a caller has released its suppression request
    self.stop_flush.fetch_sub(1, Ordering::Release);
}
/// Pin (`start_holding == true`) or release (`start_holding == false`) a pubkey
/// `range` so its entries stay resident in the in-memory cache.
/// Flushing is suppressed (via `stop_flush`) for the duration of this call;
/// afterwards, the entry recorded in `cache_ranges_held` is what keeps the
/// range resident. NOTE: the statement order below is load-bearing.
pub fn hold_range_in_memory<R>(&self, range: &R, start_holding: bool)
where
R: RangeBounds<Pubkey> + Debug,
{
// bump stop_flush first so an async flush cannot evict while we work
self.start_stop_flush(true);
if start_holding {
// put everything in the cache and it will be held there
self.put_range_in_cache(Some(range));
}
// do this AFTER items have been put in cache - that way anyone who finds this range can know that the items are already in the cache
self.just_set_hold_range_in_memory(range, start_holding);
// release our temporary guard; the recorded range (if holding) takes over
self.start_stop_flush(false);
}
/// Load every entry within `_range` from disk into the in-memory cache.
/// Currently a stub: only the timing stat is recorded; the disk load is a TODO
/// (see the placeholder comment below).
fn put_range_in_cache<R>(&self, _range: Option<&R>)
where
    R: RangeBounds<Pubkey>,
{
    // caller should be controlling the lifetime of how long this needs to be present
    assert!(self.get_stop_flush());
    let timer = Measure::start("range");
    // load from disk here
    Self::update_time_stat(&self.stats().get_range_us, timer);
}
/// True while at least one caller has flushing suppressed via `start_stop_flush(true)`.
fn get_stop_flush(&self) -> bool {
    self.stop_flush.load(Ordering::Relaxed) != 0
}
/// Accessor for the stats instance shared by all bins through the storage holder.
fn stats(&self) -> &BucketMapHolderStats {
&self.storage.stats
}
@@ -280,3 +374,65 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
Self::update_stat(stat, value);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::accounts_index::BINS_FOR_TESTING;

    /// Build an index instance suitable for unit tests.
    // NOTE(review): `bin` is passed as BINS_FOR_TESTING (i.e. the bin COUNT, one
    // past the last valid bin index) - confirm an out-of-range bin id is
    // acceptable for these tests.
    fn new_for_test<T: IndexValue>() -> InMemAccountsIndex<T> {
        let holder = Arc::new(BucketMapHolder::new(BINS_FOR_TESTING));
        InMemAccountsIndex::new(&holder, BINS_FOR_TESTING)
    }

    #[test]
    fn test_hold_range_in_memory() {
        let accts = new_for_test::<u64>();
        let full = Pubkey::new(&[0; 32])..=Pubkey::new(&[0xff; 32]);
        // 0x81 is just some other range
        let other = Pubkey::new(&[0x81; 32])..=Pubkey::new(&[0xff; 32]);
        let ranges = [full, other];
        for range in ranges.clone() {
            // convenience: copy of the currently-held ranges
            let held = || accts.cache_ranges_held.read().unwrap().to_vec();

            // hold then release leaves nothing held
            assert!(held().is_empty());
            accts.hold_range_in_memory(&range, true);
            assert_eq!(held(), vec![Some(range.clone())]);
            accts.hold_range_in_memory(&range, false);
            assert!(held().is_empty());

            // duplicates are allowed and stack up
            accts.hold_range_in_memory(&range, true);
            assert_eq!(held(), vec![Some(range.clone())]);
            accts.hold_range_in_memory(&range, true);
            assert_eq!(held(), vec![Some(range.clone()), Some(range.clone())]);
            accts.hold_range_in_memory(&ranges[0], true);
            assert_eq!(
                held(),
                vec![
                    Some(range.clone()),
                    Some(range.clone()),
                    Some(ranges[0].clone())
                ]
            );

            // each release removes exactly one matching entry
            accts.hold_range_in_memory(&range, false);
            assert_eq!(held(), vec![Some(range.clone()), Some(ranges[0].clone())]);
            accts.hold_range_in_memory(&range, false);
            assert_eq!(held(), vec![Some(ranges[0].clone())]);
            accts.hold_range_in_memory(&ranges[0].clone(), false);
            assert!(held().is_empty());
        }
    }
}