diff --git a/core/src/validator.rs b/core/src/validator.rs
index c9c42809e6..7e889d1ca3 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -58,7 +58,7 @@ use {
     },
     solana_runtime::{
         accounts_db::AccountShrinkThreshold,
-        accounts_index::AccountSecondaryIndexes,
+        accounts_index::{AccountSecondaryIndexes, AccountsIndexConfig},
         bank::Bank,
         bank_forks::BankForks,
         commitment::BlockCommitmentCache,
@@ -144,7 +144,7 @@ pub struct ValidatorConfig {
     pub poh_hashes_per_batch: u64,
     pub account_indexes: AccountSecondaryIndexes,
     pub accounts_db_caching_enabled: bool,
-    pub accounts_index_bins: Option<usize>,
+    pub accounts_index_config: Option<AccountsIndexConfig>,
     pub warp_slot: Option<Slot>,
     pub accounts_db_test_hash_calculation: bool,
     pub accounts_db_skip_shrink: bool,
@@ -210,7 +210,7 @@ impl Default for ValidatorConfig {
             validator_exit: Arc::new(RwLock::new(Exit::default())),
             no_wait_for_vote_to_start_leader: true,
             accounts_shrink_ratio: AccountShrinkThreshold::default(),
-            accounts_index_bins: None,
+            accounts_index_config: None,
         }
     }
 }
@@ -1139,7 +1139,7 @@ fn new_banks_from_ledger(
         debug_keys: config.debug_keys.clone(),
         account_indexes: config.account_indexes.clone(),
         accounts_db_caching_enabled: config.accounts_db_caching_enabled,
-        accounts_index_bins: config.accounts_index_bins,
+        accounts_index_config: config.accounts_index_config,
         shrink_ratio: config.accounts_shrink_ratio,
         accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation,
         accounts_db_skip_shrink: config.accounts_db_skip_shrink,
diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs
index 967effff73..b2416b41a5 100644
--- a/core/tests/snapshots.rs
+++ b/core/tests/snapshots.rs
@@ -203,7 +203,7 @@ mod tests {
             check_hash_calculation,
             false,
             false,
-            Some(solana_runtime::accounts_index::BINS_FOR_TESTING),
+            Some(solana_runtime::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
         )
         .unwrap();

@@ -831,7 +831,7 @@ mod tests {
            false,
            false,
            false,
-            Some(solana_runtime::accounts_index::BINS_FOR_TESTING),
+            Some(solana_runtime::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
        )?;

        assert_eq!(bank, &deserialized_bank);
@@ -1011,7 +1011,7 @@ mod tests {
            false,
            false,
            false,
-            Some(solana_runtime::accounts_index::BINS_FOR_TESTING),
+            Some(solana_runtime::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
        )
        .unwrap();

diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs
index 841cc2408b..2711c52c89 100644
--- a/ledger-tool/src/main.rs
+++ b/ledger-tool/src/main.rs
@@ -26,6 +26,7 @@ use solana_ledger::{
     shred::Shred,
 };
 use solana_runtime::{
+    accounts_index::AccountsIndexConfig,
     bank::{Bank, RewardCalculationEvent},
     bank_forks::BankForks,
     hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
@@ -1863,6 +1864,10 @@ fn main() {
                 }
             }
             ("verify", Some(arg_matches)) => {
+                let accounts_index_config = value_t!(arg_matches, "accounts_index_bins", usize)
+                    .ok()
+                    .map(|bins| AccountsIndexConfig { bins: Some(bins) });
+
                 let process_options = ProcessOptions {
                     dev_halt_at_slot: value_t!(arg_matches, "halt_at_slot", Slot).ok(),
                     new_hard_forks: hardforks_of(arg_matches, "hard_forks"),
@@ -1875,7 +1880,7 @@
                         usize
                     )
                     .ok(),
-                    accounts_index_bins: value_t!(arg_matches, "accounts_index_bins", usize).ok(),
+                    accounts_index_config,
                     verify_index: arg_matches.is_present("verify_accounts_index"),
                     allow_dead_slots: arg_matches.is_present("allow_dead_slots"),
                     accounts_db_test_hash_calculation: arg_matches
diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs
index 04937bbdda..28092cd596 100644
--- a/ledger/src/bank_forks_utils.rs
+++ b/ledger/src/bank_forks_utils.rs
@@ -132,7 +132,7 @@ fn load_from_snapshot(
         process_options.accounts_db_test_hash_calculation,
         process_options.accounts_db_skip_shrink,
         process_options.verify_index,
-        process_options.accounts_index_bins,
+        process_options.accounts_index_config,
     )
     .expect("Load from snapshot failed");

diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index 1a379191bd..cf72b0deeb 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -17,7 +17,7 @@ use solana_metrics::{datapoint_error, inc_new_counter_debug};
 use solana_rayon_threadlimit::get_thread_count;
 use solana_runtime::{
     accounts_db::AccountShrinkThreshold,
-    accounts_index::AccountSecondaryIndexes,
+    accounts_index::{AccountSecondaryIndexes, AccountsIndexConfig},
     bank::{
         Bank, ExecuteTimings, InnerInstructionsList, RentDebits, TransactionBalancesSet,
         TransactionExecutionResult, TransactionLogMessages, TransactionResults,
@@ -461,7 +461,7 @@ pub struct ProcessOptions {
     pub allow_dead_slots: bool,
     pub accounts_db_test_hash_calculation: bool,
     pub accounts_db_skip_shrink: bool,
-    pub accounts_index_bins: Option<usize>,
+    pub accounts_index_config: Option<AccountsIndexConfig>,
     pub verify_index: bool,
     pub shrink_ratio: AccountShrinkThreshold,
 }
@@ -493,7 +493,7 @@ pub fn process_blockstore(
         opts.accounts_db_caching_enabled,
         opts.shrink_ratio,
         false,
-        opts.accounts_index_bins,
+        opts.accounts_index_config,
     );
     let bank0 = Arc::new(bank0);
     info!("processing ledger for slot 0...");
diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs
index c218947640..47bfd1f136 100644
--- a/local-cluster/src/validator_configs.rs
+++ b/local-cluster/src/validator_configs.rs
@@ -57,7 +57,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
         poh_hashes_per_batch: config.poh_hashes_per_batch,
         no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader,
         accounts_shrink_ratio: config.accounts_shrink_ratio,
-        accounts_index_bins: config.accounts_index_bins,
+        accounts_index_config: config.accounts_index_config,
     }
 }

diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs
index a1c98ce7be..6a0a6016fd 100644
--- a/replica-node/src/replica_node.rs
+++ b/replica-node/src/replica_node.rs
@@ -127,7 +127,7 @@ fn initialize_from_snapshot(
         process_options.accounts_db_test_hash_calculation,
         false,
         process_options.verify_index,
-        process_options.accounts_index_bins,
+        process_options.accounts_index_config,
     )
     .unwrap();

diff --git a/runtime/benches/accounts_index.rs b/runtime/benches/accounts_index.rs
index 9fee97a0b0..4aa378a4e8 100644
--- a/runtime/benches/accounts_index.rs
+++ b/runtime/benches/accounts_index.rs
@@ -5,7 +5,9 @@ extern crate test;
 use rand::{thread_rng, Rng};
 use solana_runtime::{
     accounts_db::AccountInfo,
-    accounts_index::{AccountSecondaryIndexes, AccountsIndex, BINS_FOR_BENCHMARKS},
+    accounts_index::{
+        AccountSecondaryIndexes, AccountsIndex, ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS,
+    },
 };
 use solana_sdk::pubkey::{self, Pubkey};
 use test::Bencher;
@@ -18,7 +20,7 @@ fn bench_accounts_index(bencher: &mut Bencher) {
     const NUM_FORKS: u64 = 16;

     let mut reclaims = vec![];
-    let index = AccountsIndex::<AccountInfo>::new(Some(BINS_FOR_BENCHMARKS));
+    let index = AccountsIndex::<AccountInfo>::new(Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS));
     for f in 0..NUM_FORKS {
         for pubkey in pubkeys.iter().take(NUM_PUBKEYS) {
             index.upsert(
diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs
index 2304879906..26ce4bd3bc 100644
--- a/runtime/src/accounts.rs
+++ b/runtime/src/accounts.rs
@@ -4,7 +4,8 @@ use crate::{
         ScanStorageResult,
     },
     accounts_index::{
-        AccountSecondaryIndexes, IndexKey, ScanResult, BINS_FOR_BENCHMARKS, BINS_FOR_TESTING,
+        AccountSecondaryIndexes, AccountsIndexConfig, IndexKey, ScanResult,
+        ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS, ACCOUNTS_INDEX_CONFIG_FOR_TESTING,
     },
     ancestors::Ancestors,
     bank::{
@@ -139,7 +140,7 @@ impl Accounts {
             account_indexes,
             caching_enabled,
             shrink_ratio,
-            Some(BINS_FOR_TESTING),
+            Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
         )
     }

@@ -156,7 +157,7 @@ impl Accounts {
             account_indexes,
             caching_enabled,
             shrink_ratio,
-            Some(BINS_FOR_BENCHMARKS),
+            Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS),
         )
     }

@@ -166,7 +167,7 @@ impl Accounts {
         account_indexes: AccountSecondaryIndexes,
         caching_enabled: bool,
         shrink_ratio: AccountShrinkThreshold,
-        accounts_index_bins: Option<usize>,
+        accounts_index_config: Option<AccountsIndexConfig>,
     ) -> Self {
         Self {
             accounts_db: Arc::new(AccountsDb::new_with_config(
@@ -175,7 +176,7 @@ impl Accounts {
                 account_indexes,
                 caching_enabled,
                 shrink_ratio,
-                accounts_index_bins,
+                accounts_index_config,
             )),
             account_locks: Mutex::new(AccountLocks::default()),
         }
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 06d6313186..74249d804b 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -23,9 +23,9 @@ use crate::{
     accounts_cache::{AccountsCache, CachedAccount, SlotCache},
     accounts_hash::{AccountsHash, CalculateHashIntermediate, HashStats, PreviousPass},
     accounts_index::{
-        AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexRootsStats,
-        IndexKey, IsCached, RefCount, ScanResult, SlotList, SlotSlice, ZeroLamport,
-        BINS_FOR_TESTING,
+        AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexConfig,
+        AccountsIndexRootsStats, IndexKey, IsCached, RefCount, ScanResult, SlotList, SlotSlice,
+        ZeroLamport, ACCOUNTS_INDEX_CONFIG_FOR_TESTING,
     },
     ancestors::Ancestors,
     append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion},
@@ -1426,7 +1426,7 @@ impl AccountsDb {
             AccountSecondaryIndexes::default(),
             false,
             AccountShrinkThreshold::default(),
-            Some(BINS_FOR_TESTING),
+            Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
         )
     }

@@ -1436,9 +1436,9 @@ impl AccountsDb {
         account_indexes: AccountSecondaryIndexes,
         caching_enabled: bool,
         shrink_ratio: AccountShrinkThreshold,
-        accounts_index_bins: Option<usize>,
+        accounts_index_config: Option<AccountsIndexConfig>,
     ) -> Self {
-        let accounts_index = AccountsIndex::new(accounts_index_bins);
+        let accounts_index = AccountsIndex::new(accounts_index_config);
         let mut new = if !paths.is_empty() {
             Self {
                 paths,
@@ -6327,7 +6327,7 @@ impl AccountsDb {
             account_indexes,
             caching_enabled,
             shrink_ratio,
-            Some(BINS_FOR_TESTING),
+            Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
         )
     }

diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs
index 991b62e7a7..76ab801509 100644
--- a/runtime/src/accounts_index.rs
+++ b/runtime/src/accounts_index.rs
@@ -33,8 +33,12 @@ use thiserror::Error;

 pub const ITER_BATCH_SIZE: usize = 1000;
 pub const BINS_DEFAULT: usize = 16;
-pub const BINS_FOR_TESTING: usize = BINS_DEFAULT;
-pub const BINS_FOR_BENCHMARKS: usize = BINS_DEFAULT;
+pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndexConfig {
+    bins: Some(BINS_DEFAULT),
+};
+pub const ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS: AccountsIndexConfig = AccountsIndexConfig {
+    bins: Some(BINS_DEFAULT),
+};
 pub type ScanResult<T> = Result<T, ScanError>;
 pub type SlotList<T> = Vec<(Slot, T)>;
 pub type SlotSlice<'s, T> = &'s [(Slot, T)];
@@ -78,6 +82,11 @@ pub struct AccountSecondaryIndexesIncludeExclude {
     pub keys: HashSet<Pubkey>,
 }

+#[derive(Debug, Default, Clone, Copy)]
+pub struct AccountsIndexConfig {
+    pub bins: Option<usize>,
+}
+
 #[derive(Debug, Default, Clone)]
 pub struct AccountSecondaryIndexes {
     pub keys: Option<AccountSecondaryIndexesIncludeExclude>,
@@ -761,11 +770,11 @@ pub struct AccountsIndex<T: IsCached> {

 impl<T: IsCached> AccountsIndex<T> {
     pub fn default_for_tests() -> Self {
-        Self::new(Some(BINS_FOR_TESTING))
+        Self::new(Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING))
     }

-    pub fn new(bins: Option<usize>) -> Self {
-        let (account_maps, bin_calculator) = Self::allocate_accounts_index(bins);
+    pub fn new(config: Option<AccountsIndexConfig>) -> Self {
+        let (account_maps, bin_calculator) = Self::allocate_accounts_index(config);
         Self {
             account_maps,
             bin_calculator,
@@ -784,8 +793,12 @@ impl<T: IsCached> AccountsIndex<T> {
         }
     }

-    fn allocate_accounts_index(bins: Option<usize>) -> (LockMapType<T>, PubkeyBinCalculator16) {
-        let bins = bins.unwrap_or(BINS_DEFAULT);
+    fn allocate_accounts_index(
+        config: Option<AccountsIndexConfig>,
+    ) -> (LockMapType<T>, PubkeyBinCalculator16) {
+        let bins = config
+            .and_then(|config| config.bins)
+            .unwrap_or(BINS_DEFAULT);
         let account_maps = (0..bins)
             .into_iter()
             .map(|_| RwLock::new(AccountMap::default()))
@@ -4024,6 +4037,6 @@ pub mod tests {
     #[test]
     #[should_panic(expected = "bins.is_power_of_two()")]
     fn test_illegal_bins() {
-        AccountsIndex::<bool>::new(Some(3));
+        AccountsIndex::<bool>::new(Some(AccountsIndexConfig { bins: Some(3) }));
     }
 }
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index 372b717755..0d3919f313 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -40,7 +40,8 @@ use crate::{
     },
     accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorages},
     accounts_index::{
-        AccountSecondaryIndexes, IndexKey, ScanResult, BINS_FOR_BENCHMARKS, BINS_FOR_TESTING,
+        AccountSecondaryIndexes, AccountsIndexConfig, IndexKey, ScanResult,
+        ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS, ACCOUNTS_INDEX_CONFIG_FOR_TESTING,
     },
     ancestors::{Ancestors, AncestorsForSerialization},
     blockhash_queue::BlockhashQueue,
@@ -1190,7 +1191,7 @@ impl Bank {
             accounts_db_caching_enabled,
             shrink_ratio,
             debug_do_not_add_builtins,
-            Some(BINS_FOR_TESTING),
+            Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
         )
     }

@@ -1215,7 +1216,7 @@ impl Bank {
             accounts_db_caching_enabled,
             shrink_ratio,
             debug_do_not_add_builtins,
-            Some(BINS_FOR_BENCHMARKS),
+            Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS),
         )
     }

@@ -1230,7 +1231,7 @@ impl Bank {
         accounts_db_caching_enabled: bool,
         shrink_ratio: AccountShrinkThreshold,
         debug_do_not_add_builtins: bool,
-        accounts_index_bins: Option<usize>,
+        accounts_index_config: Option<AccountsIndexConfig>,
     ) -> Self {
         let accounts = Accounts::new_with_config(
             paths,
@@ -1238,7 +1239,7 @@ impl Bank {
             account_indexes,
             accounts_db_caching_enabled,
             shrink_ratio,
-            accounts_index_bins,
+            accounts_index_config,
         );
         let mut bank = Self::default_with_accounts(accounts);
         bank.ancestors = Ancestors::from(vec![bank.slot()]);
diff --git a/runtime/src/hybrid_btree_map.rs b/runtime/src/hybrid_btree_map.rs
new file mode 100644
index 0000000000..785833373e
--- /dev/null
+++ b/runtime/src/hybrid_btree_map.rs
@@ -0,0 +1,458 @@
+use crate::accounts_index::AccountMapEntry;
+use crate::accounts_index::{IsCached, RefCount, SlotList, ACCOUNTS_INDEX_CONFIG_FOR_TESTING};
+use crate::bucket_map_holder::{BucketMapWriteHolder};
+
+use crate::pubkey_bins::PubkeyBinCalculator16;
+use solana_bucket_map::bucket_map::BucketMap;
+use solana_sdk::clock::Slot;
+use solana_sdk::pubkey::Pubkey;
+use std::collections::btree_map::BTreeMap;
+use std::fmt::Debug;
+use std::ops::Bound;
+use std::ops::{Range, RangeBounds};
+use std::sync::Arc;
+
+type K = Pubkey;
+
+#[derive(Clone, Debug)]
+pub struct HybridAccountEntry<V> {
+    entry: V,
+    //exists_on_disk: bool,
+}
+//type V2<T> = HybridAccountEntry<T>;
+pub type V2<T> = AccountMapEntry<T>;
+/*
+trait RealEntry<T> {
+    fn real_entry(&self) -> T;
+}
+
+impl<T> RealEntry<T> for T {
+    fn real_entry(&self) -> T
+    {
+        self
+    }
+}
+*/
+pub type SlotT<T> = (Slot, T);
+
+#[derive(Debug)]
+pub struct HybridBTreeMap<V: 'static + Clone + IsCached + Debug> {
+    in_memory: BTreeMap<K, V2<V>>,
+    disk: Arc<BucketMapWriteHolder<V>>,
+    bin_index: usize,
+    bins: usize,
+}
+
+// TODO: we need a bit for 'exists on disk' for updates
+/*
+impl Default for HybridBTreeMap {
+    /// Creates an empty `BTreeMap`.
+    fn default() -> HybridBTreeMap {
+        Self {
+            in_memory: BTreeMap::default(),
+            disk: BucketMap::new_buckets(PubkeyBinCalculator16::log_2(BINS as u32) as u8),
+        }
+    }
+}
+*/
+
+/*
+impl<'a, K: 'a, V: 'a> Iterator for HybridBTreeMap<'a, V> {
+    type Item = (&'a K, &'a V);
+
+    fn next(&mut self) -> Option<(&'a K, &'a V)> {
+        if self.length == 0 {
+            None
+        } else {
+            self.length -= 1;
+            Some(unsafe { self.range.inner.next_unchecked() })
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option) {
+        (self.length, Some(self.length))
+    }
+
+    fn last(mut self) -> Option<(&'a K, &'a V)> {
+        self.next_back()
+    }
+
+    fn min(mut self) -> Option<(&'a K, &'a V)> {
+        self.next()
+    }
+
+    fn max(mut self) -> Option<(&'a K, &'a V)> {
+        self.next_back()
+    }
+}
+*/
+
+pub enum HybridEntry<'a, V: 'static + Clone + IsCached + Debug> {
+    /// A vacant entry.
+    Vacant(HybridVacantEntry<'a, V>),
+
+    /// An occupied entry.
+    Occupied(HybridOccupiedEntry<'a, V>),
+}
+
+pub struct Keys {
+    keys: Vec<Pubkey>,
+    index: usize,
+}
+
+impl Keys {
+    pub fn len(&self) -> usize {
+        self.keys.len()
+    }
+}
+
+impl Iterator for Keys {
+    type Item = Pubkey;
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index >= self.keys.len() {
+            None
+        } else {
+            let r = Some(self.keys[self.index]);
+            self.index += 1;
+            r
+        }
+    }
+}
+
+pub struct Values<V: Clone + IsCached + Debug> {
+    values: Vec<SlotList<V>>,
+    index: usize,
+}
+
+impl<V: Clone + IsCached + Debug> Iterator for Values<V> {
+    type Item = V2<V>;
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index >= self.values.len() {
+            None
+        } else {
+            let r = Some(AccountMapEntry {
+                slot_list: self.values[self.index].clone(),
+                ref_count: RefCount::MAX, // todo: no clone
+            });
+            self.index += 1;
+            r
+        }
+    }
+}
+
+pub struct HybridOccupiedEntry<'a, V: 'static + Clone + IsCached + Debug> {
+    pubkey: Pubkey,
+    entry: V2<V>,
+    map: &'a HybridBTreeMap<V>,
+}
+pub struct HybridVacantEntry<'a, V: 'static + Clone + IsCached + Debug> {
+    pubkey: Pubkey,
+    map: &'a HybridBTreeMap<V>,
+}
+
+impl<'a, V: 'a + Clone + IsCached + Debug> HybridOccupiedEntry<'a, V> {
+    pub fn get(&self) -> &V2<V> {
+        &self.entry
+    }
+    pub fn update(&mut self, new_data: &SlotList<V>, new_rc: Option<RefCount>) {
+        //error!("update: {}", self.pubkey);
+        self.map.disk.update(
+            &self.pubkey,
+            |previous| {
+                if previous.is_some() {
+                    //error!("update {} to {:?}", self.pubkey, new_data);
+                }
+                Some((new_data.clone(), new_rc.unwrap_or(self.entry.ref_count)))
+                // TODO no clone here
+            },
+            Some(&self.entry),
+        );
+        let g = self.map.disk.get(&self.pubkey).unwrap();
+        assert_eq!(format!("{:?}", g.1), format!("{:?}", new_data));
+    }
+    pub fn addref(&mut self) {
+        self.entry.ref_count += 1;
+
+        self.map
+            .disk
+            .addref(&self.pubkey, self.entry.ref_count, &self.entry.slot_list);
+        //error!("addref: {}, {}, {:?}", self.pubkey, self.entry.ref_count(), result);
+    }
+    pub fn unref(&mut self) {
+        self.entry.ref_count -= 1;
+        self.map
+            .disk
+            .unref(&self.pubkey, self.entry.ref_count, &self.entry.slot_list);
+        //error!("addref: {}, {}, {:?}", self.pubkey, self.entry.ref_count(), result);
+    }
+    /*
+    pub fn get_mut(&mut self) -> &mut V2 {
+        self.entry.get_mut()
+    }
+    */
+    pub fn key(&self) -> &K {
+        &self.pubkey
+    }
+    pub fn remove(self) {
+        self.map.disk.delete_key(&self.pubkey)
+    }
+}
+
+impl<'a, V: 'a + Clone + Debug + IsCached> HybridVacantEntry<'a, V> {
+    pub fn insert(self, value: V2<V>) {
+        // -> &'a mut V2 {
+        /*
+        let value = V2:: {
+            entry: value,
+            //exists_on_disk: false,
+        };
+        */
+        //let mut sl = SlotList::default();
+        //std::mem::swap(&mut sl, &mut value.slot_list);
+        self.map.disk.update(
+            &self.pubkey,
+            |_previous| {
+                Some((value.slot_list.clone() /* todo bad */, value.ref_count))
+            },
+            None,
+        );
+    }
+}
+
+impl<V: 'static + Clone + IsCached + Debug> HybridBTreeMap<V> {
+    /// Creates an empty `BTreeMap`.
+    pub fn new2(bucket_map: &Arc<BucketMapWriteHolder<V>>, bin_index: usize, bins: usize) -> Self {
+        Self {
+            in_memory: BTreeMap::default(),
+            disk: bucket_map.clone(),
+            bin_index,
+            bins: bins, //bucket_map.num_buckets(),
+        }
+    }
+
+    pub fn new_for_testing() -> Self {
+        let map = Self::new_bucket_map(ACCOUNTS_INDEX_CONFIG_FOR_TESTING);
+        Self::new2(&map, 0, 1)
+    }
+
+    pub fn new_bucket_map(bins: usize) -> Arc<BucketMapWriteHolder<V>> {
+        let buckets = PubkeyBinCalculator16::log_2(bins as u32) as u8; // make more buckets to try to spread things out
+        // 15 hopefully avoids too many files open problem
+        //buckets = std::cmp::min(buckets + 11, 15); // max # that works with open file handles and such
+        //buckets =
+        //error!("creating: {} for {}", buckets, BUCKET_BINS);
+        Arc::new(BucketMapWriteHolder::new(BucketMap::new_buckets(buckets)))
+    }
+
+    pub fn flush(&self) -> usize {
+        let num_buckets = self.disk.num_buckets();
+        let mystart = num_buckets * self.bin_index / self.bins;
+        let myend = num_buckets * (self.bin_index + 1) / self.bins;
+        assert_eq!(myend - mystart, 1, "{}", self.bin_index);
+        (mystart..myend)
+            .map(|ix| self.disk.flush(ix, false, None).1)
+            .sum()
+
+        /*
+        {
+            // put entire contents of this map into the disk backing
+            let mut keys = Vec::with_capacity(self.in_memory.len());
+            for k in self.in_memory.keys() {
+                keys.push(k);
+            }
+            self.disk.update_batch(&keys[..], |previous, key, orig_i| {
+                let item = self.in_memory.get(key);
+                item.map(|item| (item.slot_list.clone(), item.ref_count()))
+            });
+            self.in_memory.clear();
+        }*/
+    }
+    pub fn distribution(&self) {
+        self.disk.distribution();
+    }
+    fn bound<'a, T>(bound: Bound<&'a T>, unbounded: &'a T) -> &'a T {
+        match bound {
+            Bound::Included(b) | Bound::Excluded(b) => b,
+            _ => unbounded,
+        }
+    }
+    pub fn range<R>(&self, range: Option<R>) -> Vec<(Pubkey, SlotList<V>)>
+    where
+        R: RangeBounds<Pubkey>,
+    {
+        //self.disk.range.fetch_add(1, Ordering::Relaxed);
+
+        let num_buckets = self.disk.num_buckets();
+        if self.bin_index != 0 && self.disk.unified_backing {
+            return vec![];
+        }
+        let mut start = 0;
+        let mut end = num_buckets;
+        if let Some(range) = &range {
+            let default = Pubkey::default();
+            let max = Pubkey::new(&[0xff; 32]);
+            let start_bound = Self::bound(range.start_bound(), &default);
+            start = self.disk.bucket_ix(start_bound);
+            // end is exclusive, so it is end + 1 we care about here
+            let end_bound = Self::bound(range.end_bound(), &max);
+            end = std::cmp::min(num_buckets, 1 + self.disk.bucket_ix(end_bound)); // ugly
+            assert!(
+                start_bound <= end_bound,
+                "range start is greater than range end"
+            );
+        }
+        let len = (start..end)
+            .into_iter()
+            .map(|ix| self.disk.bucket_len(ix) as usize)
+            .sum::<usize>();
+
+        let mystart = num_buckets * self.bin_index / self.bins;
+        let myend = num_buckets * (self.bin_index + 1) / self.bins;
+        start = std::cmp::max(start, mystart);
+        end = std::cmp::min(end, myend);
+        let mut keys = Vec::with_capacity(len);
+        (start..end).into_iter().for_each(|ix| {
+            let mut ks = self.disk.range(ix, range.as_ref());
+            keys.append(&mut ks);
+        });
+        keys.sort_unstable_by(|a, b| a.0.cmp(&b.0));
+        keys
+    }
+
+    pub fn keys2(&self) -> Keys {
+        // used still?
+        let num_buckets = self.disk.num_buckets();
+        let start = num_buckets * self.bin_index / self.bins;
+        let end = num_buckets * (self.bin_index + 1) / self.bins;
+        let len = (start..end)
+            .into_iter()
+            .map(|ix| self.disk.bucket_len(ix) as usize)
+            .sum::<usize>();
+        let mut keys = Vec::with_capacity(len);
+        let _len = (start..end).into_iter().for_each(|ix| {
+            keys.append(
+                &mut self
+                    .disk
+                    .keys3(ix, None::<&Range<Pubkey>>)
+                    .unwrap_or_default(),
+            )
+        });
+        keys.sort_unstable();
+        Keys { keys, index: 0 }
+    }
+    pub fn values(&self) -> Values<V> {
+        let num_buckets = self.disk.num_buckets();
+        if self.bin_index != 0 && self.disk.unified_backing {
+            return Values {
+                values: vec![],
+                index: 0,
+            };
+        }
+        // todo: this may be unsafe if we are asking for things with an update cache active. thankfully, we only call values at startup right now
+        let start = num_buckets * self.bin_index / self.bins;
+        let end = num_buckets * (self.bin_index + 1) / self.bins;
+        let len = (start..end)
+            .into_iter()
+            .map(|ix| self.disk.bucket_len(ix) as usize)
+            .sum::<usize>();
+        let mut values = Vec::with_capacity(len);
+        (start..end).into_iter().for_each(|ix| {
+            values.append(
+                &mut self
+                    .disk
+                    .values(ix, None::<&Range<Pubkey>>)
+                    .unwrap_or_default(),
+            )
+        });
+        //error!("getting values: {}, bin: {}, bins: {}, start: {}, end: {}", values.len(), self.bin_index, self.bins, start, end);
+        //keys.sort_unstable();
+        if self.bin_index == 0 {
+            //error!("getting values: {}, {}, {}", values.len(), start, end);
+        }
+        Values { values, index: 0 }
+    }
+
+    pub fn upsert(
+        &self,
+        pubkey: &Pubkey,
+        new_value: AccountMapEntry<V>,
+        reclaims: &mut SlotList<V>,
+        reclaims_must_be_empty: bool,
+    ) {
+        self.disk
+            .upsert(pubkey, new_value, reclaims, reclaims_must_be_empty);
+    }
+
+    pub fn entry(&mut self, key: K) -> HybridEntry<'_, V> {
+        match self.disk.get(&key) {
+            Some(entry) => HybridEntry::Occupied(HybridOccupiedEntry {
+                pubkey: key,
+                entry: AccountMapEntry::<V> {
+                    slot_list: entry.1,
+                    ref_count: entry.0,
+                },
+                map: self,
+            }),
+            None => HybridEntry::Vacant(HybridVacantEntry {
+                pubkey: key,
+                map: self,
+            }),
+        }
+    }
+
+    pub fn insert2(&mut self, key: K, value: V2<V>) {
+        match self.entry(key) {
+            HybridEntry::Occupied(_occupied) => {
+                panic!("");
+            }
+            HybridEntry::Vacant(vacant) => vacant.insert(value),
+        }
+    }
+
+    pub fn get(&self, key: &K) -> Option<V2<V>> {
+        let lookup = || {
+            let disk = self.disk.get(key);
+            disk.map(|disk| AccountMapEntry {
+                ref_count: disk.0,
+                slot_list: disk.1,
+            })
+        };
+
+        if true {
+            lookup()
+        } else {
+            let in_mem = self.in_memory.get(key);
+            match in_mem {
+                Some(in_mem) => Some(in_mem.clone()),
+                None => {
+                    // we have to load this into the in-mem cache so we can get a ref_count, if nothing else
+                    lookup()
+                    /*
+                    disk.map(|item| {
+                        self.in_memory.entry(*key).map(|entry| {
+
+                        }
+                    })*/
+                }
+            }
+        }
+    }
+    pub fn remove(&mut self, key: &K) {
+        self.disk.delete_key(key); //.map(|x| x.entry)
+    }
+    pub fn len(&self) -> usize {
+        self.disk
+            .keys3(self.bin_index, None::<&Range<Pubkey>>)
+            .map(|x| x.len())
+            .unwrap_or_default()
+    }
+
+    pub fn set_startup(&self, startup: bool) {
+        self.disk.set_startup(startup);
+    }
+
+    pub fn update_or_insert_async(&self, pubkey: Pubkey, new_entry: AccountMapEntry<V>) {
+        self.disk
+            .update_or_insert_async(self.bin_index, pubkey, new_entry);
+    }
+    pub fn dump_metrics(&self) {
+        self.disk.dump_metrics();
+    }
+}
diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs
index 94339d7d88..1761bf6939 100644
--- a/runtime/src/serde_snapshot.rs
+++ b/runtime/src/serde_snapshot.rs
@@ -4,7 +4,7 @@ use {
         accounts_db::{
             AccountShrinkThreshold, AccountStorageEntry, AccountsDb, AppendVecId, BankHashInfo,
         },
-        accounts_index::AccountSecondaryIndexes,
+        accounts_index::{AccountSecondaryIndexes, AccountsIndexConfig},
         ancestors::Ancestors,
         append_vec::{AppendVec, StoredMetaWriteVersion},
         bank::{Bank, BankFieldsToDeserialize, BankRc, Builtins},
@@ -197,7 +197,7 @@ pub(crate) fn bank_from_streams<R>(
     limit_load_slot_count_from_snapshot: Option<Slot>,
     shrink_ratio: AccountShrinkThreshold,
     verify_index: bool,
-    accounts_index_bins: Option<usize>,
+    accounts_index_config: Option<AccountsIndexConfig>,
 ) -> std::result::Result<Bank, Error>
 where
     R: Read,
@@ -235,7 +235,7 @@
                 limit_load_slot_count_from_snapshot,
                 shrink_ratio,
                 verify_index,
-                accounts_index_bins,
+                accounts_index_config,
             )?;
             Ok(bank)
         }};
@@ -328,7 +328,7 @@ fn reconstruct_bank_from_fields<E>(
     limit_load_slot_count_from_snapshot: Option<Slot>,
     shrink_ratio: AccountShrinkThreshold,
     verify_index: bool,
-    accounts_index_bins: Option<usize>,
+    accounts_index_config: Option<AccountsIndexConfig>,
 ) -> Result<Bank, Error>
 where
     E: SerializableStorage + std::marker::Sync,
@@ -343,7 +343,7 @@
         limit_load_slot_count_from_snapshot,
         shrink_ratio,
         verify_index,
-        accounts_index_bins,
+        accounts_index_config,
     )?;
     accounts_db.freeze_accounts(
         &Ancestors::from(&bank_fields.ancestors),
@@ -395,7 +395,7 @@ fn reconstruct_accountsdb_from_fields<E>(
     limit_load_slot_count_from_snapshot: Option<Slot>,
     shrink_ratio: AccountShrinkThreshold,
     verify_index: bool,
-    accounts_index_bins: Option<usize>,
+    accounts_index_config: Option<AccountsIndexConfig>,
 ) -> Result<AccountsDb, Error>
 where
     E: SerializableStorage + std::marker::Sync,
@@ -406,7 +406,7 @@
         account_secondary_indexes,
         caching_enabled,
         shrink_ratio,
-        accounts_index_bins,
+        accounts_index_config,
     );

     let AccountsDbFields(
diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs
index b2d5c0fb38..2c55da0fc9 100644
--- a/runtime/src/serde_snapshot/tests.rs
+++ b/runtime/src/serde_snapshot/tests.rs
@@ -81,7 +81,7 @@ where
         None,
         AccountShrinkThreshold::default(),
         false,
-        Some(crate::accounts_index::BINS_FOR_TESTING),
+        Some(crate::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
     )
 }

@@ -243,7 +243,7 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) {
         None,
         AccountShrinkThreshold::default(),
         false,
-        Some(crate::accounts_index::BINS_FOR_TESTING),
+        Some(crate::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
     )
     .unwrap();
     dbank.src = ref_sc;
diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs
index 140820fa03..9e1a7bf55e 100644
--- a/runtime/src/snapshot_utils.rs
+++ b/runtime/src/snapshot_utils.rs
@@ -1,7 +1,7 @@
 use {
     crate::{
         accounts_db::{AccountShrinkThreshold, AccountsDb},
-        accounts_index::AccountSecondaryIndexes,
+        accounts_index::{AccountSecondaryIndexes, AccountsIndexConfig},
         bank::{Bank, BankSlotDelta, Builtins},
         hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap},
         serde_snapshot::{
@@ -728,7 +728,7 @@ pub fn bank_from_snapshot_archives(
     test_hash_calculation: bool,
     accounts_db_skip_shrink: bool,
     verify_index: bool,
-    accounts_index_bins: Option<usize>,
+    accounts_index_config: Option<AccountsIndexConfig>,
 ) -> Result<(Bank, BankFromArchiveTimings)> {
     check_are_snapshots_compatible(
         full_snapshot_archive_info,
@@ -792,7 +792,7 @@ pub fn bank_from_snapshot_archives(
         limit_load_slot_count_from_snapshot,
         shrink_ratio,
         verify_index,
-        accounts_index_bins,
+        accounts_index_config,
     )?;
     measure_rebuild.stop();
     info!("{}", measure_rebuild);
@@ -838,7 +838,7 @@ pub fn bank_from_latest_snapshot_archives(
     test_hash_calculation: bool,
     accounts_db_skip_shrink: bool,
     verify_index: bool,
-    accounts_index_bins: Option<usize>,
+    accounts_index_config: Option<AccountsIndexConfig>,
 ) -> Result<(Bank, BankFromArchiveTimings)> {
     let full_snapshot_archive_info = get_highest_full_snapshot_archive_info(&snapshot_archives_dir)
         .ok_or(SnapshotError::NoSnapshotArchives)?;
@@ -876,7 +876,7 @@
         test_hash_calculation,
         accounts_db_skip_shrink,
         verify_index,
-        accounts_index_bins,
+        accounts_index_config,
     )?;

     verify_bank_against_expected_slot_hash(
@@ -1376,7 +1376,7 @@ fn rebuild_bank_from_snapshots(
     limit_load_slot_count_from_snapshot: Option<Slot>,
     shrink_ratio: AccountShrinkThreshold,
     verify_index: bool,
-    accounts_index_bins: Option<usize>,
+    accounts_index_config: Option<AccountsIndexConfig>,
 ) -> Result<Bank> {
     let (full_snapshot_version, full_snapshot_root_paths) =
         verify_unpacked_snapshots_dir_and_version(
@@ -1424,7 +1424,7 @@
             limit_load_slot_count_from_snapshot,
             shrink_ratio,
             verify_index,
-            accounts_index_bins,
+            accounts_index_config,
         ),
     }?,
     )
@@ -2524,7 +2524,7 @@ mod tests {
            false,
            false,
            false,
-            Some(crate::accounts_index::BINS_FOR_TESTING),
+            Some(crate::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
        )
        .unwrap();

@@ -2615,7 +2615,7 @@ mod tests {
            false,
            false,
            false,
-            Some(crate::accounts_index::BINS_FOR_TESTING),
+            Some(crate::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
        )
        .unwrap();

@@ -2725,7 +2725,7 @@ mod tests {
            false,
            false,
            false,
-            Some(crate::accounts_index::BINS_FOR_TESTING),
+            Some(crate::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
        )
        .unwrap();

@@ -2824,7 +2824,7 @@ mod tests {
            false,
            false,
            false,
-            Some(crate::accounts_index::BINS_FOR_TESTING),
+            Some(crate::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
        )
        .unwrap();

@@ -2964,7 +2964,7 @@ mod tests {
            false,
            false,
            false,
-            Some(crate::accounts_index::BINS_FOR_TESTING),
+            Some(crate::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
        )
        .unwrap();
        assert_eq!(
@@ -3026,7 +3026,7 @@ mod tests {
            false,
            false,
            false,
-            Some(crate::accounts_index::BINS_FOR_TESTING),
+            Some(crate::accounts_index::ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
        )
        .unwrap();
        assert_eq!(
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 10b16b6522..bfa5d6325d 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -45,6 +45,7 @@ use {
         },
         accounts_index::{
             AccountIndex, AccountSecondaryIndexes, AccountSecondaryIndexesIncludeExclude,
+            AccountsIndexConfig,
         },
         hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
         snapshot_archive_info::SnapshotArchiveInfoGetter,
@@ -2387,6 +2388,10 @@ pub fn main() {
         _ => unreachable!(),
     };

+    let accounts_index_config = value_t!(matches, "accounts_index_bins", usize)
+        .ok()
+        .map(|bins| AccountsIndexConfig { bins: Some(bins) });
+
     let mut validator_config = ValidatorConfig {
         require_tower: matches.is_present("require_tower"),
         tower_storage,
@@ -2484,7 +2489,7 @@ pub fn main() {
         account_indexes,
         accounts_db_caching_enabled: !matches.is_present("no_accounts_db_caching"),
         accounts_db_test_hash_calculation: matches.is_present("accounts_db_test_hash_calculation"),
-        accounts_index_bins: value_t!(matches, "accounts_index_bins", usize).ok(),
+        accounts_index_config,
         accounts_db_skip_shrink: matches.is_present("accounts_db_skip_shrink"),
         accounts_db_use_index_hash_calculation: matches.is_present("accounts_db_index_hashing"),
         tpu_coalesce_ms,
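
For reference, a minimal sketch (not part of the diff, assuming a crate that depends on the solana_runtime built from this change) of the plumbing that ledger-tool and the validator now share: an optional bin count from the `accounts_index_bins` CLI argument is wrapped into the new `AccountsIndexConfig` before it is handed to `AccountsDb`/`AccountsIndex`. The helper name below is hypothetical.

    use solana_runtime::accounts_index::AccountsIndexConfig;

    // Hypothetical helper mirroring the `value_t!(...).ok().map(...)` pattern
    // added in validator/src/main.rs and ledger-tool/src/main.rs above.
    fn accounts_index_config_from_bins(bins: Option<usize>) -> Option<AccountsIndexConfig> {
        bins.map(|bins| AccountsIndexConfig { bins: Some(bins) })
    }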