Remove unnecessary flushes in previous roots (#14596) (#14803)

Co-authored-by: Carl Lin <carl@solana.com>
(cherry picked from commit c77461e428)

Co-authored-by: carllin <wumu727@gmail.com>
Authored by: mergify[bot]
Committed by: GitHub on 2021-01-23 15:02:32 +00:00
Parent: 68bf58aac0
Commit: fef4100f5f
5 changed files with 640 additions and 129 deletions

===== Changed file 1 of 5 =====

@@ -275,6 +275,10 @@ impl AccountsBackgroundService {
                let snapshot_block_height =
                    request_handler.handle_snapshot_requests(accounts_db_caching_enabled);
                if accounts_db_caching_enabled {
+                    // Note that the flush will do an internal clean of the
+                    // cache up to bank.slot(), so should be safe as long
+                    // as any later snapshots that are taken are of
+                    // slots >= bank.slot()
                    bank.flush_accounts_cache_if_needed();
                }
@@ -300,6 +304,10 @@ impl AccountsBackgroundService {
                    > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
                {
                    if accounts_db_caching_enabled {
+                        // Note that the flush will do an internal clean of the
+                        // cache up to bank.slot(), so should be safe as long
+                        // as any later snapshots that are taken are of
+                        // slots >= bank.slot()
                        bank.force_flush_accounts_cache();
                    }
                    bank.clean_accounts(true);
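
The comments added above encode an ordering requirement: the flush internally cleans the cache up to `bank.slot()`, so it must complete before `clean_accounts` runs and before any snapshot of a slot `>= bank.slot()` is taken. A minimal sketch of that ordering, using toy stand-ins rather than the real `Bank` and service types:

    // Toy stubs only; the real Bank delegates to AccountsDB (see below).
    struct Bank {
        slot: u64,
    }

    impl Bank {
        fn slot(&self) -> u64 {
            self.slot
        }
        fn force_flush_accounts_cache(&self) {
            // Flushes (and internally cleans) all cached roots <= self.slot()
            println!("flush roots <= {}", self.slot());
        }
        fn clean_accounts(&self) {
            println!("clean up to the last flushed root");
        }
    }

    fn background_iteration(bank: &Bank, accounts_db_caching_enabled: bool) {
        if accounts_db_caching_enabled {
            // Flush first, so later snapshots of slots >= bank.slot() stay consistent
            bank.force_flush_accounts_cache();
        }
        bank.clean_accounts();
    }

    fn main() {
        background_iteration(&Bank { slot: 100 }, true);
    }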

===== Changed file 2 of 5 =====

@ -1,7 +1,7 @@
use dashmap::DashMap; use dashmap::DashMap;
use solana_sdk::{account::Account, clock::Slot, hash::Hash, pubkey::Pubkey}; use solana_sdk::{account::Account, clock::Slot, hash::Hash, pubkey::Pubkey};
use std::{ use std::{
collections::HashSet, collections::BTreeSet,
ops::Deref, ops::Deref,
sync::{ sync::{
atomic::{AtomicBool, AtomicU64, Ordering}, atomic::{AtomicBool, AtomicU64, Ordering},
@@ -90,7 +90,7 @@ pub struct AccountsCache {
    cache: DashMap<Slot, SlotCache>,
    // Queue of potentially unflushed roots. Random eviction + cache too large
    // could have triggered a flush of this slot already
-    maybe_unflushed_roots: RwLock<HashSet<Slot>>,
+    maybe_unflushed_roots: RwLock<BTreeSet<Slot>>,
    max_flushed_root: AtomicU64,
}
@@ -147,14 +147,25 @@ impl AccountsCache {
    }

    pub fn add_root(&self, root: Slot) {
-        self.maybe_unflushed_roots.write().unwrap().insert(root);
+        let max_flushed_root = self.fetch_max_flush_root();
+        if root > max_flushed_root || (root == max_flushed_root && root == 0) {
+            self.maybe_unflushed_roots.write().unwrap().insert(root);
+        }
    }

-    pub fn clear_roots(&self) -> HashSet<Slot> {
-        std::mem::replace(
-            &mut self.maybe_unflushed_roots.write().unwrap(),
-            HashSet::new(),
-        )
+    pub fn clear_roots(&self, max_root: Option<Slot>) -> BTreeSet<Slot> {
+        let mut w_maybe_unflushed_roots = self.maybe_unflushed_roots.write().unwrap();
+        if let Some(max_root) = max_root {
+            // `greater_than_max_root` contains all slots >= `max_root + 1`, or alternatively,
+            // all slots > `max_root`. Meanwhile, `w_maybe_unflushed_roots` is left with all
+            // slots <= `max_root`.
+            let greater_than_max_root = w_maybe_unflushed_roots.split_off(&(max_root + 1));
+            // After the replace, `w_maybe_unflushed_roots` contains slots > `max_root`, and
+            // we return all slots <= `max_root`
+            std::mem::replace(&mut w_maybe_unflushed_roots, greater_than_max_root)
+        } else {
+            std::mem::replace(&mut *w_maybe_unflushed_roots, BTreeSet::new())
+        }
    }

    // Removes slots less than or equal to `max_root`. Only safe to pass in a rooted slot,
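
The new `clear_roots` hinges on `BTreeSet::split_off(&k)`, which leaves elements `< k` in the original set and returns the elements `>= k`; swapping the two halves then yields "take everything `<= max_root`". A standalone sketch of the same partition trick (illustrative function, not the codebase's API):

    use std::collections::BTreeSet;

    fn take_roots_up_to(roots: &mut BTreeSet<u64>, max_root: u64) -> BTreeSet<u64> {
        // Everything > max_root stays behind; everything <= max_root is returned.
        let greater_than_max_root = roots.split_off(&(max_root + 1));
        std::mem::replace(roots, greater_than_max_root)
    }

    fn main() {
        let mut roots = BTreeSet::from([1, 3, 5, 7, 9]);
        let flushed = take_roots_up_to(&mut roots, 5);
        assert_eq!(flushed.into_iter().collect::<Vec<_>>(), vec![1, 3, 5]);
        assert_eq!(roots.into_iter().collect::<Vec<_>>(), vec![7, 9]);
    }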

===== Changed file 3 of 5 =====

@@ -49,7 +49,7 @@ use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
 use std::{
     borrow::Cow,
     boxed::Box,
-    collections::{BTreeMap, HashMap, HashSet},
+    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
     convert::{TryFrom, TryInto},
     io::{Error as IOError, Result as IOResult},
     ops::RangeBounds,
@@ -931,23 +931,30 @@ impl AccountsDB {
        reclaims
    }

+    fn max_clean_root(&self, proposed_clean_root: Option<Slot>) -> Option<Slot> {
+        match (
+            self.accounts_index.min_ongoing_scan_root(),
+            proposed_clean_root,
+        ) {
+            (None, None) => None,
+            (Some(min_scan_root), None) => Some(min_scan_root),
+            (None, Some(proposed_clean_root)) => Some(proposed_clean_root),
+            (Some(min_scan_root), Some(proposed_clean_root)) => {
+                Some(std::cmp::min(min_scan_root, proposed_clean_root))
+            }
+        }
+    }
+
    // Purge zero lamport accounts and older rooted account states as garbage
    // collection
    // Only remove those accounts where the entire rooted history of the account
    // can be purged because there are no live append vecs in the ancestors
    pub fn clean_accounts(&self, max_clean_root: Option<Slot>) {
+        let max_clean_root = self.max_clean_root(max_clean_root);
+
        // hold a lock to prevent slot shrinking from running because it might modify some rooted
        // slot storages which can not happen as long as we're cleaning accounts because we're also
        // modifying the rooted slot storages!
-        let max_clean_root = match (self.accounts_index.min_ongoing_scan_root(), max_clean_root) {
-            (None, None) => None,
-            (Some(min_scan_root), None) => Some(min_scan_root),
-            (None, Some(max_clean_root)) => Some(max_clean_root),
-            (Some(min_scan_root), Some(max_clean_root)) => {
-                Some(std::cmp::min(min_scan_root, max_clean_root))
-            }
-        };
        let mut candidates_v1 = self.shrink_candidate_slots_v1.lock().unwrap();
        self.report_store_stats();
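
The extracted `max_clean_root` helper is simply "the minimum of two optional bounds": a proposed clean root is capped by the lowest slot an ongoing scan still depends on. A standalone equivalent (names here are illustrative):

    fn effective_clean_root(min_ongoing_scan_root: Option<u64>, proposed: Option<u64>) -> Option<u64> {
        match (min_ongoing_scan_root, proposed) {
            (None, None) => None,
            (Some(root), None) | (None, Some(root)) => Some(root),
            (Some(a), Some(b)) => Some(a.min(b)),
        }
    }

    fn main() {
        // An ongoing scan at slot 5 caps a proposed clean root of 8 down to 5.
        assert_eq!(effective_clean_root(Some(5), Some(8)), Some(5));
        assert_eq!(effective_clean_root(None, Some(8)), Some(8));
    }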
@@ -2408,21 +2415,37 @@ impl AccountsDB {
        );
    }

-    fn purge_slot_cache_keys(&self, dead_slot: Slot, slot_cache: SlotCache) {
-        // Slot purged from cache should not exist in the backing store
-        assert!(self.storage.get_slot_stores(dead_slot).is_none());
+    fn purge_slot_cache(&self, purged_slot: Slot, slot_cache: SlotCache) {
        let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
        let pubkey_to_slot_set: Vec<(Pubkey, Slot)> = slot_cache
            .iter()
            .map(|account| {
-                purged_slot_pubkeys.insert((dead_slot, *account.key()));
-                (*account.key(), dead_slot)
+                purged_slot_pubkeys.insert((purged_slot, *account.key()));
+                (*account.key(), purged_slot)
            })
            .collect();
+        self.purge_slot_cache_pubkeys(purged_slot, purged_slot_pubkeys, pubkey_to_slot_set, true);
+    }
+
+    fn purge_slot_cache_pubkeys(
+        &self,
+        purged_slot: Slot,
+        purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
+        pubkey_to_slot_set: Vec<(Pubkey, Slot)>,
+        is_dead: bool,
+    ) {
+        // Slot purged from cache should not exist in the backing store
+        assert!(self.storage.get_slot_stores(purged_slot).is_none());
        let num_purged_keys = pubkey_to_slot_set.len();
        let reclaims = self.purge_keys_exact(&pubkey_to_slot_set);
        assert_eq!(reclaims.len(), num_purged_keys);
-        self.finalize_dead_slot_removal(std::iter::once(&dead_slot), purged_slot_pubkeys, None);
+        if is_dead {
+            self.finalize_dead_slot_removal(
+                std::iter::once(&purged_slot),
+                purged_slot_pubkeys,
+                None,
+            );
+        }
    }

    fn purge_slots(&self, slots: &HashSet<Slot>) {
@@ -2440,8 +2463,10 @@ impl AccountsDB {
        for remove_slot in non_roots {
            if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) {
                // If the slot is still in the cache, remove the backing storages for
-                // the slot and from the Accounts Index
-                self.purge_slot_cache_keys(*remove_slot, slot_cache);
+                // the slot. The accounts index cleaning (removing from the slot list,
+                // decrementing the account ref count) is handled in
+                // clean_accounts() -> purge_older_root_entries()
+                self.purge_slot_cache(*remove_slot, slot_cache);
            } else if let Some((_, slot_removed_storages)) = self.storage.0.remove(&remove_slot) {
                // Because AccountsBackgroundService synchronously flushes from the accounts cache
                // and handles all Bank::drop() (the cleanup function that leads to this
@@ -2518,7 +2543,7 @@ impl AccountsDB {
        if let Some(slot_cache) = self.accounts_cache.remove_slot(remove_slot) {
            // If the slot is still in the cache, remove it from the cache
-            self.purge_slot_cache_keys(remove_slot, slot_cache);
+            self.purge_slot_cache(remove_slot, slot_cache);
        }

        // TODO: Handle if the slot was flushed to storage while we were removing the cached
@@ -2839,32 +2864,29 @@ impl AccountsDB {
        self.accounts_cache.report_size();
    }

-    // Force flush the cached roots, flush any unrooted frozen slots as well if there are
-    // > MAX_CACHE_SLOTS of them.
-    pub fn force_flush_accounts_cache(&self) {
-        self.flush_accounts_cache(true);
-    }
-
-    pub fn flush_accounts_cache_if_needed(&self) {
-        self.flush_accounts_cache(false);
-    }
-
-    fn flush_accounts_cache(&self, force_flush: bool) {
+    // `force_flush` flushes all the cached roots `<= max_clean_root`. It also then
+    // flushes:
+    // 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache,
+    // 2) If there are still > MAX_CACHE_SLOTS remaining slots in the cache, the excess
+    // unrooted slots
+    pub fn flush_accounts_cache(&self, force_flush: bool, max_clean_root: Option<Slot>) {
+        #[cfg(not(test))]
+        assert!(max_clean_root.is_some());
+
        if !force_flush && self.accounts_cache.num_slots() <= MAX_CACHE_SLOTS {
            return;
        }

-        // Flush all roots
+        // Flush only the roots <= max_clean_root, so that snapshotting has all
+        // the relevant roots in storage.
        let mut flush_roots_elapsed = Measure::start("flush_roots_elapsed");
-        let cached_roots = self.accounts_cache.clear_roots();
-        for root in &cached_roots {
-            self.flush_slot_cache(*root);
-            self.accounts_cache.set_max_flush_root(*root);
-        }
-
-        // Only add to the uncleaned roots set *after* we've flushed the previous roots,
-        // so that clean will actually be able to clean the slots.
-        self.accounts_index.add_uncleaned_roots(cached_roots);
+        let mut account_bytes_saved = 0;
+        let mut num_accounts_saved = 0;
+        let (total_new_cleaned_roots, num_cleaned_roots_flushed) = self
+            .flush_rooted_accounts_cache(
+                max_clean_root,
+                Some((&mut account_bytes_saved, &mut num_accounts_saved)),
+            );
        flush_roots_elapsed.stop();

        // Note we don't purge unrooted slots here because there may be ongoing scans/references
@@ -2872,27 +2894,49 @@ impl AccountsDB {
        // banks

        // If there are > MAX_CACHE_SLOTS, then flush the excess ones to storage
+        let (total_new_excess_roots, num_excess_roots_flushed) =
+            if self.accounts_cache.num_slots() > MAX_CACHE_SLOTS {
+                // Start by flushing the roots
+                //
+                // Cannot do any cleaning on roots past `max_clean_root` because future
+                // snapshots may need updates from those later slots, hence we pass `None`
+                // for `should_clean`.
+                self.flush_rooted_accounts_cache(None, None)
+            } else {
+                (0, 0)
+            };
        let old_slots = self.accounts_cache.find_older_frozen_slots(MAX_CACHE_SLOTS);
-        let total_excess_slot_count = old_slots.len();
+        let excess_slot_count = old_slots.len();
        let mut unflushable_unrooted_slot_count = 0;
        let max_flushed_root = self.accounts_cache.fetch_max_flush_root();
        for old_slot in old_slots {
            // Don't flush slots that are known to be unrooted
            if old_slot > max_flushed_root {
-                self.flush_slot_cache(old_slot);
+                self.flush_slot_cache(old_slot, None::<&mut fn(&_, &_) -> bool>);
            } else {
                unflushable_unrooted_slot_count += 1;
            }
        }

        datapoint_info!(
-            "accounts_db-cache-limit-slots",
-            ("total_excess_slot_count", total_excess_slot_count, i64),
+            "accounts_db-flush_accounts_cache",
+            ("total_new_cleaned_roots", total_new_cleaned_roots, i64),
+            ("num_cleaned_roots_flushed", num_cleaned_roots_flushed, i64),
+            ("total_new_excess_roots", total_new_excess_roots, i64),
+            ("num_excess_roots_flushed", num_excess_roots_flushed, i64),
+            ("excess_slot_count", excess_slot_count, i64),
            (
                "unflushable_unrooted_slot_count",
                unflushable_unrooted_slot_count,
                i64
            ),
+            (
+                "flush_roots_elapsed",
+                flush_roots_elapsed.as_us() as i64,
+                i64
+            ),
+            ("account_bytes_saved", account_bytes_saved, i64),
+            ("num_accounts_saved", num_accounts_saved, i64),
        );
// Flush a random slot out after every force flush to catch any inconsistencies // Flush a random slot out after every force flush to catch any inconsistencies
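
Taken together, the rewritten flush path is a three-stage policy: clean-and-flush roots up to `max_clean_root`; if the cache still holds more than `MAX_CACHE_SLOTS` slots, flush the remaining roots uncleaned; and finally evict excess unrooted frozen slots. A simplified standalone model of that policy (toy containers and constant, not the real `AccountsDB` state):

    const MAX_CACHE_SLOTS: usize = 4;

    fn plan_flush(rooted: &mut Vec<u64>, unrooted: &mut Vec<u64>, max_clean_root: u64) -> Vec<u64> {
        let mut flushed = Vec::new();
        // 1) Always clean and flush the roots <= max_clean_root
        rooted.retain(|&slot| {
            if slot <= max_clean_root {
                flushed.push(slot);
                false
            } else {
                true
            }
        });
        // 2) If the cache is still too large, flush the remaining roots, uncleaned
        if rooted.len() + unrooted.len() > MAX_CACHE_SLOTS {
            flushed.append(rooted);
        }
        // 3) If it is *still* too large, evict the oldest excess unrooted slots
        while unrooted.len() > MAX_CACHE_SLOTS {
            flushed.push(unrooted.remove(0));
        }
        flushed
    }

    fn main() {
        let mut rooted = vec![1, 2, 3, 4];
        let mut unrooted = vec![5, 6, 7, 8, 9];
        // Roots 1..=2 are cleaned and flushed, 3..=4 flushed for size, 5 evicted for size.
        assert_eq!(plan_flush(&mut rooted, &mut unrooted, 2), vec![1, 2, 3, 4, 5]);
    }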
@@ -2910,58 +2954,158 @@ impl AccountsDB {
                    "Flushing random slot: {}, num_remaining: {}",
                    *rand_slot, num_slots_remaining
                );
-                self.flush_slot_cache(*rand_slot);
+                self.flush_slot_cache(*rand_slot, None::<&mut fn(&_, &_) -> bool>);
            }
        }
-        inc_new_counter_info!("flush_roots_elapsed", flush_roots_elapsed.as_us() as usize);
    }

-    fn flush_slot_cache(&self, slot: Slot) {
+    fn flush_rooted_accounts_cache(
+        &self,
+        mut max_flush_root: Option<Slot>,
+        should_clean: Option<(&mut usize, &mut usize)>,
+    ) -> (usize, usize) {
+        if should_clean.is_some() {
+            max_flush_root = self.max_clean_root(max_flush_root);
+        }
+
+        // If there is a long-running scan going on, this could prevent any cleaning
+        // past `max_flush_root`.
+        let cached_roots: BTreeSet<Slot> = self.accounts_cache.clear_roots(max_flush_root);
+
+        // Use HashMap because HashSet doesn't provide the Entry API
+        let mut written_accounts = HashMap::new();
+
+        // If `should_clean` is None, then `should_flush_f` is also None, which will cause
+        // `flush_slot_cache` to flush all accounts to storage without cleaning any accounts.
+        let mut should_flush_f = should_clean.map(|(account_bytes_saved, num_accounts_saved)| {
+            move |&pubkey: &Pubkey, account: &Account| {
+                use std::collections::hash_map::Entry::{Occupied, Vacant};
+                let should_flush = match written_accounts.entry(pubkey) {
+                    Vacant(vacant_entry) => {
+                        vacant_entry.insert(());
+                        true
+                    }
+                    Occupied(_occupied_entry) => {
+                        *account_bytes_saved += account.data.len();
+                        *num_accounts_saved += 1;
+                        // If a later root already wrote this account, no point
+                        // in flushing it
+                        false
+                    }
+                };
+                should_flush
+            }
+        });
+
+        // Iterate from highest to lowest so that we don't need to flush earlier
+        // outdated updates in earlier roots
+        let mut num_roots_flushed = 0;
+        for &root in cached_roots.iter().rev() {
+            if self.flush_slot_cache(root, should_flush_f.as_mut()) {
+                num_roots_flushed += 1;
+            }
+
+            // Regardless of whether this slot was *just* flushed from the cache by the above
+            // `flush_slot_cache()`, we should update the `max_flush_root`.
+            // This is because some rooted slots may be flushed to storage *before* they are
+            // marked as root. This can occur for instance when:
+            // 1) The cache is overwhelmed, and we flushed some yet-to-be-rooted frozen slots
+            // 2) Random evictions
+            // These slots may then *later* be marked as root, so we still need to handle
+            // updating the `max_flush_root` in the accounts cache.
+            self.accounts_cache.set_max_flush_root(root);
+        }

+        // Only add to the uncleaned roots set *after* we've flushed the previous roots,
+        // so that clean will actually be able to clean the slots.
+        let num_new_roots = cached_roots.len();
+        self.accounts_index.add_uncleaned_roots(cached_roots);
+        (num_new_roots, num_roots_flushed)
+    }
+
+    // `should_flush_f` is an optional closure that determines whether a given
+    // account should be flushed. Passing `None` will by default flush all
+    // accounts
+    fn flush_slot_cache(
+        &self,
+        slot: Slot,
+        mut should_flush_f: Option<&mut impl FnMut(&Pubkey, &Account) -> bool>,
+    ) -> bool {
        info!("flush_slot_cache slot: {}", slot);
-        if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
+        let slot_cache = self.accounts_cache.slot_cache(slot);
+        if let Some(slot_cache) = slot_cache {
            let iter_items: Vec<_> = slot_cache.iter().collect();
            let mut total_size = 0;
+            let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
+            let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
            let (accounts, hashes): (Vec<(&Pubkey, &Account)>, Vec<Hash>) = iter_items
                .iter()
-                .map(|iter_item| {
+                .filter_map(|iter_item| {
                    let key = iter_item.key();
                    let account = &iter_item.value().account;
-                    let hash = iter_item.value().hash;
-                    total_size += (account.data.len() + STORE_META_OVERHEAD) as u64;
-                    ((key, account), hash)
+                    let should_flush = should_flush_f
+                        .as_mut()
+                        .map(|should_flush_f| should_flush_f(key, account))
+                        .unwrap_or(true);
+                    if should_flush {
+                        let hash = iter_item.value().hash;
+                        total_size += (account.data.len() + STORE_META_OVERHEAD) as u64;
+                        Some(((key, account), hash))
+                    } else {
+                        // If we don't flush, we have to remove the entry from the
+                        // index, since it's equivalent to purging
+                        purged_slot_pubkeys.insert((slot, *key));
+                        pubkey_to_slot_set.push((*key, slot));
+                        None
+                    }
                })
                .unzip();
-            let aligned_total_size = self.page_align(total_size);
-            // This ensures that all updates are written to an AppendVec, before any
-            // updates to the index happen, so anybody that sees a real entry in the index,
-            // will be able to find the account in storage
-            let flushed_store =
-                self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache");
-            self.store_accounts_custom(
-                slot,
-                &accounts,
-                &hashes,
-                Some(Box::new(move |_, _| flushed_store.clone())),
-                None,
-                false,
-            );
-            // If the above sizing function is correct, just one AppendVec is enough to hold
-            // all the data for the slot
-            assert_eq!(
-                self.storage
-                    .get_slot_stores(slot)
-                    .unwrap()
-                    .read()
-                    .unwrap()
-                    .len(),
-                1
-            );
+
+            let is_dead_slot = accounts.is_empty();
+            // Remove the account index entries from earlier roots that are outdated by later
+            // roots. Safe because queries to the index will be reading updates from later roots.
+            self.purge_slot_cache_pubkeys(
+                slot,
+                purged_slot_pubkeys,
+                pubkey_to_slot_set,
+                is_dead_slot,
+            );
+
+            if !is_dead_slot {
+                let aligned_total_size = self.page_align(total_size);
+                // This ensures that all updates are written to an AppendVec, before any
+                // updates to the index happen, so anybody that sees a real entry in the index,
+                // will be able to find the account in storage
+                let flushed_store =
+                    self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache");
+                self.store_accounts_custom(
+                    slot,
+                    &accounts,
+                    &hashes,
+                    Some(Box::new(move |_, _| flushed_store.clone())),
+                    None,
+                    false,
+                );
+                // If the above sizing function is correct, just one AppendVec is enough to hold
+                // all the data for the slot
+                assert_eq!(
+                    self.storage
+                        .get_slot_stores(slot)
+                        .unwrap()
+                        .read()
+                        .unwrap()
+                        .len(),
+                    1
+                );
+            }
+
            // Remove this slot from the cache, which to AccountsDb readers should look like an
            // atomic switch from the cache to storage
            assert!(self.accounts_cache.remove_slot(slot).is_some());
+            true
+        } else {
+            false
        }
    }
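
The cleaning that `should_flush_f` enables reduces to: walk the roots from newest to oldest and flush only the first (newest) version of each pubkey; older versions are purged from the index instead of being written out. A standalone sketch with simplified types:

    use std::collections::{BTreeSet, HashMap};

    // slot -> pubkeys written in that slot; returns the (slot, pubkey) pairs to flush
    fn flush_plan(
        roots: &BTreeSet<u64>,
        cache: &HashMap<u64, Vec<&'static str>>,
    ) -> Vec<(u64, &'static str)> {
        let mut written: HashMap<&str, ()> = HashMap::new();
        let mut to_flush = Vec::new();
        for &root in roots.iter().rev() {
            for key in cache.get(&root).into_iter().flatten() {
                // Only flush if no later root has already written this account
                if written.insert(*key, ()).is_none() {
                    to_flush.push((root, *key));
                }
            }
        }
        to_flush
    }

    fn main() {
        let roots = BTreeSet::from([1, 2]);
        let mut cache = HashMap::new();
        cache.insert(1, vec!["A", "B"]);
        cache.insert(2, vec!["B"]);
        // "B" in root 1 is outdated by root 2, so only root 2's copy is flushed.
        assert_eq!(flush_plan(&roots, &cache), vec![(2, "B"), (1, "A")]);
    }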
@@ -6948,7 +7092,7 @@ pub mod tests {
        // No root was added yet, requires an ancestor to find
        // the account
-        db.force_flush_accounts_cache();
+        db.flush_accounts_cache(true, None);
        let ancestors = vec![(slot, 1)].into_iter().collect();
        assert_eq!(
            db.load_slow(&ancestors, &key),
@@ -6957,7 +7101,7 @@ pub mod tests {
        // Add root then flush
        db.add_root(slot);
-        db.force_flush_accounts_cache();
+        db.flush_accounts_cache(true, None);
        assert_eq!(db.load_slow(&HashMap::new(), &key), Some((account0, slot)));
    }
@@ -6988,7 +7132,7 @@ pub mod tests {
            db.load_slow(&ancestors, &unrooted_key),
            Some((account0.clone(), unrooted_slot))
        );
-        db.force_flush_accounts_cache();
+        db.flush_accounts_cache(true, None);

        // After the flush, the unrooted slot is still in the cache
        assert!(db.load_slow(&ancestors, &unrooted_key).is_some());
@@ -7037,7 +7181,7 @@ pub mod tests {
            }
        }

-        db.flush_accounts_cache_if_needed();
+        db.flush_accounts_cache(false, None);

        let total_slots = num_roots + num_unrooted;
        // If there's <= the max size, then nothing will be flushed from the slot
@@ -7099,7 +7243,7 @@ pub mod tests {
        // Flush, then clean again. Should not need another root to initiate the cleaning
        // because `accounts_index.uncleaned_roots` should be correct
-        db.force_flush_accounts_cache();
+        db.flush_accounts_cache(true, None);
        db.clean_accounts(None);
        assert!(db
            .do_load(&Ancestors::default(), &account_key, Some(0))
@@ -7176,7 +7320,7 @@ pub mod tests {
        db.add_root(2);

        // Flush the cache, slot 1 should remain in the cache, everything else should be flushed
-        db.force_flush_accounts_cache();
+        db.flush_accounts_cache(true, None);
        assert_eq!(db.accounts_cache.num_slots(), 1);
        assert!(db.accounts_cache.slot_cache(1).is_some());
@@ -7223,7 +7367,7 @@ pub mod tests {
        }
        accounts_db.add_root(slot);
-        accounts_db.force_flush_accounts_cache();
+        accounts_db.flush_accounts_cache(true, None);

        let mut storage_maps: Vec<Arc<AccountStorageEntry>> = accounts_db
            .storage
@@ -7257,4 +7401,320 @@ pub mod tests {
            assert_eq!(before_size, after_size + account.stored_size);
        }
    }
fn setup_accounts_db_cache_clean(num_slots: usize) -> (AccountsDB, Vec<Pubkey>, Vec<Slot>) {
let caching_enabled = true;
let accounts_db = AccountsDB::new_with_config(
Vec::new(),
&ClusterType::Development,
HashSet::new(),
caching_enabled,
);
let slots: Vec<_> = (0..num_slots as Slot).into_iter().collect();
let keys: Vec<Pubkey> = std::iter::repeat_with(Pubkey::new_unique)
.take(num_slots)
.collect();
// Store some subset of the keys in slots 0..num_slots
for slot in &slots {
for key in &keys[*slot as usize..] {
accounts_db.store_cached(*slot, &[(key, &Account::new(1, 0, &Pubkey::default()))]);
}
accounts_db.add_root(*slot as Slot);
}
// If there's <= MAX_CACHE_SLOTS, no slots should be flushed
if accounts_db.accounts_cache.num_slots() <= MAX_CACHE_SLOTS {
accounts_db.flush_accounts_cache(false, None);
assert_eq!(accounts_db.accounts_cache.num_slots(), num_slots);
}
(accounts_db, keys, slots)
}
#[test]
fn test_accounts_db_cache_clean_dead_slots() {
let num_slots = 10;
let (accounts_db, keys, mut slots) = setup_accounts_db_cache_clean(num_slots);
let last_dead_slot = (num_slots - 1) as Slot;
assert_eq!(*slots.last().unwrap(), last_dead_slot);
let alive_slot = last_dead_slot as Slot + 1;
slots.push(alive_slot);
for key in &keys {
// Store a slot that overwrites all previous keys, rendering all previous keys dead
accounts_db.store_cached(
alive_slot,
&[(key, &Account::new(1, 0, &Pubkey::default()))],
);
accounts_db.add_root(alive_slot);
}
// Before the flush, we can find entries in the database for slots < alive_slot if we specify
// a smaller max root
for key in &keys {
assert!(accounts_db
.do_load(&Ancestors::default(), key, Some(last_dead_slot))
.is_some());
}
// If no `max_clean_root` is specified, cleaning should purge all flushed slots
accounts_db.flush_accounts_cache(true, None);
assert_eq!(accounts_db.accounts_cache.num_slots(), 0);
let mut uncleaned_roots = accounts_db
.accounts_index
.clear_uncleaned_roots(None)
.into_iter()
.collect::<Vec<_>>();
uncleaned_roots.sort_unstable();
assert_eq!(uncleaned_roots, slots);
assert_eq!(
accounts_db.accounts_cache.fetch_max_flush_root(),
alive_slot,
);
// Specifying a max_root < alive_slot, should not return any more entries,
// as those have been purged from the accounts index for the dead slots.
for key in &keys {
assert!(accounts_db
.do_load(&Ancestors::default(), key, Some(last_dead_slot))
.is_none());
}
// Each slot should only have one entry in the storage, since all other accounts were
// cleaned due to later updates
for slot in &slots {
if let ScanStorageResult::Stored(slot_accounts) = accounts_db.scan_account_storage(
*slot as Slot,
|_| Some(0),
|slot_accounts: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
slot_accounts.insert(*loaded_account.pubkey());
},
) {
if *slot == alive_slot {
assert_eq!(slot_accounts.len(), keys.len());
} else {
assert!(slot_accounts.is_empty());
}
} else {
panic!("Expected slot to be in storage, not cache");
}
}
}
#[test]
fn test_accounts_db_cache_clean() {
let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(10);
// If no `max_clean_root` is specified, cleaning should purge all flushed slots
accounts_db.flush_accounts_cache(true, None);
assert_eq!(accounts_db.accounts_cache.num_slots(), 0);
let mut uncleaned_roots = accounts_db
.accounts_index
.clear_uncleaned_roots(None)
.into_iter()
.collect::<Vec<_>>();
uncleaned_roots.sort_unstable();
assert_eq!(uncleaned_roots, slots);
assert_eq!(
accounts_db.accounts_cache.fetch_max_flush_root(),
*slots.last().unwrap()
);
// Each slot should only have one entry in the storage, since all other accounts were
// cleaned due to later updates
for slot in &slots {
if let ScanStorageResult::Stored(slot_account) = accounts_db.scan_account_storage(
*slot as Slot,
|_| Some(0),
|slot_account: &Arc<RwLock<Pubkey>>, loaded_account: LoadedAccount| {
*slot_account.write().unwrap() = *loaded_account.pubkey();
},
) {
assert_eq!(*slot_account.read().unwrap(), keys[*slot as usize]);
} else {
panic!("Everything should have been flushed")
}
}
}
fn run_test_accounts_db_cache_clean_max_root(num_slots: usize, max_clean_root: Slot) {
assert!(max_clean_root < (num_slots as Slot));
let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(num_slots);
let is_cache_at_limit = num_slots - max_clean_root as usize - 1 > MAX_CACHE_SLOTS;
// If:
// 1) `max_clean_root` is specified,
// 2) not at the cache limit, i.e. `is_cache_at_limit == false`, then
// `flush_accounts_cache()` should clean and flush only slots <= `max_clean_root`.
accounts_db.flush_accounts_cache(true, Some(max_clean_root));
if !is_cache_at_limit {
// Should flush all slots between 0..=max_clean_root
assert_eq!(
accounts_db.accounts_cache.num_slots(),
slots.len() - max_clean_root as usize - 1
);
} else {
// Otherwise, if we are at the cache limit, all roots will be flushed
assert_eq!(accounts_db.accounts_cache.num_slots(), 0);
}
let mut uncleaned_roots = accounts_db
.accounts_index
.clear_uncleaned_roots(None)
.into_iter()
.collect::<Vec<_>>();
uncleaned_roots.sort_unstable();
let expected_max_clean_root = if !is_cache_at_limit {
// Should flush all slots between 0..=max_clean_root
max_clean_root
} else {
// Otherwise, if we are at the cache limit, all roots will be flushed
num_slots as Slot - 1
};
assert_eq!(
uncleaned_roots,
slots[0..=expected_max_clean_root as usize].to_vec()
);
assert_eq!(
accounts_db.accounts_cache.fetch_max_flush_root(),
expected_max_clean_root,
);
// Updates from slots > max_clean_root should still be flushed to storage
for slot in &slots {
let slot_accounts = accounts_db.scan_account_storage(
*slot as Slot,
|loaded_account: LoadedAccount| {
if is_cache_at_limit {
panic!(
"When cache is at limit, all roots should have been flushed to storage"
);
}
assert!(*slot > max_clean_root);
Some(*loaded_account.pubkey())
},
|slot_accounts: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
slot_accounts.insert(*loaded_account.pubkey());
if !is_cache_at_limit {
// Only true when the limit hasn't been reached and there are still
// slots left in the cache
assert!(*slot <= max_clean_root);
}
},
);
let slot_accounts = match slot_accounts {
ScanStorageResult::Cached(slot_accounts) => {
slot_accounts.into_iter().collect::<HashSet<Pubkey>>()
}
ScanStorageResult::Stored(slot_accounts) => {
slot_accounts.into_iter().collect::<HashSet<Pubkey>>()
}
};
if *slot >= max_clean_root {
// 1) If slot > `max_clean_root`, then either:
// a) If `is_cache_at_limit == true`, still in the cache
// b) if `is_cache_at_limit == false`, were not cleaned before being flushed to storage.
//
// In both cases all the *original* updates at index `slot` were uncleaned and thus
// should be discoverable by this scan.
//
// 2) If slot == `max_clean_root`, the slot was not cleaned before being flushed to storage,
// so it also contains all the original updates.
assert_eq!(
slot_accounts,
keys[*slot as usize..]
.iter()
.cloned()
.collect::<HashSet<Pubkey>>()
);
} else {
// Slots less than `max_clean_root` were cleaned in the cache before being flushed
// to storage, should only contain one account
assert_eq!(
slot_accounts,
std::iter::once(keys[*slot as usize])
.into_iter()
.collect::<HashSet<Pubkey>>()
);
}
}
}
#[test]
fn test_accounts_db_cache_clean_max_root() {
let max_clean_root = 5;
run_test_accounts_db_cache_clean_max_root(10, max_clean_root);
}
#[test]
fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit() {
let max_clean_root = 5;
// Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots
// will be flushed
run_test_accounts_db_cache_clean_max_root(
MAX_CACHE_SLOTS + max_clean_root as usize + 2,
max_clean_root,
);
}
fn run_flush_rooted_accounts_cache(should_clean: bool) {
let num_slots = 10;
let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(num_slots);
let mut cleaned_bytes = 0;
let mut cleaned_accounts = 0;
let should_clean_tracker = if should_clean {
Some((&mut cleaned_bytes, &mut cleaned_accounts))
} else {
None
};
// If no cleaning is specified, then flush everything
accounts_db.flush_rooted_accounts_cache(None, should_clean_tracker);
for slot in &slots {
let slot_accounts = if let ScanStorageResult::Stored(slot_accounts) = accounts_db
.scan_account_storage(
*slot as Slot,
|_| Some(0),
|slot_account: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
slot_account.insert(*loaded_account.pubkey());
},
) {
slot_accounts.into_iter().collect::<HashSet<Pubkey>>()
} else {
panic!("All roots should have been flushed to storage");
};
if !should_clean || slot == slots.last().unwrap() {
// The slot was not cleaned before being flushed to storage,
// so it also contains all the original updates.
assert_eq!(
slot_accounts,
keys[*slot as usize..]
.iter()
.cloned()
.collect::<HashSet<Pubkey>>()
);
} else {
// If clean was specified, only the latest slot should have all the updates.
// All these other slots have been cleaned before flush
assert_eq!(
slot_accounts,
std::iter::once(keys[*slot as usize])
.into_iter()
.collect::<HashSet<Pubkey>>()
);
}
}
}
#[test]
fn test_flush_rooted_accounts_cache_with_clean() {
run_flush_rooted_accounts_cache(true);
}
#[test]
fn test_flush_rooted_accounts_cache_without_clean() {
run_flush_rooted_accounts_cache(false);
}
}

===== Changed file 4 of 5 =====

@@ -956,6 +956,23 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
        std::mem::replace(&mut w_roots_tracker.previous_uncleaned_roots, cleaned_roots)
    }
#[cfg(test)]
pub fn clear_uncleaned_roots(&self, max_clean_root: Option<Slot>) -> HashSet<Slot> {
let mut cleaned_roots = HashSet::new();
let mut w_roots_tracker = self.roots_tracker.write().unwrap();
w_roots_tracker.uncleaned_roots.retain(|root| {
let is_cleaned = max_clean_root
.map(|max_clean_root| *root <= max_clean_root)
.unwrap_or(true);
if is_cleaned {
cleaned_roots.insert(*root);
}
// Only keep the slots that have yet to be cleaned
!is_cleaned
});
cleaned_roots
}
    pub fn is_uncleaned_root(&self, slot: Slot) -> bool {
        self.roots_tracker
            .read()

===== Changed file 5 of 5 =====

@@ -3904,14 +3904,17 @@ impl Bank {
    }

    pub fn force_flush_accounts_cache(&self) {
-        self.rc.accounts.accounts_db.force_flush_accounts_cache()
+        self.rc
+            .accounts
+            .accounts_db
+            .flush_accounts_cache(true, Some(self.slot()))
    }

    pub fn flush_accounts_cache_if_needed(&self) {
        self.rc
            .accounts
            .accounts_db
-            .flush_accounts_cache_if_needed()
+            .flush_accounts_cache(false, Some(self.slot()))
    }

    fn store_account_and_update_capitalization(&self, pubkey: &Pubkey, new_account: &Account) {
@@ -10235,7 +10238,8 @@ pub(crate) mod tests {
    fn get_shrink_account_size() -> usize {
        let (genesis_config, _mint_keypair) = create_genesis_config(1_000_000_000);

-        // Set root for bank 0, with caching enabled
+        // Set root for bank 0, with caching disabled so we can get the size
+        // of the storage for this slot
        let mut bank0 = Arc::new(Bank::new_with_config(
            &genesis_config,
            HashSet::new(),
@@ -10273,12 +10277,16 @@ pub(crate) mod tests {
        bank0.restore_old_behavior_for_fragile_tests();

        let pubkey0_size = get_shrink_account_size();

        let account0 = Account::new(1000, pubkey0_size as usize, &Pubkey::new_unique());
        bank0.store_account(&pubkey0, &account0);

        goto_end_of_slot(Arc::<Bank>::get_mut(&mut bank0).unwrap());
        bank0.freeze();
        bank0.squash();
+        // Flush now so that accounts cache cleaning doesn't clean up bank 0 when later
+        // slots add updates to the cache
+        bank0.force_flush_accounts_cache();

        // Store some lamports in bank 1
        let some_lamports = 123;
@@ -10286,6 +10294,11 @@ pub(crate) mod tests {
        bank1.deposit(&pubkey1, some_lamports);
        bank1.deposit(&pubkey2, some_lamports);
        goto_end_of_slot(Arc::<Bank>::get_mut(&mut bank1).unwrap());
+        bank1.freeze();
+        bank1.squash();
+        // Flush now so that accounts cache cleaning doesn't clean up bank 0 when later
+        // slots add updates to the cache
+        bank1.force_flush_accounts_cache();

        // Store some lamports for pubkey1 in bank 2, root bank 2
        let mut bank2 = Arc::new(new_from_parent(&bank1));
@@ -11803,43 +11816,45 @@ pub(crate) mod tests {
    #[test]
    fn test_store_scan_consistency_root() {
-        test_store_scan_consistency(
-            false,
-            |bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
-                let mut current_bank = bank0.clone();
-                let mut prev_bank = bank0;
-                loop {
-                    let lamports_this_round = current_bank.slot() + starting_lamports + 1;
-                    let account = Account::new(lamports_this_round, 0, &program_id);
-                    for key in pubkeys_to_modify.iter() {
-                        current_bank.store_account(key, &account);
-                    }
-                    current_bank.freeze();
-                    // Send the previous bank to the scan thread to perform the scan.
-                    // Meanwhile this thread will squash and update roots immediately after
-                    // so the roots will update while scanning.
-                    //
-                    // The capacity of the channel is 1 so that this thread will wait for the scan to finish before starting
-                    // the next iteration, allowing the scan to stay in sync with these updates
-                    // such that every scan will see this interruption.
-                    if bank_to_scan_sender.send(prev_bank).is_err() {
-                        // Channel was disconnected, exit
-                        return;
-                    }
-                    current_bank.freeze();
-                    current_bank.squash();
-                    current_bank.force_flush_accounts_cache();
-                    current_bank.clean_accounts(true);
-                    prev_bank = current_bank.clone();
-                    current_bank = Arc::new(Bank::new_from_parent(
-                        &current_bank,
-                        &solana_sdk::pubkey::new_rand(),
-                        current_bank.slot() + 1,
-                    ));
-                }
-            },
-        );
+        for accounts_db_caching_enabled in &[false, true] {
+            test_store_scan_consistency(
+                *accounts_db_caching_enabled,
+                |bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
+                    let mut current_bank = bank0.clone();
+                    let mut prev_bank = bank0;
+                    loop {
+                        let lamports_this_round = current_bank.slot() + starting_lamports + 1;
+                        let account = Account::new(lamports_this_round, 0, &program_id);
+                        for key in pubkeys_to_modify.iter() {
+                            current_bank.store_account(key, &account);
+                        }
+                        current_bank.freeze();
+                        // Send the previous bank to the scan thread to perform the scan.
+                        // Meanwhile this thread will squash and update roots immediately after
+                        // so the roots will update while scanning.
+                        //
+                        // The capacity of the channel is 1 so that this thread will wait for the scan to finish before starting
+                        // the next iteration, allowing the scan to stay in sync with these updates
+                        // such that every scan will see this interruption.
+                        if bank_to_scan_sender.send(prev_bank).is_err() {
+                            // Channel was disconnected, exit
+                            return;
+                        }
+                        current_bank.squash();
+                        if current_bank.slot() % 2 == 0 {
+                            current_bank.force_flush_accounts_cache();
+                            current_bank.clean_accounts(true);
+                        }
+                        prev_bank = current_bank.clone();
+                        current_bank = Arc::new(Bank::new_from_parent(
+                            &current_bank,
+                            &solana_sdk::pubkey::new_rand(),
+                            current_bank.slot() + 1,
+                        ));
+                    }
+                },
+            );
+        }
    }

    #[test]