diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index 758cd0701c..0856ddaa8b 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -482,9 +482,9 @@ impl Bucket { } } - pub fn update(&mut self, key: &Pubkey, updatefn: F) + pub fn update(&mut self, key: &Pubkey, mut updatefn: F) where - F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, + F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, { let current = self.read_value(key); let new = updatefn(current); diff --git a/bucket_map/src/bucket_api.rs b/bucket_map/src/bucket_api.rs index a15c0ce64a..704467f296 100644 --- a/bucket_map/src/bucket_api.rs +++ b/bucket_map/src/bucket_api.rs @@ -128,7 +128,7 @@ impl BucketApi { pub fn update(&self, key: &Pubkey, updatefn: F) where - F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, + F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, { let mut bucket = self.get_write_bucket(); bucket.as_mut().unwrap().update(key, updatefn) diff --git a/bucket_map/src/bucket_map.rs b/bucket_map/src/bucket_map.rs index 08ac09b7d1..e29bd55151 100644 --- a/bucket_map/src/bucket_map.rs +++ b/bucket_map/src/bucket_map.rs @@ -148,7 +148,7 @@ impl BucketMap { /// Update Pubkey `key`'s value with function `updatefn` pub fn update(&self, key: &Pubkey, updatefn: F) where - F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, + F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, { self.get_bucket(key).update(key, updatefn) } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 6c94874ce4..df05547760 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -3,7 +3,7 @@ use crate::{ ancestors::Ancestors, bucket_map_holder::{Age, BucketMapHolder}, contains::Contains, - in_mem_accounts_index::InMemAccountsIndex, + in_mem_accounts_index::{InMemAccountsIndex, InsertNewEntryResults}, inline_spl_token_v2_0::{self, SPL_TOKEN_ACCOUNT_MINT_OFFSET, SPL_TOKEN_ACCOUNT_OWNER_OFFSET}, pubkey_bins::PubkeyBinCalculator24, secondary_index::*, @@ -1648,28 +1648,14 @@ impl AccountsIndex { let insertion_time = AtomicU64::new(0); binned.into_iter().for_each(|(pubkey_bin, items)| { - let mut _reclaims = SlotList::new(); - - // big enough so not likely to re-allocate, small enough to not over-allocate by too much - // this assumes 10% of keys are duplicates. This vector will be flattened below. let w_account_maps = self.account_maps[pubkey_bin].write().unwrap(); let mut insert_time = Measure::start("insert_into_primary_index"); items.into_iter().for_each(|(pubkey, new_item)| { - let already_exists = - w_account_maps.insert_new_entry_if_missing_with_lock(pubkey, new_item); - if let Some((account_entry, account_info, pubkey)) = already_exists { - let is_zero_lamport = account_info.is_zero_lamport(); - InMemAccountsIndex::lock_and_update_slot_list( - &account_entry, - (slot, account_info), - &mut _reclaims, - false, - ); - - if !is_zero_lamport { - // zero lamports were already added to dirty_pubkeys above - dirty_pubkeys.push(pubkey); - } + if let InsertNewEntryResults::ExistedNewEntryNonZeroLamports = + w_account_maps.insert_new_entry_if_missing_with_lock(pubkey, new_item) + { + // zero lamports were already added to dirty_pubkeys above + dirty_pubkeys.push(pubkey); } }); insert_time.stop(); diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index 7f03a14e39..c3d00e327d 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -47,6 +47,12 @@ impl Debug for InMemAccountsIndex { } } +pub enum InsertNewEntryResults { + DidNotExist, + ExistedNewEntryZeroLamports, + ExistedNewEntryNonZeroLamports, +} + #[allow(dead_code)] // temporary during staging impl InMemAccountsIndex { pub fn new(storage: &Arc>, bin: usize) -> Self { @@ -479,52 +485,85 @@ impl InMemAccountsIndex { ) } - // return None if item was created new - // if entry for pubkey already existed, return Some(entry). Caller needs to call entry.update. pub fn insert_new_entry_if_missing_with_lock( &self, pubkey: Pubkey, new_entry: PreAllocatedAccountMapEntry, - ) -> Option<(AccountMapEntry, T, Pubkey)> { + ) -> InsertNewEntryResults { let mut m = Measure::start("entry"); let mut map = self.map().write().unwrap(); let entry = map.entry(pubkey); m.stop(); - let found = matches!(entry, Entry::Occupied(_)); - let result = match entry { - Entry::Occupied(occupied) => Some(Self::insert_returner( - occupied.get(), - occupied.key(), - new_entry, - )), + let mut new_entry_zero_lamports = false; + let (found_in_mem, already_existed) = match entry { + Entry::Occupied(occupied) => { + // in cache, so merge into cache + let (slot, account_info) = new_entry.into(); + new_entry_zero_lamports = account_info.is_zero_lamport(); + InMemAccountsIndex::lock_and_update_slot_list( + occupied.get(), + (slot, account_info), + &mut Vec::default(), + false, + ); + ( + true, /* found in mem */ + true, /* already existed */ + ) + } Entry::Vacant(vacant) => { // not in cache, look on disk - let disk_entry = self.load_account_entry_from_disk(vacant.key()); - self.stats().insert_or_delete_mem(true, self.bin); - if let Some(disk_entry) = disk_entry { - // on disk, so insert into cache, then return cache value so caller will merge - let result = Some(Self::insert_returner(&disk_entry, vacant.key(), new_entry)); - assert!(disk_entry.dirty()); - vacant.insert(disk_entry); - result + let mut existed = false; + if let Some(disk) = self.bucket.as_ref() { + let (slot, account_info) = new_entry.into(); + new_entry_zero_lamports = account_info.is_zero_lamport(); + disk.update(vacant.key(), |current| { + if let Some((slot_list, mut ref_count)) = current { + // on disk, so merge and update disk + let mut slot_list = slot_list.to_vec(); + let addref = Self::update_slot_list( + &mut slot_list, + slot, + account_info, + &mut Vec::default(), + false, + ); + if addref { + ref_count += 1 + }; + existed = true; + Some((slot_list, ref_count)) + } else { + // doesn't exist on disk yet, so insert it + let ref_count = if account_info.is_cached() { 0 } else { 1 }; + Some((vec![(slot, account_info)], ref_count)) + } + }); } else { - // not on disk, so insert new thing and we're done + // not using disk, so insert into mem + self.stats().insert_or_delete_mem(true, self.bin); let new_entry: AccountMapEntry = new_entry.into(); assert!(new_entry.dirty()); vacant.insert(new_entry); - None // returns None if item was created new } + (false, existed) } }; drop(map); - self.update_entry_stats(m, found); + self.update_entry_stats(m, found_in_mem); let stats = self.stats(); - if result.is_none() { + if !already_existed { stats.insert_or_delete(true, self.bin); } else { Self::update_stat(&stats.updates_in_mem, 1); } - result + if !already_existed { + InsertNewEntryResults::DidNotExist + } else if new_entry_zero_lamports { + InsertNewEntryResults::ExistedNewEntryZeroLamports + } else { + InsertNewEntryResults::ExistedNewEntryNonZeroLamports + } } pub fn just_set_hold_range_in_memory(&self, range: &R, start_holding: bool)