diff --git a/Cargo.lock b/Cargo.lock index e1e616bfbe..d36f9be278 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2065,6 +2065,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9007da9cacbd3e6343da136e98b0d2df013f553d35bdec8b518f07bea768e19c" +[[package]] +name = "index_list" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a9d968042a4902e08810946fc7cd5851eb75e80301342305af755ca06cb82ce" + [[package]] name = "indexmap" version = "1.8.0" @@ -5702,6 +5708,7 @@ dependencies = [ "ed25519-dalek", "flate2", "fnv", + "index_list", "itertools 0.10.3", "lazy_static", "libsecp256k1 0.6.0", @@ -5712,6 +5719,7 @@ dependencies = [ "num_cpus", "ouroboros", "rand 0.7.3", + "rand_chacha 0.2.2", "rayon", "regex", "rustc_version 0.4.0", diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 6c9fbbaf52..d04012968b 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -1363,6 +1363,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "index_list" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a9d968042a4902e08810946fc7cd5851eb75e80301342305af755ca06cb82ce" + [[package]] name = "indexmap" version = "1.8.0" @@ -3447,6 +3453,7 @@ dependencies = [ "dir-diff", "flate2", "fnv", + "index_list", "itertools 0.10.3", "lazy_static", "log", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 7d3d0dc775..4475b8713c 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -21,6 +21,7 @@ crossbeam-channel = "0.5" dir-diff = "0.3.2" flate2 = "1.0.22" fnv = "1.0.7" +index_list = "0.2.7" itertools = "0.10.3" lazy_static = "1.4.0" log = "0.4.14" @@ -60,9 +61,10 @@ crate-type = ["lib"] name = "solana_runtime" [dev-dependencies] +assert_matches = "1.5.0" ed25519-dalek = "=1.0.1" libsecp256k1 = "0.6.0" -assert_matches = "1.5.0" +rand_chacha = "0.2.2" solana-logger = { path = "../logger", version = "=1.10.0" } [package.metadata.docs.rs] diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 60ca2b4bb3..8358ab19f6 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -963,7 +963,7 @@ pub struct AccountsDb { write_cache_limit_bytes: Option, sender_bg_hasher: Option>, - pub read_only_accounts_cache: ReadOnlyAccountsCache, + read_only_accounts_cache: ReadOnlyAccountsCache, recycle_stores: RwLock, @@ -3649,7 +3649,7 @@ impl AccountsDb { // Notice the subtle `?` at previous line, we bail out pretty early if missing. if self.caching_enabled && !storage_location.is_cached() { - let result = self.read_only_accounts_cache.load(pubkey, slot); + let result = self.read_only_accounts_cache.load(*pubkey, slot); if let Some(account) = result { return Some((account, slot)); } @@ -3680,7 +3680,8 @@ impl AccountsDb { However, by the assumption for contradiction above , 'A' has already been updated in 'S' which means '(S, A)' must exist in the write cache, which is a contradiction. */ - self.read_only_accounts_cache.store(pubkey, slot, &account); + self.read_only_accounts_cache + .store(*pubkey, slot, account.clone()); } Some((account, slot)) } @@ -4928,7 +4929,7 @@ impl AccountsDb { let accounts_and_meta_to_store: Vec<_> = accounts .iter() .map(|(pubkey, account)| { - self.read_only_accounts_cache.remove(pubkey, slot); + self.read_only_accounts_cache.remove(**pubkey, slot); // this is the source of Some(Account) or None. // Some(Account) = store 'Account' // None = store a default/empty account with 0 lamports diff --git a/runtime/src/read_only_accounts_cache.rs b/runtime/src/read_only_accounts_cache.rs index d4c023541b..2ff5ad3711 100644 --- a/runtime/src/read_only_accounts_cache.rs +++ b/runtime/src/read_only_accounts_cache.rs @@ -1,238 +1,136 @@ //! ReadOnlyAccountsCache used to store accounts, such as executable accounts, //! which can be large, loaded many times, and rarely change. -//use mapref::entry::{Entry, OccupiedEntry, VacantEntry}; use { dashmap::{mapref::entry::Entry, DashMap}, + index_list::{Index, IndexList}, solana_sdk::{ account::{AccountSharedData, ReadableAccount}, clock::Slot, pubkey::Pubkey, }, - std::{ - sync::{ - atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, - Arc, RwLock, - }, - thread::{sleep, Builder, JoinHandle}, - time::{Duration, Instant}, + std::sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Mutex, }, }; +const CACHE_ENTRY_SIZE: usize = + std::mem::size_of::() + 2 * std::mem::size_of::(); + type ReadOnlyCacheKey = (Pubkey, Slot); -type LruEntry = (Instant, ReadOnlyCacheKey); #[derive(Debug)] -pub struct ReadOnlyAccountCacheEntry { - pub account: AccountSharedData, - pub last_used: Arc>, +struct ReadOnlyAccountCacheEntry { + account: AccountSharedData, + index: Index, // Index of the entry in the eviction queue. } #[derive(Debug)] -pub struct ReadOnlyAccountsCache { - cache: Arc>, +pub(crate) struct ReadOnlyAccountsCache { + cache: DashMap, + // When an item is first entered into the cache, it is added to the end of + // the queue. Also each time an entry is looked up from the cache it is + // moved to the end of the queue. As a result, items in the queue are + // always sorted in the order that they have last been accessed. When doing + // LRU eviction, cache entries are evicted from the front of the queue. + queue: Mutex>, max_data_size: usize, - data_size: Arc, + data_size: AtomicUsize, hits: AtomicU64, misses: AtomicU64, - per_account_size: usize, - stop: Arc, - background: Option>, -} - -impl Drop for ReadOnlyAccountsCache { - fn drop(&mut self) { - self.stop.store(true, Ordering::Relaxed); - if let Some(background) = self.background.take() { - background.join().unwrap(); - } - } } impl ReadOnlyAccountsCache { - pub fn new(max_data_size: usize) -> Self { - let mut result = Self::new_test(max_data_size); - - let bg = Self { - max_data_size, - cache: result.cache.clone(), - data_size: result.data_size.clone(), - hits: AtomicU64::new(0), - misses: AtomicU64::new(0), - per_account_size: Self::per_account_size(), - stop: result.stop.clone(), - background: None, - }; - - result.background = Some( - Builder::new() - .name("solana-readonly-accounts-cache".to_string()) - .spawn(move || { - bg.bg_purge_lru_items(false); - }) - .unwrap(), - ); - - result - } - - fn new_test(max_data_size: usize) -> Self { + pub(crate) fn new(max_data_size: usize) -> Self { Self { max_data_size, - cache: Arc::new(DashMap::default()), - data_size: Arc::new(AtomicUsize::new(0)), - hits: AtomicU64::new(0), - misses: AtomicU64::new(0), - per_account_size: Self::per_account_size(), - stop: Arc::new(AtomicBool::new(false)), - background: None, + cache: DashMap::default(), + queue: Mutex::>::default(), + data_size: AtomicUsize::default(), + hits: AtomicU64::default(), + misses: AtomicU64::default(), } } - fn per_account_size() -> usize { - // size_of(arc(x)) does not return the size of x, so we have to add the size of RwLock... - std::mem::size_of::() + std::mem::size_of::>() - } - - pub fn load(&self, pubkey: &Pubkey, slot: Slot) -> Option { - self.cache - .get(&(*pubkey, slot)) - .map(|account_ref| { - self.hits.fetch_add(1, Ordering::Relaxed); - let value = account_ref.value(); - // remember last use - let now = Instant::now(); - *value.last_used.write().unwrap() = now; - value.account.clone() - }) - .or_else(|| { + pub(crate) fn load(&self, pubkey: Pubkey, slot: Slot) -> Option { + let key = (pubkey, slot); + let mut entry = match self.cache.get_mut(&key) { + None => { self.misses.fetch_add(1, Ordering::Relaxed); - None - }) + return None; + } + Some(entry) => entry, + }; + self.hits.fetch_add(1, Ordering::Relaxed); + // Move the entry to the end of the queue. + // self.queue is modified while holding a reference to the cache entry; + // so that another thread cannot write to the same key. + { + let mut queue = self.queue.lock().unwrap(); + queue.remove(entry.index); + entry.index = queue.insert_last(key); + } + Some(entry.account.clone()) } fn account_size(&self, account: &AccountSharedData) -> usize { - account.data().len() + self.per_account_size + CACHE_ENTRY_SIZE + account.data().len() } - pub fn store(&self, pubkey: &Pubkey, slot: Slot, account: &AccountSharedData) { - let len = self.account_size(account); - let previous_len = if let Some(previous) = self.cache.insert( - (*pubkey, slot), - ReadOnlyAccountCacheEntry { - account: account.clone(), - last_used: Arc::new(RwLock::new(Instant::now())), - }, - ) { - self.account_size(&previous.account) - } else { - 0 - }; - - match len.cmp(&previous_len) { - std::cmp::Ordering::Greater => { - self.data_size - .fetch_add(len - previous_len, Ordering::Relaxed); + pub(crate) fn store(&self, pubkey: Pubkey, slot: Slot, account: AccountSharedData) { + let key = (pubkey, slot); + let account_size = self.account_size(&account); + self.data_size.fetch_add(account_size, Ordering::Relaxed); + // self.queue is modified while holding a reference to the cache entry; + // so that another thread cannot write to the same key. + match self.cache.entry(key) { + Entry::Vacant(entry) => { + // Insert the entry at the end of the queue. + let mut queue = self.queue.lock().unwrap(); + let index = queue.insert_last(key); + entry.insert(ReadOnlyAccountCacheEntry { account, index }); } - std::cmp::Ordering::Less => { - self.data_size - .fetch_sub(previous_len - len, Ordering::Relaxed); - } - std::cmp::Ordering::Equal => { - // no change in size + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + let account_size = self.account_size(&entry.account); + self.data_size.fetch_sub(account_size, Ordering::Relaxed); + entry.account = account; + // Move the entry to the end of the queue. + let mut queue = self.queue.lock().unwrap(); + queue.remove(entry.index); + entry.index = queue.insert_last(key); } }; - } - - pub fn remove(&self, pubkey: &Pubkey, slot: Slot) { - if let Some((_, value)) = self.cache.remove(&(*pubkey, slot)) { - self.data_size - .fetch_sub(self.account_size(&value.account), Ordering::Relaxed); + // Evict entries from the front of the queue. + while self.data_size.load(Ordering::Relaxed) > self.max_data_size { + let (pubkey, slot) = match self.queue.lock().unwrap().get_first() { + None => break, + Some(key) => *key, + }; + self.remove(pubkey, slot); } } - fn purge_lru_list(&self, lru: &[LruEntry], lru_index: &mut usize) -> bool { - let mut freed_bytes = 0; - let start = *lru_index; - let mut done = false; - let current_size = self.data_size.load(Ordering::Relaxed); - for (timestamp, key) in lru.iter().skip(start) { - if current_size.saturating_sub(freed_bytes) <= self.max_data_size { - done = true; - break; - } - *lru_index += 1; - match self.cache.entry(*key) { - Entry::Vacant(_entry) => (), - Entry::Occupied(entry) => { - if *timestamp == *entry.get().last_used.read().unwrap() { - let size = self.account_size(&entry.get().account); - freed_bytes += size; - entry.remove(); - } - } - } - } - if freed_bytes > 0 { - // if this overflows, we'll have a really big data size, so we'll clean everything, scan all, and reset the size. Not ideal, but not terrible. - self.data_size.fetch_sub(freed_bytes, Ordering::Relaxed); - } - done + pub(crate) fn remove(&self, pubkey: Pubkey, slot: Slot) -> Option { + let (_, entry) = self.cache.remove(&(pubkey, slot))?; + // self.queue should be modified only after removing the entry from the + // cache, so that this is still safe if another thread writes to the + // same key. + self.queue.lock().unwrap().remove(entry.index); + let account_size = self.account_size(&entry.account); + self.data_size.fetch_sub(account_size, Ordering::Relaxed); + Some(entry.account) } - fn calculate_lru_list(&self, lru: &mut Vec) -> usize { - lru.clear(); - lru.reserve(self.cache.len()); - let mut new_size = 0; - for item in self.cache.iter() { - let value = item.value(); - let item_len = self.account_size(&value.account); - new_size += item_len; - lru.push((*value.last_used.read().unwrap(), *item.key())); - } - new_size - } - - fn bg_purge_lru_items(&self, once: bool) { - let mut lru = Vec::new(); - let mut lru_index = 0; - let mut stop = false; - loop { - if !once { - sleep(Duration::from_millis(200)); - } else { - if stop { - break; - } - stop = true; - } - - if self.stop.load(Ordering::Relaxed) { - break; - } - - // purge from the lru list we last made - if self.purge_lru_list(&lru, &mut lru_index) { - continue; - } - - // we didn't get enough, so calculate a new list and keep purging - let new_size = self.calculate_lru_list(&mut lru); - lru_index = 0; - self.data_size.store(new_size, Ordering::Relaxed); - lru.sort(); - self.purge_lru_list(&lru, &mut lru_index); - } - } - - pub fn cache_len(&self) -> usize { + pub(crate) fn cache_len(&self) -> usize { self.cache.len() } - pub fn data_size(&self) -> usize { + pub(crate) fn data_size(&self) -> usize { self.data_size.load(Ordering::Relaxed) } - pub fn get_and_reset_stats(&self) -> (u64, u64) { + pub(crate) fn get_and_reset_stats(&self) -> (u64, u64) { let hits = self.hits.swap(0, Ordering::Relaxed); let misses = self.misses.swap(0, Ordering::Relaxed); (hits, misses) @@ -240,10 +138,16 @@ impl ReadOnlyAccountsCache { } #[cfg(test)] -pub mod tests { +mod tests { use { super::*, + rand::{ + seq::{IteratorRandom, SliceRandom}, + Rng, SeedableRng, + }, + rand_chacha::ChaChaRng, solana_sdk::account::{accounts_equal, Account, WritableAccount}, + std::{collections::HashMap, iter::repeat_with, sync::Arc}, }; #[test] fn test_accountsdb_sizeof() { @@ -252,27 +156,18 @@ pub mod tests { assert!(std::mem::size_of::>() == std::mem::size_of::>()); } - #[test] - fn test_read_only_accounts_cache_drop() { - solana_logger::setup(); - let cache = ReadOnlyAccountsCache::new_test(100); - let stop = cache.stop.clone(); - drop(cache); - assert!(stop.load(Ordering::Relaxed)); - } - #[test] fn test_read_only_accounts_cache() { solana_logger::setup(); - let per_account_size = ReadOnlyAccountsCache::per_account_size(); + let per_account_size = CACHE_ENTRY_SIZE; let data_size = 100; let max = data_size + per_account_size; - let cache = ReadOnlyAccountsCache::new_test(max); + let cache = ReadOnlyAccountsCache::new(max); let slot = 0; - assert!(cache.load(&Pubkey::default(), slot).is_none()); + assert!(cache.load(Pubkey::default(), slot).is_none()); assert_eq!(0, cache.cache_len()); assert_eq!(0, cache.data_size()); - cache.remove(&Pubkey::default(), slot); // assert no panic + cache.remove(Pubkey::default(), slot); // assert no panic let key1 = Pubkey::new_unique(); let key2 = Pubkey::new_unique(); let key3 = Pubkey::new_unique(); @@ -284,46 +179,102 @@ pub mod tests { account2.checked_add_lamports(1).unwrap(); // so they compare differently let mut account3 = account1.clone(); account3.checked_add_lamports(4).unwrap(); // so they compare differently - cache.store(&key1, slot, &account1); + cache.store(key1, slot, account1.clone()); assert_eq!(100 + per_account_size, cache.data_size()); - assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); + assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1)); assert_eq!(1, cache.cache_len()); - cache.store(&key2, slot, &account2); - cache.bg_purge_lru_items(true); + cache.store(key2, slot, account2.clone()); assert_eq!(100 + per_account_size, cache.data_size()); - assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2)); + assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account2)); assert_eq!(1, cache.cache_len()); - cache.store(&key2, slot, &account1); // overwrite key2 with account1 + cache.store(key2, slot, account1.clone()); // overwrite key2 with account1 assert_eq!(100 + per_account_size, cache.data_size()); - assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1)); + assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account1)); assert_eq!(1, cache.cache_len()); - cache.remove(&key2, slot); + cache.remove(key2, slot); assert_eq!(0, cache.data_size()); assert_eq!(0, cache.cache_len()); // can store 2 items, 3rd item kicks oldest item out let max = (data_size + per_account_size) * 2; - let cache = ReadOnlyAccountsCache::new_test(max); - cache.store(&key1, slot, &account1); + let cache = ReadOnlyAccountsCache::new(max); + cache.store(key1, slot, account1.clone()); assert_eq!(100 + per_account_size, cache.data_size()); - assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); + assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1)); assert_eq!(1, cache.cache_len()); - cache.store(&key2, slot, &account2); + cache.store(key2, slot, account2.clone()); assert_eq!(max, cache.data_size()); - assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); - assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2)); + assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1)); + assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account2)); assert_eq!(2, cache.cache_len()); - cache.store(&key2, slot, &account1); // overwrite key2 with account1 + cache.store(key2, slot, account1.clone()); // overwrite key2 with account1 assert_eq!(max, cache.data_size()); - assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); - assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1)); + assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1)); + assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account1)); assert_eq!(2, cache.cache_len()); - cache.store(&key3, slot, &account3); - cache.bg_purge_lru_items(true); + cache.store(key3, slot, account3.clone()); assert_eq!(max, cache.data_size()); - assert!(cache.load(&key1, slot).is_none()); // was lru purged - assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1)); - assert!(accounts_equal(&cache.load(&key3, slot).unwrap(), &account3)); + assert!(cache.load(key1, slot).is_none()); // was lru purged + assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account1)); + assert!(accounts_equal(&cache.load(key3, slot).unwrap(), &account3)); assert_eq!(2, cache.cache_len()); } + + #[test] + fn test_read_only_accounts_cache_random() { + const SEED: [u8; 32] = [0xdb; 32]; + const DATA_SIZE: usize = 19; + const MAX_CACHE_SIZE: usize = 17 * (CACHE_ENTRY_SIZE + DATA_SIZE); + let mut rng = ChaChaRng::from_seed(SEED); + let cache = ReadOnlyAccountsCache::new(MAX_CACHE_SIZE); + let slots: Vec = repeat_with(|| rng.gen_range(0, 1000)).take(5).collect(); + let pubkeys: Vec = repeat_with(|| { + let mut arr = [0u8; 32]; + rng.fill(&mut arr[..]); + Pubkey::new_from_array(arr) + }) + .take(7) + .collect(); + let mut hash_map = HashMap::::new(); + for ix in 0..1000 { + if rng.gen_bool(0.1) { + let key = *cache.cache.iter().choose(&mut rng).unwrap().key(); + let (pubkey, slot) = key; + let account = cache.load(pubkey, slot).unwrap(); + let (other, index) = hash_map.get_mut(&key).unwrap(); + assert_eq!(account, *other); + *index = ix; + } else { + let mut data = vec![0u8; DATA_SIZE]; + rng.fill(&mut data[..]); + let account = AccountSharedData::from(Account { + lamports: rng.gen(), + data, + executable: rng.gen(), + rent_epoch: rng.gen(), + owner: Pubkey::default(), + }); + let slot = *slots.choose(&mut rng).unwrap(); + let pubkey = *pubkeys.choose(&mut rng).unwrap(); + let key = (pubkey, slot); + hash_map.insert(key, (account.clone(), ix)); + cache.store(pubkey, slot, account); + } + } + assert_eq!(cache.cache_len(), 17); + assert_eq!(hash_map.len(), 35); + let index = hash_map + .iter() + .filter(|(k, _)| cache.cache.contains_key(k)) + .map(|(_, (_, ix))| *ix) + .min() + .unwrap(); + for (key, (account, ix)) in hash_map { + let (pubkey, slot) = key; + assert_eq!( + cache.load(pubkey, slot), + if ix < index { None } else { Some(account) } + ); + } + } }