AcctIdx: bucket perf improvements (#20328)
This commit is contained in:
committed by
GitHub
parent
59112adac1
commit
5e05f12c48
@ -198,6 +198,12 @@ impl<T: Clone + Copy> Bucket<T> {
|
|||||||
data: &[T],
|
data: &[T],
|
||||||
ref_count: u64,
|
ref_count: u64,
|
||||||
) -> Result<(), BucketMapError> {
|
) -> Result<(), BucketMapError> {
|
||||||
|
let best_fit_bucket = IndexEntry::data_bucket_from_num_slots(data.len() as u64);
|
||||||
|
if self.data.get(best_fit_bucket as usize).is_none() {
|
||||||
|
// fail early if the data bucket we need doesn't exist - we don't want the index entry partially allocated
|
||||||
|
//error!("resizing because missing bucket");
|
||||||
|
return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0)));
|
||||||
|
}
|
||||||
let index_entry = self.find_entry_mut(key);
|
let index_entry = self.find_entry_mut(key);
|
||||||
let (elem, elem_ix) = match index_entry {
|
let (elem, elem_ix) = match index_entry {
|
||||||
None => {
|
None => {
|
||||||
@ -213,11 +219,6 @@ impl<T: Clone + Copy> Bucket<T> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let elem_uid = self.index.uid(elem_ix);
|
let elem_uid = self.index.uid(elem_ix);
|
||||||
let best_fit_bucket = IndexEntry::data_bucket_from_num_slots(data.len() as u64);
|
|
||||||
if self.data.get(best_fit_bucket as usize).is_none() {
|
|
||||||
//error!("resizing because missing bucket");
|
|
||||||
return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0)));
|
|
||||||
}
|
|
||||||
let bucket_ix = elem.data_bucket_ix();
|
let bucket_ix = elem.data_bucket_ix();
|
||||||
let current_bucket = &self.data[bucket_ix as usize];
|
let current_bucket = &self.data[bucket_ix as usize];
|
||||||
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
|
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
|
||||||
@ -366,22 +367,27 @@ impl<T: Clone + Copy> Bucket<T> {
|
|||||||
//debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.capacity() );
|
//debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.capacity() );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// grow the appropriate piece
|
||||||
|
pub fn grow(&mut self, err: BucketMapError) {
|
||||||
|
match err {
|
||||||
|
BucketMapError::DataNoSpace(sz) => {
|
||||||
|
//debug!("GROWING SPACE {:?}", sz);
|
||||||
|
self.grow_data(sz);
|
||||||
|
}
|
||||||
|
BucketMapError::IndexNoSpace(sz) => {
|
||||||
|
//debug!("GROWING INDEX {}", sz);
|
||||||
|
self.grow_index(sz);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn insert(&mut self, key: &Pubkey, value: (&[T], RefCount)) {
|
pub fn insert(&mut self, key: &Pubkey, value: (&[T], RefCount)) {
|
||||||
let (new, refct) = value;
|
let (new, refct) = value;
|
||||||
loop {
|
loop {
|
||||||
let rv = self.try_write(key, new, refct);
|
let rv = self.try_write(key, new, refct);
|
||||||
match rv {
|
match rv {
|
||||||
Err(BucketMapError::DataNoSpace(sz)) => {
|
Ok(_) => return,
|
||||||
//debug!("GROWING SPACE {:?}", sz);
|
Err(err) => self.grow(err),
|
||||||
self.grow_data(sz);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(BucketMapError::IndexNoSpace(sz)) => {
|
|
||||||
//debug!("GROWING INDEX {}", sz);
|
|
||||||
self.grow_index(sz);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Ok(()) => return,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,7 @@ use std::fs;
|
|||||||
use std::ops::RangeBounds;
|
use std::ops::RangeBounds;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::RwLock;
|
use std::sync::{RwLock, RwLockWriteGuard};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone)]
|
#[derive(Debug, Default, Clone)]
|
||||||
@ -166,8 +166,12 @@ impl<T: Clone + Copy + Debug> BucketMap<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Update Pubkey `key`'s value with 'value'
|
/// Update Pubkey `key`'s value with 'value'
|
||||||
pub fn insert(&self, key: &Pubkey, value: (&[T], RefCount)) {
|
pub fn insert(&self, ix: usize, key: &Pubkey, value: (&[T], RefCount)) {
|
||||||
let ix = self.bucket_ix(key);
|
let mut bucket = self.get_bucket(ix);
|
||||||
|
bucket.as_mut().unwrap().insert(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_bucket(&self, ix: usize) -> RwLockWriteGuard<Option<Bucket<T>>> {
|
||||||
let mut bucket = self.buckets[ix].write().unwrap();
|
let mut bucket = self.buckets[ix].write().unwrap();
|
||||||
if bucket.is_none() {
|
if bucket.is_none() {
|
||||||
*bucket = Some(Bucket::new(
|
*bucket = Some(Bucket::new(
|
||||||
@ -176,8 +180,24 @@ impl<T: Clone + Copy + Debug> BucketMap<T> {
|
|||||||
Arc::clone(&self.stats),
|
Arc::clone(&self.stats),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
let bucket = bucket.as_mut().unwrap();
|
bucket
|
||||||
bucket.insert(key, value)
|
}
|
||||||
|
|
||||||
|
/// Update Pubkey `key`'s value with 'value'
|
||||||
|
pub fn try_insert(
|
||||||
|
&self,
|
||||||
|
ix: usize,
|
||||||
|
key: &Pubkey,
|
||||||
|
value: (&[T], RefCount),
|
||||||
|
) -> Result<(), BucketMapError> {
|
||||||
|
let mut bucket = self.get_bucket(ix);
|
||||||
|
bucket.as_mut().unwrap().try_write(key, value.0, value.1)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// if err is a grow error, then grow the appropriate piece
|
||||||
|
pub fn grow(&self, ix: usize, err: BucketMapError) {
|
||||||
|
let mut bucket = self.get_bucket(ix);
|
||||||
|
bucket.as_mut().unwrap().grow(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update Pubkey `key`'s value with function `updatefn`
|
/// Update Pubkey `key`'s value with function `updatefn`
|
||||||
@ -186,16 +206,8 @@ impl<T: Clone + Copy + Debug> BucketMap<T> {
|
|||||||
F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
|
F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
|
||||||
{
|
{
|
||||||
let ix = self.bucket_ix(key);
|
let ix = self.bucket_ix(key);
|
||||||
let mut bucket = self.buckets[ix].write().unwrap();
|
let mut bucket = self.get_bucket(ix);
|
||||||
if bucket.is_none() {
|
bucket.as_mut().unwrap().update(key, updatefn)
|
||||||
*bucket = Some(Bucket::new(
|
|
||||||
Arc::clone(&self.drives),
|
|
||||||
self.max_search,
|
|
||||||
Arc::clone(&self.stats),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
let bucket = bucket.as_mut().unwrap();
|
|
||||||
bucket.update(key, updatefn)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the bucket index for Pubkey `key`
|
/// Get the bucket index for Pubkey `key`
|
||||||
@ -247,11 +259,29 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn bucket_map_test_insert2() {
|
fn bucket_map_test_insert2() {
|
||||||
let key = Pubkey::new_unique();
|
for pass in 0..3 {
|
||||||
let config = BucketMapConfig::new(1 << 1);
|
let key = Pubkey::new_unique();
|
||||||
let index = BucketMap::new(config);
|
let config = BucketMapConfig::new(1 << 1);
|
||||||
index.insert(&key, (&[0], 0));
|
let index = BucketMap::new(config);
|
||||||
assert_eq!(index.read_value(&key), Some((vec![0], 0)));
|
let ix = index.bucket_ix(&key);
|
||||||
|
if pass == 0 {
|
||||||
|
index.insert(ix, &key, (&[0], 0));
|
||||||
|
} else {
|
||||||
|
let result = index.try_insert(ix, &key, (&[0], 0));
|
||||||
|
assert!(result.is_err());
|
||||||
|
assert_eq!(index.read_value(&key), None);
|
||||||
|
if pass == 2 {
|
||||||
|
// another call to try insert again - should still return an error
|
||||||
|
let result = index.try_insert(ix, &key, (&[0], 0));
|
||||||
|
assert!(result.is_err());
|
||||||
|
assert_eq!(index.read_value(&key), None);
|
||||||
|
}
|
||||||
|
index.grow(ix, result.unwrap_err());
|
||||||
|
let result = index.try_insert(ix, &key, (&[0], 0));
|
||||||
|
assert!(result.is_ok());
|
||||||
|
}
|
||||||
|
assert_eq!(index.read_value(&key), Some((vec![0], 0)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -259,9 +289,9 @@ mod tests {
|
|||||||
let key = Pubkey::new_unique();
|
let key = Pubkey::new_unique();
|
||||||
let config = BucketMapConfig::new(1 << 1);
|
let config = BucketMapConfig::new(1 << 1);
|
||||||
let index = BucketMap::new(config);
|
let index = BucketMap::new(config);
|
||||||
index.insert(&key, (&[0], 0));
|
index.insert(index.bucket_ix(&key), &key, (&[0], 0));
|
||||||
assert_eq!(index.read_value(&key), Some((vec![0], 0)));
|
assert_eq!(index.read_value(&key), Some((vec![0], 0)));
|
||||||
index.insert(&key, (&[1], 0));
|
index.insert(index.bucket_ix(&key), &key, (&[1], 0));
|
||||||
assert_eq!(index.read_value(&key), Some((vec![1], 0)));
|
assert_eq!(index.read_value(&key), Some((vec![1], 0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -475,7 +505,7 @@ mod tests {
|
|||||||
let insert = thread_rng().gen_range(0, 2) == 0;
|
let insert = thread_rng().gen_range(0, 2) == 0;
|
||||||
maps.iter().for_each(|map| {
|
maps.iter().for_each(|map| {
|
||||||
if insert {
|
if insert {
|
||||||
map.insert(&k, (&v.0, v.1))
|
map.insert(map.bucket_ix(&k), &k, (&v.0, v.1))
|
||||||
} else {
|
} else {
|
||||||
map.update(&k, |current| {
|
map.update(&k, |current| {
|
||||||
assert!(current.is_none());
|
assert!(current.is_none());
|
||||||
@ -494,7 +524,7 @@ mod tests {
|
|||||||
let insert = thread_rng().gen_range(0, 2) == 0;
|
let insert = thread_rng().gen_range(0, 2) == 0;
|
||||||
maps.iter().for_each(|map| {
|
maps.iter().for_each(|map| {
|
||||||
if insert {
|
if insert {
|
||||||
map.insert(&k, (&v, rc))
|
map.insert(map.bucket_ix(&k), &k, (&v, rc))
|
||||||
} else {
|
} else {
|
||||||
map.update(&k, |current| {
|
map.update(&k, |current| {
|
||||||
assert_eq!(current, v_old.map(|(v, rc)| (&v[..], *rc)), "{}", k);
|
assert_eq!(current, v_old.map(|(v, rc)| (&v[..], *rc)), "{}", k);
|
||||||
|
@ -690,7 +690,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||||||
continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed
|
continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed
|
||||||
}
|
}
|
||||||
flush_entries_updated_on_disk += 1;
|
flush_entries_updated_on_disk += 1;
|
||||||
disk.insert(&k, (&v.slot_list.read().unwrap(), v.ref_count()));
|
disk.insert(self.bin, &k, (&v.slot_list.read().unwrap(), v.ref_count()));
|
||||||
}
|
}
|
||||||
Self::update_time_stat(&self.stats().flush_update_us, m);
|
Self::update_time_stat(&self.stats().flush_update_us, m);
|
||||||
Self::update_stat(
|
Self::update_stat(
|
||||||
|
Reference in New Issue
Block a user