diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index e1c933f020..6791f5beb6 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -198,6 +198,12 @@ impl Bucket { data: &[T], ref_count: u64, ) -> Result<(), BucketMapError> { + let best_fit_bucket = IndexEntry::data_bucket_from_num_slots(data.len() as u64); + if self.data.get(best_fit_bucket as usize).is_none() { + // fail early if the data bucket we need doesn't exist - we don't want the index entry partially allocated + //error!("resizing because missing bucket"); + return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0))); + } let index_entry = self.find_entry_mut(key); let (elem, elem_ix) = match index_entry { None => { @@ -213,11 +219,6 @@ impl Bucket { } }; let elem_uid = self.index.uid(elem_ix); - let best_fit_bucket = IndexEntry::data_bucket_from_num_slots(data.len() as u64); - if self.data.get(best_fit_bucket as usize).is_none() { - //error!("resizing because missing bucket"); - return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0))); - } let bucket_ix = elem.data_bucket_ix(); let current_bucket = &self.data[bucket_ix as usize]; if best_fit_bucket == bucket_ix && elem.num_slots > 0 { @@ -366,22 +367,27 @@ impl Bucket { //debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.capacity() ); } + /// grow the appropriate piece + pub fn grow(&mut self, err: BucketMapError) { + match err { + BucketMapError::DataNoSpace(sz) => { + //debug!("GROWING SPACE {:?}", sz); + self.grow_data(sz); + } + BucketMapError::IndexNoSpace(sz) => { + //debug!("GROWING INDEX {}", sz); + self.grow_index(sz); + } + } + } + pub fn insert(&mut self, key: &Pubkey, value: (&[T], RefCount)) { let (new, refct) = value; loop { let rv = self.try_write(key, new, refct); match rv { - Err(BucketMapError::DataNoSpace(sz)) => { - //debug!("GROWING SPACE {:?}", sz); - self.grow_data(sz); - continue; - } - Err(BucketMapError::IndexNoSpace(sz)) => { - //debug!("GROWING INDEX {}", sz); - self.grow_index(sz); - continue; - } - Ok(()) => return, + Ok(_) => return, + Err(err) => self.grow(err), } } } diff --git a/bucket_map/src/bucket_map.rs b/bucket_map/src/bucket_map.rs index 5350963c4f..14a585c9eb 100644 --- a/bucket_map/src/bucket_map.rs +++ b/bucket_map/src/bucket_map.rs @@ -11,7 +11,7 @@ use std::fs; use std::ops::RangeBounds; use std::path::PathBuf; use std::sync::Arc; -use std::sync::RwLock; +use std::sync::{RwLock, RwLockWriteGuard}; use tempfile::TempDir; #[derive(Debug, Default, Clone)] @@ -166,8 +166,12 @@ impl BucketMap { } /// Update Pubkey `key`'s value with 'value' - pub fn insert(&self, key: &Pubkey, value: (&[T], RefCount)) { - let ix = self.bucket_ix(key); + pub fn insert(&self, ix: usize, key: &Pubkey, value: (&[T], RefCount)) { + let mut bucket = self.get_bucket(ix); + bucket.as_mut().unwrap().insert(key, value) + } + + fn get_bucket(&self, ix: usize) -> RwLockWriteGuard>> { let mut bucket = self.buckets[ix].write().unwrap(); if bucket.is_none() { *bucket = Some(Bucket::new( @@ -176,8 +180,24 @@ impl BucketMap { Arc::clone(&self.stats), )); } - let bucket = bucket.as_mut().unwrap(); - bucket.insert(key, value) + bucket + } + + /// Update Pubkey `key`'s value with 'value' + pub fn try_insert( + &self, + ix: usize, + key: &Pubkey, + value: (&[T], RefCount), + ) -> Result<(), BucketMapError> { + let mut bucket = self.get_bucket(ix); + bucket.as_mut().unwrap().try_write(key, value.0, value.1) + } + + /// if err is a grow error, then grow the appropriate piece + pub fn grow(&self, ix: usize, err: BucketMapError) { + let mut bucket = self.get_bucket(ix); + bucket.as_mut().unwrap().grow(err); } /// Update Pubkey `key`'s value with function `updatefn` @@ -186,16 +206,8 @@ impl BucketMap { F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, { let ix = self.bucket_ix(key); - let mut bucket = self.buckets[ix].write().unwrap(); - if bucket.is_none() { - *bucket = Some(Bucket::new( - Arc::clone(&self.drives), - self.max_search, - Arc::clone(&self.stats), - )); - } - let bucket = bucket.as_mut().unwrap(); - bucket.update(key, updatefn) + let mut bucket = self.get_bucket(ix); + bucket.as_mut().unwrap().update(key, updatefn) } /// Get the bucket index for Pubkey `key` @@ -247,11 +259,29 @@ mod tests { #[test] fn bucket_map_test_insert2() { - let key = Pubkey::new_unique(); - let config = BucketMapConfig::new(1 << 1); - let index = BucketMap::new(config); - index.insert(&key, (&[0], 0)); - assert_eq!(index.read_value(&key), Some((vec![0], 0))); + for pass in 0..3 { + let key = Pubkey::new_unique(); + let config = BucketMapConfig::new(1 << 1); + let index = BucketMap::new(config); + let ix = index.bucket_ix(&key); + if pass == 0 { + index.insert(ix, &key, (&[0], 0)); + } else { + let result = index.try_insert(ix, &key, (&[0], 0)); + assert!(result.is_err()); + assert_eq!(index.read_value(&key), None); + if pass == 2 { + // another call to try insert again - should still return an error + let result = index.try_insert(ix, &key, (&[0], 0)); + assert!(result.is_err()); + assert_eq!(index.read_value(&key), None); + } + index.grow(ix, result.unwrap_err()); + let result = index.try_insert(ix, &key, (&[0], 0)); + assert!(result.is_ok()); + } + assert_eq!(index.read_value(&key), Some((vec![0], 0))); + } } #[test] @@ -259,9 +289,9 @@ mod tests { let key = Pubkey::new_unique(); let config = BucketMapConfig::new(1 << 1); let index = BucketMap::new(config); - index.insert(&key, (&[0], 0)); + index.insert(index.bucket_ix(&key), &key, (&[0], 0)); assert_eq!(index.read_value(&key), Some((vec![0], 0))); - index.insert(&key, (&[1], 0)); + index.insert(index.bucket_ix(&key), &key, (&[1], 0)); assert_eq!(index.read_value(&key), Some((vec![1], 0))); } @@ -475,7 +505,7 @@ mod tests { let insert = thread_rng().gen_range(0, 2) == 0; maps.iter().for_each(|map| { if insert { - map.insert(&k, (&v.0, v.1)) + map.insert(map.bucket_ix(&k), &k, (&v.0, v.1)) } else { map.update(&k, |current| { assert!(current.is_none()); @@ -494,7 +524,7 @@ mod tests { let insert = thread_rng().gen_range(0, 2) == 0; maps.iter().for_each(|map| { if insert { - map.insert(&k, (&v, rc)) + map.insert(map.bucket_ix(&k), &k, (&v, rc)) } else { map.update(&k, |current| { assert_eq!(current, v_old.map(|(v, rc)| (&v[..], *rc)), "{}", k); diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index bce6dd3585..df74a05fb4 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -690,7 +690,7 @@ impl InMemAccountsIndex { continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed } flush_entries_updated_on_disk += 1; - disk.insert(&k, (&v.slot_list.read().unwrap(), v.ref_count())); + disk.insert(self.bin, &k, (&v.slot_list.read().unwrap(), v.ref_count())); } Self::update_time_stat(&self.stats().flush_update_us, m); Self::update_stat(