From 3996b699dc485e3110ad9aa64667b88b2f3cb658 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Thu, 1 Apr 2021 07:16:34 -0500 Subject: [PATCH] read only account cache for executable accounts - improve replay (#16150) * read only account cache * tests * clippy * cleanup * new file, add tests * remove copy/paste code from test * remove dead code * all loads use cache * remove stale comments * add metrics logging for read only cache size * report read only cache hits and misses * consistency * formatting * rename, add comment * u64 * better interaction with existing cache * lru list saved between cleans --- runtime/src/accounts_db.rs | 119 +++++++++++- runtime/src/lib.rs | 1 + runtime/src/read_only_accounts_cache.rs | 242 ++++++++++++++++++++++++ 3 files changed, 360 insertions(+), 2 deletions(-) create mode 100644 runtime/src/read_only_accounts_cache.rs diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index f3b7283d27..3a9e5ab660 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -27,6 +27,7 @@ use crate::{ }, append_vec::{AppendVec, StoredAccountMeta, StoredMeta}, contains::Contains, + read_only_accounts_cache::ReadOnlyAccountsCache, }; use blake3::traits::digest::Digest; use crossbeam_channel::{unbounded, Receiver, Sender}; @@ -287,6 +288,13 @@ impl<'a> LoadedAccount<'a> { }, } } + + pub fn is_cached(&self) -> bool { + match self { + LoadedAccount::Stored(_) => false, + LoadedAccount::Cached(_) => true, + } + } } #[derive(Clone, Default, Debug)] @@ -694,6 +702,7 @@ pub struct AccountsDb { pub accounts_cache: AccountsCache, sender_bg_hasher: Option>, + pub read_only_accounts_cache: ReadOnlyAccountsCache, recycle_stores: RwLock, @@ -1065,6 +1074,7 @@ impl solana_frozen_abi::abi_example::AbiExample for AccountsDb { impl Default for AccountsDb { fn default() -> Self { let num_threads = get_thread_count(); + const MAX_READ_ONLY_CACHE_DATA_SIZE: usize = 
200_000_000; let mut bank_hashes = HashMap::new(); bank_hashes.insert(0, BankHashInfo::default()); @@ -1073,6 +1083,7 @@ impl Default for AccountsDb { storage: AccountStorage::default(), accounts_cache: AccountsCache::default(), sender_bg_hasher: None, + read_only_accounts_cache: ReadOnlyAccountsCache::new(MAX_READ_ONLY_CACHE_DATA_SIZE), recycle_stores: RwLock::new(RecycleStores::default()), uncleaned_pubkeys: DashMap::new(), next_id: AtomicUsize::new(0), @@ -2271,10 +2282,46 @@ impl AccountsDb { // `lock` released here }; + if self.caching_enabled && store_id != CACHE_VIRTUAL_STORAGE_ID { + let result = self.read_only_accounts_cache.load(pubkey, slot); + if let Some(account) = result { + return Some((account, slot)); + } + } + //TODO: thread this as a ref - self.get_account_accessor_from_cache_or_storage(slot, pubkey, store_id, offset) + let mut is_cached = false; + let loaded_account = self + .get_account_accessor_from_cache_or_storage(slot, pubkey, store_id, offset) .get_loaded_account() - .map(|loaded_account| (loaded_account.account(), slot)) + .map(|loaded_account| { + is_cached = loaded_account.is_cached(); + (loaded_account.account(), slot) + }); + + if self.caching_enabled && !is_cached { + match loaded_account { + Some((account, slot)) => { + /* + We show this store into the read-only cache for account 'A' and future loads of 'A' from the read-only cache are + safe/reflect 'A''s latest state on this fork. + This safety holds if during replay of slot 'S', we show we only read 'A' from the write cache, + not the read-only cache, after it's been updated in replay of slot 'S'. + Assume for contradiction this is not true, and we read 'A' from the read-only cache *after* it had been updated in 'S'. + This means an entry '(S, A)' was added to the read-only cache after 'A' had been updated in 'S'. 
+ Now when '(S, A)' was being added to the read-only cache, it must have been true that 'is_cached == false', + which means '(S, A)' does not exist in the write cache yet. + However, by the assumption for contradiction above, 'A' has already been updated in 'S' which means '(S, A)' + must exist in the write cache, which is a contradiction. + */ + self.read_only_accounts_cache.store(pubkey, slot, &account); + Some((account, slot)) + } + _ => None, + } + } else { + loaded_account + } } pub fn load_account_hash(&self, ancestors: &Ancestors, pubkey: &Pubkey) -> Hash { @@ -3439,6 +3486,7 @@ impl AccountsDb { let accounts_and_meta_to_store: Vec<(StoredMeta, &AccountSharedData)> = accounts .iter() .map(|(pubkey, account)| { + self.read_only_accounts_cache.remove(pubkey, slot); let account = if account.lamports == 0 { &default_account } else { @@ -4288,6 +4336,8 @@ impl AccountsDb { Ordering::Relaxed, ) == Ok(last) { + let (read_only_cache_hits, read_only_cache_misses) = + self.read_only_accounts_cache.get_and_reset_stats(); datapoint_info!( "accounts_db_store_timings", ( @@ -4330,6 +4380,22 @@ impl AccountsDb { self.stats.store_total_data.swap(0, Ordering::Relaxed), i64 ), + ( + "read_only_accounts_cache_entries", + self.read_only_accounts_cache.cache_len(), + i64 + ), + ( + "read_only_accounts_cache_data_size", + self.read_only_accounts_cache.data_size(), + i64 + ), + ("read_only_accounts_cache_hits", read_only_cache_hits, i64), + ( + "read_only_accounts_cache_misses", + read_only_cache_misses, + i64 + ), ); let recycle_stores = self.recycle_stores.read().unwrap(); @@ -8521,6 +8587,53 @@ pub mod tests { .unwrap_or_default() } + #[test] + fn test_read_only_accounts_cache() { + let caching_enabled = true; + let db = Arc::new(AccountsDb::new_with_config( + Vec::new(), + &ClusterType::Development, + HashSet::new(), + caching_enabled, + )); + + let account_key = Pubkey::new_unique(); + let zero_lamport_account = + AccountSharedData::new(0, 0, 
&AccountSharedData::default().owner); + let slot1_account = AccountSharedData::new(1, 1, &AccountSharedData::default().owner); + db.store_cached(0, &[(&account_key, &zero_lamport_account)]); + db.store_cached(1, &[(&account_key, &slot1_account)]); + + db.add_root(0); + db.add_root(1); + db.clean_accounts(None); + db.flush_accounts_cache(true, None); + db.clean_accounts(None); + db.add_root(2); + + assert_eq!(db.read_only_accounts_cache.cache_len(), 0); + let account = db + .load(&Ancestors::default(), &account_key) + .map(|(account, _)| account) + .unwrap(); + assert_eq!(account.lamports, 1); + assert_eq!(db.read_only_accounts_cache.cache_len(), 1); + let account = db + .load(&Ancestors::default(), &account_key) + .map(|(account, _)| account) + .unwrap(); + assert_eq!(account.lamports, 1); + assert_eq!(db.read_only_accounts_cache.cache_len(), 1); + db.store_cached(2, &[(&account_key, &zero_lamport_account)]); + assert_eq!(db.read_only_accounts_cache.cache_len(), 1); + let account = db + .load(&Ancestors::default(), &account_key) + .map(|(account, _)| account) + .unwrap(); + assert_eq!(account.lamports, 0); + assert_eq!(db.read_only_accounts_cache.cache_len(), 1); + } + #[test] fn test_flush_cache_clean() { let caching_enabled = true; @@ -8547,6 +8660,8 @@ pub mod tests { .do_load(&Ancestors::default(), &account_key, Some(0)) .unwrap(); assert_eq!(account.0.lamports, 0); + // since this item is in the cache, it should not be in the read only cache + assert_eq!(db.read_only_accounts_cache.cache_len(), 0); // Flush, then clean again. 
Should not need another root to initiate the cleaning // because `accounts_index.uncleaned_roots` should be correct diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 9d8af25d79..f49a5a760a 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -25,6 +25,7 @@ pub mod loader_utils; pub mod log_collector; pub mod message_processor; mod native_loader; +mod read_only_accounts_cache; pub mod rent_collector; pub mod secondary_index; pub mod serde_snapshot; diff --git a/runtime/src/read_only_accounts_cache.rs b/runtime/src/read_only_accounts_cache.rs new file mode 100644 index 0000000000..6ba4e0dc79 --- /dev/null +++ b/runtime/src/read_only_accounts_cache.rs @@ -0,0 +1,242 @@ +//! ReadOnlyAccountsCache used to store accounts, such as executable accounts, +//! which can be large, loaded many times, and rarely change. +use dashmap::DashMap; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, + }, + time::Instant, +}; + +use solana_sdk::{ + account::{AccountSharedData, ReadableAccount}, + clock::Slot, + pubkey::Pubkey, +}; + +type ReadOnlyCacheKey = (Pubkey, Slot); +type LruEntry = (Instant, usize, ReadOnlyCacheKey); +type LruList = Arc>>; + +#[derive(Debug)] +pub struct ReadOnlyAccountCacheEntry { + pub account: AccountSharedData, + pub last_used: Arc>, +} + +#[derive(Debug)] +pub struct ReadOnlyAccountsCache { + cache: DashMap, + max_data_size: usize, + data_size: Arc>, + hits: AtomicU64, + misses: AtomicU64, + lru: LruList, +} + +impl ReadOnlyAccountsCache { + pub fn new(max_data_size: usize) -> Self { + Self { + max_data_size, + cache: DashMap::default(), + data_size: Arc::new(RwLock::new(0)), + hits: AtomicU64::new(0), + misses: AtomicU64::new(0), + lru: Arc::new(RwLock::new(Vec::new())), + } + } + + pub fn load(&self, pubkey: &Pubkey, slot: Slot) -> Option { + self.cache + .get(&(*pubkey, slot)) + .map(|account_ref| { + self.hits.fetch_add(1, Ordering::Relaxed); + let value = account_ref.value(); + // remember last use + let now = 
Instant::now(); + *value.last_used.write().unwrap() = now; + value.account.clone() + }) + .or_else(|| { + self.misses.fetch_add(1, Ordering::Relaxed); + None + }) + } + + pub fn store(&self, pubkey: &Pubkey, slot: Slot, account: &AccountSharedData) { + let len = account.data().len(); + self.cache.insert( + (*pubkey, slot), + ReadOnlyAccountCacheEntry { + account: account.clone(), + last_used: Arc::new(RwLock::new(Instant::now())), + }, + ); + + // maybe purge after we insert. Insert may have replaced. + let new_size = self.maybe_purge_lru_items(len); + *self.data_size.write().unwrap() = new_size; + } + + pub fn remove(&self, pubkey: &Pubkey, slot: Slot) { + // does not keep track of data size reduction here. + // data size will be recomputed the next time we store and we think we may now be too large. + self.cache.remove(&(*pubkey, slot)); + } + + fn purge_lru_list( + &self, + lru: &mut Vec, + verify_timestamp: bool, + mut current_size: usize, + ) -> usize { + let mut processed = 0; + for lru_item in lru.iter() { + let (timestamp, size, key) = lru_item; + processed += 1; + let mut try_remove = true; + if verify_timestamp { + let item = self.cache.get(key); + match item { + Some(item) => { + if *timestamp != *item.last_used.read().unwrap() { + // this item was used more recently than our list indicates, so skip it + continue; + } + // item is as old as we thought, so fall through and delete it + } + None => { + try_remove = false; + } + } + } + + if try_remove { + self.cache.remove(&key); + } + current_size = current_size.saturating_sub(*size); // we don't subtract on remove, so subtract now + if current_size <= self.max_data_size { + break; + } + } + lru.drain(0..processed); + current_size + } + + fn calculate_lru_list(&self, lru: &mut Vec) -> usize { + // purge in lru order + let mut new_size = 0; + for item in self.cache.iter() { + let value = item.value(); + let item_len = value.account.data().len(); + new_size += item_len; + 
lru.push((*value.last_used.read().unwrap(), item_len, *item.key())); + } + new_size + } + + fn maybe_purge_lru_items(&self, new_item_len: usize) -> usize { + let mut new_size = *self.data_size.read().unwrap() + new_item_len; + if new_size <= self.max_data_size { + return new_size; + } + + // purge from the lru list we last made + let mut list = self.lru.write().unwrap(); + new_size = self.purge_lru_list(&mut list, true, new_size); + if new_size <= self.max_data_size { + return new_size; + } + + // we didn't get enough, so calculate a new list and keep purging + new_size = self.calculate_lru_list(&mut list); + if new_size > self.max_data_size { + list.sort(); + new_size = self.purge_lru_list(&mut list, false, new_size); + // the list is stored in self so we use it to purge next time + } + new_size + } + + pub fn cache_len(&self) -> usize { + self.cache.len() + } + + pub fn data_size(&self) -> usize { + *self.data_size.read().unwrap() + } + + pub fn get_and_reset_stats(&self) -> (u64, u64) { + let hits = self.hits.swap(0, Ordering::Relaxed); + let misses = self.misses.swap(0, Ordering::Relaxed); + (hits, misses) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use solana_sdk::account::{accounts_equal, Account}; + #[test] + fn test_read_only_accounts_cache() { + solana_logger::setup(); + let max = 100; + let cache = ReadOnlyAccountsCache::new(max); + let slot = 0; + assert!(cache.load(&Pubkey::default(), slot).is_none()); + assert_eq!(0, cache.cache_len()); + assert_eq!(0, cache.data_size()); + cache.remove(&Pubkey::default(), slot); // assert no panic + let key1 = Pubkey::new_unique(); + let key2 = Pubkey::new_unique(); + let key3 = Pubkey::new_unique(); + let account1 = AccountSharedData::from(Account { + data: vec![0; max], + ..Account::default() + }); + let mut account2 = account1.clone(); + account2.lamports += 1; // so they compare differently + let mut account3 = account1.clone(); + account3.lamports += 4; // so they compare differently + 
cache.store(&key1, slot, &account1); + assert_eq!(100, cache.data_size()); + assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); + assert_eq!(1, cache.cache_len()); + cache.store(&key2, slot, &account2); + assert_eq!(100, cache.data_size()); + assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2)); + assert_eq!(1, cache.cache_len()); + cache.store(&key2, slot, &account1); // overwrite key2 with account1 + assert_eq!(100, cache.data_size()); + assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1)); + assert_eq!(1, cache.cache_len()); + cache.remove(&key2, slot); + assert_eq!(100, cache.data_size()); + assert_eq!(0, cache.cache_len()); + + // can store 2 items, 3rd item kicks oldest item out + let max = 200; + let cache = ReadOnlyAccountsCache::new(max); + cache.store(&key1, slot, &account1); + assert_eq!(100, cache.data_size()); + assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); + assert_eq!(1, cache.cache_len()); + cache.store(&key2, slot, &account2); + assert_eq!(200, cache.data_size()); + assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); + assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2)); + assert_eq!(2, cache.cache_len()); + cache.store(&key2, slot, &account1); // overwrite key2 with account1 + assert_eq!(200, cache.data_size()); + assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); + assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1)); + assert_eq!(2, cache.cache_len()); + cache.store(&key3, slot, &account3); + assert_eq!(200, cache.data_size()); + assert!(cache.load(&key1, slot).is_none()); // was lru purged + assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1)); + assert!(accounts_equal(&cache.load(&key3, slot).unwrap(), &account3)); + assert_eq!(2, cache.cache_len()); + } +}