AcctIdx: insert goes directly to disk to avoid unnecessary allocations (#21490)
* AcctIdx: upsert avoids unnecessary allocation (during startup) * feedback
This commit is contained in:
		
				
					committed by
					
						 GitHub
						GitHub
					
				
			
			
				
	
			
			
			
						parent
						
							cb368e6554
						
					
				
				
					commit
					b108d7ddaa
				
			| @@ -292,6 +292,17 @@ pub enum PreAllocatedAccountMapEntry<T: IndexValue> { | |||||||
|     Raw((Slot, T)), |     Raw((Slot, T)), | ||||||
| } | } | ||||||
|  |  | ||||||
|  | impl<T: IndexValue> ZeroLamport for PreAllocatedAccountMapEntry<T> { | ||||||
|  |     fn is_zero_lamport(&self) -> bool { | ||||||
|  |         match self { | ||||||
|  |             PreAllocatedAccountMapEntry::Entry(entry) => { | ||||||
|  |                 entry.slot_list.read().unwrap()[0].1.is_zero_lamport() | ||||||
|  |             } | ||||||
|  |             PreAllocatedAccountMapEntry::Raw(raw) => raw.1.is_zero_lamport(), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| impl<T: IndexValue> From<PreAllocatedAccountMapEntry<T>> for (Slot, T) { | impl<T: IndexValue> From<PreAllocatedAccountMapEntry<T>> for (Slot, T) { | ||||||
|     fn from(source: PreAllocatedAccountMapEntry<T>) -> (Slot, T) { |     fn from(source: PreAllocatedAccountMapEntry<T>) -> (Slot, T) { | ||||||
|         match source { |         match source { | ||||||
|   | |||||||
| @@ -1,6 +1,6 @@ | |||||||
| use crate::accounts_index::{ | use crate::accounts_index::{ | ||||||
|     AccountMapEntry, AccountMapEntryInner, AccountMapEntryMeta, IndexValue, |     AccountMapEntry, AccountMapEntryInner, AccountMapEntryMeta, IndexValue, | ||||||
|     PreAllocatedAccountMapEntry, RefCount, SlotList, SlotSlice, |     PreAllocatedAccountMapEntry, RefCount, SlotList, SlotSlice, ZeroLamport, | ||||||
| }; | }; | ||||||
| use crate::bucket_map_holder::{Age, BucketMapHolder}; | use crate::bucket_map_holder::{Age, BucketMapHolder}; | ||||||
| use crate::bucket_map_holder_stats::BucketMapHolderStats; | use crate::bucket_map_holder_stats::BucketMapHolderStats; | ||||||
| @@ -9,7 +9,10 @@ use rand::Rng; | |||||||
| use solana_bucket_map::bucket_api::BucketApi; | use solana_bucket_map::bucket_api::BucketApi; | ||||||
| use solana_measure::measure::Measure; | use solana_measure::measure::Measure; | ||||||
| use solana_sdk::{clock::Slot, pubkey::Pubkey}; | use solana_sdk::{clock::Slot, pubkey::Pubkey}; | ||||||
| use std::collections::{hash_map::Entry, HashMap}; | use std::collections::{ | ||||||
|  |     hash_map::{Entry, VacantEntry}, | ||||||
|  |     HashMap, | ||||||
|  | }; | ||||||
| use std::ops::{Bound, RangeBounds, RangeInclusive}; | use std::ops::{Bound, RangeBounds, RangeInclusive}; | ||||||
| use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}; | use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}; | ||||||
| use std::sync::{Arc, RwLock, RwLockWriteGuard}; | use std::sync::{Arc, RwLock, RwLockWriteGuard}; | ||||||
| @@ -351,24 +354,41 @@ impl<T: IndexValue> InMemAccountsIndex<T> { | |||||||
|                     } |                     } | ||||||
|                     Entry::Vacant(vacant) => { |                     Entry::Vacant(vacant) => { | ||||||
|                         // not in cache, look on disk |                         // not in cache, look on disk | ||||||
|                         let disk_entry = self.load_account_entry_from_disk(vacant.key()); |                         let directly_to_disk = self.storage.get_startup(); | ||||||
|                         let new_value = if let Some(disk_entry) = disk_entry { |                         if directly_to_disk { | ||||||
|                             // on disk, so merge new_value with what was on disk |                             // We may like this to always run, but it is unclear. | ||||||
|                             Self::lock_and_update_slot_list( |                             // If disk bucket needs to resize, then this call can stall for a long time. | ||||||
|                                 &disk_entry, |                             // Right now, we know it is safe during startup. | ||||||
|                                 new_value.into(), |                             let already_existed = self.upsert_on_disk( | ||||||
|  |                                 vacant, | ||||||
|  |                                 new_value, | ||||||
|                                 reclaims, |                                 reclaims, | ||||||
|                                 previous_slot_entry_was_cached, |                                 previous_slot_entry_was_cached, | ||||||
|                             ); |                             ); | ||||||
|                             disk_entry |                             if !already_existed { | ||||||
|  |                                 self.stats().insert_or_delete(true, self.bin); | ||||||
|  |                             } | ||||||
|                         } else { |                         } else { | ||||||
|                             // not on disk, so insert new thing |                             // go to in-mem cache first | ||||||
|                             self.stats().insert_or_delete(true, self.bin); |                             let disk_entry = self.load_account_entry_from_disk(vacant.key()); | ||||||
|                             new_value.into_account_map_entry(&self.storage) |                             let new_value = if let Some(disk_entry) = disk_entry { | ||||||
|                         }; |                                 // on disk, so merge new_value with what was on disk | ||||||
|                         assert!(new_value.dirty()); |                                 Self::lock_and_update_slot_list( | ||||||
|                         vacant.insert(new_value); |                                     &disk_entry, | ||||||
|                         self.stats().insert_or_delete_mem(true, self.bin); |                                     new_value.into(), | ||||||
|  |                                     reclaims, | ||||||
|  |                                     previous_slot_entry_was_cached, | ||||||
|  |                                 ); | ||||||
|  |                                 disk_entry | ||||||
|  |                             } else { | ||||||
|  |                                 // not on disk, so insert new thing | ||||||
|  |                                 self.stats().insert_or_delete(true, self.bin); | ||||||
|  |                                 new_value.into_account_map_entry(&self.storage) | ||||||
|  |                             }; | ||||||
|  |                             assert!(new_value.dirty()); | ||||||
|  |                             vacant.insert(new_value); | ||||||
|  |                             self.stats().insert_or_delete_mem(true, self.bin); | ||||||
|  |                         } | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|  |  | ||||||
| @@ -490,12 +510,11 @@ impl<T: IndexValue> InMemAccountsIndex<T> { | |||||||
|         let mut map = self.map().write().unwrap(); |         let mut map = self.map().write().unwrap(); | ||||||
|         let entry = map.entry(pubkey); |         let entry = map.entry(pubkey); | ||||||
|         m.stop(); |         m.stop(); | ||||||
|         let mut new_entry_zero_lamports = false; |         let new_entry_zero_lamports = new_entry.is_zero_lamport(); | ||||||
|         let (found_in_mem, already_existed) = match entry { |         let (found_in_mem, already_existed) = match entry { | ||||||
|             Entry::Occupied(occupied) => { |             Entry::Occupied(occupied) => { | ||||||
|                 // in cache, so merge into cache |                 // in cache, so merge into cache | ||||||
|                 let (slot, account_info) = new_entry.into(); |                 let (slot, account_info) = new_entry.into(); | ||||||
|                 new_entry_zero_lamports = account_info.is_zero_lamport(); |  | ||||||
|                 InMemAccountsIndex::lock_and_update_slot_list( |                 InMemAccountsIndex::lock_and_update_slot_list( | ||||||
|                     occupied.get(), |                     occupied.get(), | ||||||
|                     (slot, account_info), |                     (slot, account_info), | ||||||
| @@ -509,41 +528,9 @@ impl<T: IndexValue> InMemAccountsIndex<T> { | |||||||
|             } |             } | ||||||
|             Entry::Vacant(vacant) => { |             Entry::Vacant(vacant) => { | ||||||
|                 // not in cache, look on disk |                 // not in cache, look on disk | ||||||
|                 let mut existed = false; |                 let already_existed = | ||||||
|                 if let Some(disk) = self.bucket.as_ref() { |                     self.upsert_on_disk(vacant, new_entry, &mut Vec::default(), false); | ||||||
|                     let (slot, account_info) = new_entry.into(); |                 (false, already_existed) | ||||||
|                     new_entry_zero_lamports = account_info.is_zero_lamport(); |  | ||||||
|                     disk.update(vacant.key(), |current| { |  | ||||||
|                         if let Some((slot_list, mut ref_count)) = current { |  | ||||||
|                             // on disk, so merge and update disk |  | ||||||
|                             let mut slot_list = slot_list.to_vec(); |  | ||||||
|                             let addref = Self::update_slot_list( |  | ||||||
|                                 &mut slot_list, |  | ||||||
|                                 slot, |  | ||||||
|                                 account_info, |  | ||||||
|                                 &mut Vec::default(), |  | ||||||
|                                 false, |  | ||||||
|                             ); |  | ||||||
|                             if addref { |  | ||||||
|                                 ref_count += 1 |  | ||||||
|                             }; |  | ||||||
|                             existed = true; |  | ||||||
|                             Some((slot_list, ref_count)) |  | ||||||
|                         } else { |  | ||||||
|                             // doesn't exist on disk yet, so insert it |  | ||||||
|                             let ref_count = if account_info.is_cached() { 0 } else { 1 }; |  | ||||||
|                             Some((vec![(slot, account_info)], ref_count)) |  | ||||||
|                         } |  | ||||||
|                     }); |  | ||||||
|                 } else { |  | ||||||
|                     // not using disk, so insert into mem |  | ||||||
|                     self.stats().insert_or_delete_mem(true, self.bin); |  | ||||||
|                     let new_entry: AccountMapEntry<T> = |  | ||||||
|                         new_entry.into_account_map_entry(&self.storage); |  | ||||||
|                     assert!(new_entry.dirty()); |  | ||||||
|                     vacant.insert(new_entry); |  | ||||||
|                 } |  | ||||||
|                 (false, existed) |  | ||||||
|             } |             } | ||||||
|         }; |         }; | ||||||
|         drop(map); |         drop(map); | ||||||
| @@ -563,6 +550,51 @@ impl<T: IndexValue> InMemAccountsIndex<T> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// return tuple: | ||||||
|  |     /// true if item already existed in the index | ||||||
|  |     fn upsert_on_disk( | ||||||
|  |         &self, | ||||||
|  |         vacant: VacantEntry<K, AccountMapEntry<T>>, | ||||||
|  |         new_entry: PreAllocatedAccountMapEntry<T>, | ||||||
|  |         reclaims: &mut SlotList<T>, | ||||||
|  |         previous_slot_entry_was_cached: bool, | ||||||
|  |     ) -> bool { | ||||||
|  |         if let Some(disk) = self.bucket.as_ref() { | ||||||
|  |             let mut existed = false; | ||||||
|  |             let (slot, account_info) = new_entry.into(); | ||||||
|  |             disk.update(vacant.key(), |current| { | ||||||
|  |                 if let Some((slot_list, mut ref_count)) = current { | ||||||
|  |                     // on disk, so merge and update disk | ||||||
|  |                     let mut slot_list = slot_list.to_vec(); | ||||||
|  |                     let addref = Self::update_slot_list( | ||||||
|  |                         &mut slot_list, | ||||||
|  |                         slot, | ||||||
|  |                         account_info, | ||||||
|  |                         reclaims, | ||||||
|  |                         previous_slot_entry_was_cached, | ||||||
|  |                     ); | ||||||
|  |                     if addref { | ||||||
|  |                         ref_count += 1 | ||||||
|  |                     }; | ||||||
|  |                     existed = true; // found on disk, so it did exist | ||||||
|  |                     Some((slot_list, ref_count)) | ||||||
|  |                 } else { | ||||||
|  |                     // doesn't exist on disk yet, so insert it | ||||||
|  |                     let ref_count = if account_info.is_cached() { 0 } else { 1 }; | ||||||
|  |                     Some((vec![(slot, account_info)], ref_count)) | ||||||
|  |                 } | ||||||
|  |             }); | ||||||
|  |             existed | ||||||
|  |         } else { | ||||||
|  |             // not using disk, so insert into mem | ||||||
|  |             self.stats().insert_or_delete_mem(true, self.bin); | ||||||
|  |             let new_entry: AccountMapEntry<T> = new_entry.into_account_map_entry(&self.storage); | ||||||
|  |             assert!(new_entry.dirty()); | ||||||
|  |             vacant.insert(new_entry); | ||||||
|  |             false // not using disk, not in mem, so did not exist | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn just_set_hold_range_in_memory<R>(&self, range: &R, start_holding: bool) |     pub fn just_set_hold_range_in_memory<R>(&self, range: &R, start_holding: bool) | ||||||
|     where |     where | ||||||
|         R: RangeBounds<Pubkey>, |         R: RangeBounds<Pubkey>, | ||||||
| @@ -643,8 +675,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> { | |||||||
|  |  | ||||||
|         // load from disk |         // load from disk | ||||||
|         if let Some(disk) = self.bucket.as_ref() { |         if let Some(disk) = self.bucket.as_ref() { | ||||||
|             let items = disk.items_in_range(range); |  | ||||||
|             let mut map = self.map().write().unwrap(); |             let mut map = self.map().write().unwrap(); | ||||||
|  |             let items = disk.items_in_range(range); // map's lock has to be held while we are getting items from disk | ||||||
|             let future_age = self.storage.future_age_to_flush(); |             let future_age = self.storage.future_age_to_flush(); | ||||||
|             for item in items { |             for item in items { | ||||||
|                 let entry = map.entry(item.pubkey); |                 let entry = map.entry(item.pubkey); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user