AcctIdx: introduce BucketApi for access to a specific bucket (#20359)

This commit is contained in:
Jeff Washington (jwash)
2021-10-04 12:48:09 -04:00
committed by GitHub
parent fb8a7cfa92
commit 8da2eb980a
4 changed files with 194 additions and 113 deletions

View File

@ -6,6 +6,7 @@ use crate::bucket_map_holder::{Age, BucketMapHolder};
use crate::bucket_map_holder_stats::BucketMapHolderStats;
use rand::thread_rng;
use rand::Rng;
use solana_bucket_map::bucket_api::BucketApi;
use solana_measure::measure::Measure;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::collections::{hash_map::Entry, HashMap};
@ -28,6 +29,8 @@ pub struct InMemAccountsIndex<T: IndexValue> {
storage: Arc<BucketMapHolder<T>>,
bin: usize,
bucket: Option<Arc<BucketApi<SlotT<T>>>>,
// pubkey ranges that this bin must hold in the cache while the range is present in this vec
pub(crate) cache_ranges_held: CacheRangesHeld,
// true while ranges are being manipulated. Used to keep an async flush from removing things while a range is being held.
@ -51,6 +54,11 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
map_internal: RwLock::default(),
storage: Arc::clone(storage),
bin,
bucket: storage
.disk
.as_ref()
.map(|disk| disk.get_bucket_from_index(bin))
.map(Arc::clone),
cache_ranges_held: CacheRangesHeld::default(),
stop_flush: AtomicU64::default(),
bin_dirty: AtomicBool::default(),
@ -111,7 +119,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
}
fn load_from_disk(&self, pubkey: &Pubkey) -> Option<(SlotList<T>, RefCount)> {
self.storage.disk.as_ref().and_then(|disk| {
self.bucket.as_ref().and_then(|disk| {
let m = Measure::start("load_disk_found_count");
let entry_disk = disk.read_value(pubkey);
match &entry_disk {
@ -209,7 +217,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
}
fn delete_disk_key(&self, pubkey: &Pubkey) {
if let Some(disk) = self.storage.disk.as_ref() {
if let Some(disk) = self.bucket.as_ref() {
disk.delete_key(pubkey)
}
}
@ -579,8 +587,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
let m = Measure::start("range");
// load from disk
if let Some(disk) = self.storage.disk.as_ref() {
let items = disk.items_in_range(self.bin, range);
if let Some(disk) = self.bucket.as_ref() {
let items = disk.items_in_range(range);
let mut map = self.map().write().unwrap();
let future_age = self.storage.future_age_to_flush();
for item in items {
@ -668,7 +676,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
loop {
let mut removes = Vec::default();
let mut removes_random = Vec::default();
let disk = self.storage.disk.as_ref().unwrap();
let disk = self.bucket.as_ref().unwrap();
let mut updates = Vec::default();
let m = Measure::start("flush_scan");
@ -706,11 +714,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed
}
flush_entries_updated_on_disk += 1;
disk_resize = disk.try_insert(
self.bin,
&k,
(&v.slot_list.read().unwrap(), v.ref_count()),
);
disk_resize =
disk.try_write(&k, (&v.slot_list.read().unwrap(), v.ref_count()));
}
if disk_resize.is_err() {
// disk needs to resize, so mark all unprocessed items as dirty again so we pick them up after the resize
@ -745,7 +750,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
Err(err) => {
// grow the bucket, outside of all in-mem locks.
// then, loop to try again
disk.grow(self.bin, err);
disk.grow(err);
}
}
}