Remove unnecessary flushes in previous roots (#14596)

Co-authored-by: Carl Lin <carl@solana.com>
carllin 2021-01-23 04:02:44 -08:00 committed by GitHub
parent 170a3aec14
commit c77461e428
5 changed files with 640 additions and 129 deletions


@ -275,6 +275,10 @@ impl AccountsBackgroundService {
let snapshot_block_height =
request_handler.handle_snapshot_requests(accounts_db_caching_enabled);
if accounts_db_caching_enabled {
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank.flush_accounts_cache_if_needed();
}
@ -300,6 +304,10 @@ impl AccountsBackgroundService {
> (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
{
if accounts_db_caching_enabled {
// Note that the flush will do an internal clean of the
// cache up to bank.slot(), so should be safe as long
// as any later snapshots that are taken are of
// slots >= bank.slot()
bank.force_flush_accounts_cache();
}
bank.clean_accounts(true);
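
A minimal toy model of the flush policy above (not the real Bank/AccountsDb API; the types and MAX_CACHE_SLOTS value here are stand-ins): every tick may flush opportunistically, while the periodic clean path forces a full flush of roots <= bank.slot() first, so clean_accounts() can see all flushed roots.

const MAX_CACHE_SLOTS: usize = 1000; // stand-in for the real constant

struct CacheModel {
    cached_slots: Vec<u64>, // stand-in for the accounts cache contents
    root: u64,              // stand-in for bank.slot()
}

impl CacheModel {
    fn flush(&mut self, force: bool) {
        // force == false: no-op unless the cache is over the size limit
        if !force && self.cached_slots.len() <= MAX_CACHE_SLOTS {
            return;
        }
        // Only slots <= the calling bank's slot are flushed, so any later
        // snapshot (guaranteed to be of a slot >= root) stays consistent
        let root = self.root;
        self.cached_slots.retain(|&slot| slot > root);
    }
}

fn main() {
    let mut cache = CacheModel { cached_slots: (0..10).collect(), root: 5 };
    cache.flush(false); // under the limit: nothing happens
    assert_eq!(cache.cached_slots.len(), 10);
    cache.flush(true); // force: everything <= root 5 is flushed
    assert_eq!(cache.cached_slots, vec![6, 7, 8, 9]);
}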


@ -1,7 +1,7 @@
use dashmap::DashMap;
use solana_sdk::{account::Account, clock::Slot, hash::Hash, pubkey::Pubkey};
use std::{
collections::HashSet,
collections::BTreeSet,
ops::Deref,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
@ -90,7 +90,7 @@ pub struct AccountsCache {
cache: DashMap<Slot, SlotCache>,
// Queue of potentially unflushed roots. Random eviction + cache too large
// could have triggered a flush of this slot already
maybe_unflushed_roots: RwLock<HashSet<Slot>>,
maybe_unflushed_roots: RwLock<BTreeSet<Slot>>,
max_flushed_root: AtomicU64,
}
@ -147,14 +147,25 @@ impl AccountsCache {
}
pub fn add_root(&self, root: Slot) {
self.maybe_unflushed_roots.write().unwrap().insert(root);
let max_flushed_root = self.fetch_max_flush_root();
if root > max_flushed_root || (root == max_flushed_root && root == 0) {
self.maybe_unflushed_roots.write().unwrap().insert(root);
}
}
pub fn clear_roots(&self) -> HashSet<Slot> {
std::mem::replace(
&mut self.maybe_unflushed_roots.write().unwrap(),
HashSet::new(),
)
pub fn clear_roots(&self, max_root: Option<Slot>) -> BTreeSet<Slot> {
let mut w_maybe_unflushed_roots = self.maybe_unflushed_roots.write().unwrap();
if let Some(max_root) = max_root {
// `greater_than_max_root` contains all slots >= `max_root + 1`, or alternatively,
// all slots > `max_root`. Meanwhile, `w_maybe_unflushed_roots` is left with all slots
// <= `max_root`.
let greater_than_max_root = w_maybe_unflushed_roots.split_off(&(max_root + 1));
// After the replace, `w_maybe_unflushed_roots` contains slots > `max_root`, and
// we return all slots <= `max_root`
std::mem::replace(&mut w_maybe_unflushed_roots, greater_than_max_root)
} else {
std::mem::replace(&mut *w_maybe_unflushed_roots, BTreeSet::new())
}
}
// Removes slots less than or equal to `max_root`. Only safe to pass in a rooted slot,
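
For reference, a standalone demonstration of the `split_off` trick that `clear_roots` relies on above, with toy slot numbers:

use std::collections::BTreeSet;

fn main() {
    // maybe_unflushed_roots after the HashSet -> BTreeSet switch
    let mut roots: BTreeSet<u64> = vec![3, 5, 7, 9].into_iter().collect();

    // clear_roots(Some(5)) boils down to split_off(&(5 + 1)): `roots`
    // keeps everything > 5 and the caller gets back everything <= 5
    let max_root = 5;
    let greater_than_max_root = roots.split_off(&(max_root + 1));
    let cleared = std::mem::replace(&mut roots, greater_than_max_root);

    assert_eq!(cleared.into_iter().collect::<Vec<_>>(), vec![3, 5]);
    assert_eq!(roots.into_iter().collect::<Vec<_>>(), vec![7, 9]);
}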


@ -49,7 +49,7 @@ use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::{
borrow::Cow,
boxed::Box,
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
convert::{TryFrom, TryInto},
io::{Error as IOError, Result as IOResult},
ops::RangeBounds,
@ -931,23 +931,30 @@ impl AccountsDB {
reclaims
}
fn max_clean_root(&self, proposed_clean_root: Option<Slot>) -> Option<Slot> {
match (
self.accounts_index.min_ongoing_scan_root(),
proposed_clean_root,
) {
(None, None) => None,
(Some(min_scan_root), None) => Some(min_scan_root),
(None, Some(proposed_clean_root)) => Some(proposed_clean_root),
(Some(min_scan_root), Some(proposed_clean_root)) => {
Some(std::cmp::min(min_scan_root, proposed_clean_root))
}
}
}
// Purge zero lamport accounts and older rooted account states as garbage
// collection
// Only remove those accounts where the entire rooted history of the account
// can be purged because there are no live append vecs in the ancestors
pub fn clean_accounts(&self, max_clean_root: Option<Slot>) {
let max_clean_root = self.max_clean_root(max_clean_root);
// hold a lock to prevent slot shrinking from running because it might modify some rooted
// slot storages which can not happen as long as we're cleaning accounts because we're also
// modifying the rooted slot storages!
let max_clean_root = match (self.accounts_index.min_ongoing_scan_root(), max_clean_root) {
(None, None) => None,
(Some(min_scan_root), None) => Some(min_scan_root),
(None, Some(max_clean_root)) => Some(max_clean_root),
(Some(min_scan_root), Some(max_clean_root)) => {
Some(std::cmp::min(min_scan_root, max_clean_root))
}
};
let mut candidates_v1 = self.shrink_candidate_slots_v1.lock().unwrap();
self.report_store_stats();
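
The extracted `max_clean_root()` helper above amounts to taking the min of two optional bounds; a compact standalone sketch with toy values:

fn max_clean_root(min_ongoing_scan_root: Option<u64>, proposed: Option<u64>) -> Option<u64> {
    match (min_ongoing_scan_root, proposed) {
        // Both bounds present: the stricter (lower) one wins
        (Some(scan), Some(proposed)) => Some(scan.min(proposed)),
        // Otherwise take whichever bound exists, if any
        (scan, proposed) => scan.or(proposed),
    }
}

fn main() {
    // An in-flight scan pinned at slot 40 caps a proposed clean root of 100
    assert_eq!(max_clean_root(Some(40), Some(100)), Some(40));
    assert_eq!(max_clean_root(None, Some(100)), Some(100));
    assert_eq!(max_clean_root(None, None), None);
}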
@ -2408,21 +2415,37 @@ impl AccountsDB {
);
}
fn purge_slot_cache_keys(&self, dead_slot: Slot, slot_cache: SlotCache) {
// Slot purged from cache should not exist in the backing store
assert!(self.storage.get_slot_stores(dead_slot).is_none());
fn purge_slot_cache(&self, purged_slot: Slot, slot_cache: SlotCache) {
let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
let pubkey_to_slot_set: Vec<(Pubkey, Slot)> = slot_cache
.iter()
.map(|account| {
purged_slot_pubkeys.insert((dead_slot, *account.key()));
(*account.key(), dead_slot)
purged_slot_pubkeys.insert((purged_slot, *account.key()));
(*account.key(), purged_slot)
})
.collect();
self.purge_slot_cache_pubkeys(purged_slot, purged_slot_pubkeys, pubkey_to_slot_set, true);
}
fn purge_slot_cache_pubkeys(
&self,
purged_slot: Slot,
purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
pubkey_to_slot_set: Vec<(Pubkey, Slot)>,
is_dead: bool,
) {
// Slot purged from cache should not exist in the backing store
assert!(self.storage.get_slot_stores(purged_slot).is_none());
let num_purged_keys = pubkey_to_slot_set.len();
let reclaims = self.purge_keys_exact(&pubkey_to_slot_set);
assert_eq!(reclaims.len(), num_purged_keys);
self.finalize_dead_slot_removal(std::iter::once(&dead_slot), purged_slot_pubkeys, None);
if is_dead {
self.finalize_dead_slot_removal(
std::iter::once(&purged_slot),
purged_slot_pubkeys,
None,
);
}
}
fn purge_slots(&self, slots: &HashSet<Slot>) {
@ -2440,8 +2463,10 @@ impl AccountsDB {
for remove_slot in non_roots {
if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) {
// If the slot is still in the cache, remove the backing storages for
// the slot and from the Accounts Index
self.purge_slot_cache_keys(*remove_slot, slot_cache);
// the slot. The accounts index cleaning (removing from the slot list,
// decrementing the account ref count), is handled in
// clean_accounts() -> purge_older_root_entries()
self.purge_slot_cache(*remove_slot, slot_cache);
} else if let Some((_, slot_removed_storages)) = self.storage.0.remove(&remove_slot) {
// Because AccountsBackgroundService synchronously flushes from the accounts cache
// and handles all Bank::drop() (the cleanup function that leads to this
@ -2518,7 +2543,7 @@ impl AccountsDB {
if let Some(slot_cache) = self.accounts_cache.remove_slot(remove_slot) {
// If the slot is still in the cache, remove it from the cache
self.purge_slot_cache_keys(remove_slot, slot_cache);
self.purge_slot_cache(remove_slot, slot_cache);
}
// TODO: Handle if the slot was flushed to storage while we were removing the cached
@ -2839,32 +2864,29 @@ impl AccountsDB {
self.accounts_cache.report_size();
}
// Force flush the cached roots, flush any unrooted frozen slots as well if there are
// > MAX_CACHE_SLOTS of them.
pub fn force_flush_accounts_cache(&self) {
self.flush_accounts_cache(true);
}
// `force_flush` flushes all the cached roots `<= max_clean_root`. It also then
// flushes:
// 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache,
// 2) If there are still > MAX_CACHE_SLOTS remaining slots in the cache, the excess
// unrooted slots
pub fn flush_accounts_cache(&self, force_flush: bool, max_clean_root: Option<Slot>) {
#[cfg(not(test))]
assert!(max_clean_root.is_some());
pub fn flush_accounts_cache_if_needed(&self) {
self.flush_accounts_cache(false);
}
fn flush_accounts_cache(&self, force_flush: bool) {
if !force_flush && self.accounts_cache.num_slots() <= MAX_CACHE_SLOTS {
return;
}
// Flush all roots
// Flush only the roots <= max_clean_root, so that snapshotting has all
// the relevant roots in storage.
let mut flush_roots_elapsed = Measure::start("flush_roots_elapsed");
let cached_roots = self.accounts_cache.clear_roots();
for root in &cached_roots {
self.flush_slot_cache(*root);
self.accounts_cache.set_max_flush_root(*root);
}
// Only add to the uncleaned roots set *after* we've flushed the previous roots,
// so that clean will actually be able to clean the slots.
self.accounts_index.add_uncleaned_roots(cached_roots);
let mut account_bytes_saved = 0;
let mut num_accounts_saved = 0;
let (total_new_cleaned_roots, num_cleaned_roots_flushed) = self
.flush_rooted_accounts_cache(
max_clean_root,
Some((&mut account_bytes_saved, &mut num_accounts_saved)),
);
flush_roots_elapsed.stop();
// Note we don't purge unrooted slots here because there may be ongoing scans/references
@ -2872,27 +2894,49 @@ impl AccountsDB {
// banks
// If there are > MAX_CACHE_SLOTS, then flush the excess ones to storage
let (total_new_excess_roots, num_excess_roots_flushed) =
if self.accounts_cache.num_slots() > MAX_CACHE_SLOTS {
// Start by flushing the roots
//
// Cannot do any cleaning on roots past `max_clean_root` because future
// snapshots may need updates from those later slots, hence we pass `None`
// for `should_clean`.
self.flush_rooted_accounts_cache(None, None)
} else {
(0, 0)
};
let old_slots = self.accounts_cache.find_older_frozen_slots(MAX_CACHE_SLOTS);
let total_excess_slot_count = old_slots.len();
let excess_slot_count = old_slots.len();
let mut unflushable_unrooted_slot_count = 0;
let max_flushed_root = self.accounts_cache.fetch_max_flush_root();
for old_slot in old_slots {
// Don't flush slots that are known to be unrooted
if old_slot > max_flushed_root {
self.flush_slot_cache(old_slot);
self.flush_slot_cache(old_slot, None::<&mut fn(&_, &_) -> bool>);
} else {
unflushable_unrooted_slot_count += 1;
}
}
datapoint_info!(
"accounts_db-cache-limit-slots",
("total_excess_slot_count", total_excess_slot_count, i64),
"accounts_db-flush_accounts_cache",
("total_new_cleaned_roots", total_new_cleaned_roots, i64),
("num_cleaned_roots_flushed", num_cleaned_roots_flushed, i64),
("total_new_excess_roots", total_new_excess_roots, i64),
("num_excess_roots_flushed", num_excess_roots_flushed, i64),
("excess_slot_count", excess_slot_count, i64),
(
"unflushable_unrooted_slot_count",
unflushable_unrooted_slot_count,
i64
),
(
"flush_roots_elapsed",
flush_roots_elapsed.as_us() as i64,
i64
),
("account_bytes_saved", account_bytes_saved, i64),
("num_accounts_saved", num_accounts_saved, i64),
);
// Flush a random slot out after every force flush to catch any inconsistencies
@ -2910,58 +2954,158 @@ impl AccountsDB {
"Flushing random slot: {}, num_remaining: {}",
*rand_slot, num_slots_remaining
);
self.flush_slot_cache(*rand_slot);
self.flush_slot_cache(*rand_slot, None::<&mut fn(&_, &_) -> bool>);
}
}
inc_new_counter_info!("flush_roots_elapsed", flush_roots_elapsed.as_us() as usize);
}
fn flush_slot_cache(&self, slot: Slot) {
fn flush_rooted_accounts_cache(
&self,
mut max_flush_root: Option<Slot>,
should_clean: Option<(&mut usize, &mut usize)>,
) -> (usize, usize) {
if should_clean.is_some() {
max_flush_root = self.max_clean_root(max_flush_root);
}
// If there is a long running scan going on, this could prevent any cleaning
// past `max_flush_root`.
let cached_roots: BTreeSet<Slot> = self.accounts_cache.clear_roots(max_flush_root);
// Use HashMap because HashSet doesn't provide Entry api
let mut written_accounts = HashMap::new();
// If `should_clean` is None, then `should_flush_f` is also None, which will cause
// `flush_slot_cache` to flush all accounts to storage without cleaning any accounts.
let mut should_flush_f = should_clean.map(|(account_bytes_saved, num_accounts_saved)| {
move |&pubkey: &Pubkey, account: &Account| {
use std::collections::hash_map::Entry::{Occupied, Vacant};
let should_flush = match written_accounts.entry(pubkey) {
Vacant(vacant_entry) => {
vacant_entry.insert(());
true
}
Occupied(_occupied_entry) => {
*account_bytes_saved += account.data.len();
*num_accounts_saved += 1;
// If a later root already wrote this account, no point
// in flushing it
false
}
};
should_flush
}
});
// Iterate from highest to lowest so that we don't need to flush outdated
// updates in earlier roots
let mut num_roots_flushed = 0;
for &root in cached_roots.iter().rev() {
if self.flush_slot_cache(root, should_flush_f.as_mut()) {
num_roots_flushed += 1;
}
// Regardless of whether this slot was *just* flushed from the cache by the above
// `flush_slot_cache()`, we should update the `max_flush_root`.
// This is because some rooted slots may be flushed to storage *before* they are marked as root.
// This can occur for instance when:
// 1) The cache is overwhelmed, so we flushed some yet-to-be-rooted frozen slots
// 2) Random evictions
// These slots may then *later* be marked as root, so we still need to handle updating the
// `max_flush_root` in the accounts cache.
self.accounts_cache.set_max_flush_root(root);
}
// Only add to the uncleaned roots set *after* we've flushed the previous roots,
// so that clean will actually be able to clean the slots.
let num_new_roots = cached_roots.len();
self.accounts_index.add_uncleaned_roots(cached_roots);
(num_new_roots, num_roots_flushed)
}
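
A toy sketch of the newest-first dedup that `should_flush_f` implements (the data shapes here are stand-ins, not the real cache types): each pubkey is flushed only from the highest root that wrote it, so older, superseded updates are never written to storage.

use std::collections::{BTreeSet, HashMap};

// slot -> pubkeys written in that slot (pubkeys are stand-in strings)
fn plan_flushes(
    roots: &BTreeSet<u64>,
    updates: &HashMap<u64, Vec<&'static str>>,
) -> Vec<(u64, &'static str)> {
    let mut written_accounts: HashMap<&str, u64> = HashMap::new();
    let mut to_flush = Vec::new();
    // Newest root first, so the freshest version of each account wins
    for &root in roots.iter().rev() {
        for &key in updates.get(&root).into_iter().flatten() {
            // First (newest) sighting is flushed; older updates are skipped
            if written_accounts.insert(key, root).is_none() {
                to_flush.push((root, key));
            }
        }
    }
    to_flush
}

fn main() {
    let roots: BTreeSet<u64> = vec![1, 2, 3].into_iter().collect();
    let updates: HashMap<u64, Vec<&'static str>> =
        vec![(1, vec!["a", "b"]), (2, vec!["b"]), (3, vec!["b", "c"])]
            .into_iter()
            .collect();
    // "b" is only flushed from root 3; the copies in roots 1 and 2 are skipped
    assert_eq!(plan_flushes(&roots, &updates), vec![(3, "b"), (3, "c"), (1, "a")]);
}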
// `should_flush_f` is an optional closure that determines whether a given
// account should be flushed. Passing `None` will by default flush all
// accounts
fn flush_slot_cache(
&self,
slot: Slot,
mut should_flush_f: Option<&mut impl FnMut(&Pubkey, &Account) -> bool>,
) -> bool {
info!("flush_slot_cache slot: {}", slot);
if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
let slot_cache = self.accounts_cache.slot_cache(slot);
if let Some(slot_cache) = slot_cache {
let iter_items: Vec<_> = slot_cache.iter().collect();
let mut total_size = 0;
let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
let (accounts, hashes): (Vec<(&Pubkey, &Account)>, Vec<Hash>) = iter_items
.iter()
.map(|iter_item| {
.filter_map(|iter_item| {
let key = iter_item.key();
let account = &iter_item.value().account;
let hash = iter_item.value().hash;
total_size += (account.data.len() + STORE_META_OVERHEAD) as u64;
((key, account), hash)
let should_flush = should_flush_f
.as_mut()
.map(|should_flush_f| should_flush_f(key, account))
.unwrap_or(true);
if should_flush {
let hash = iter_item.value().hash;
total_size += (account.data.len() + STORE_META_OVERHEAD) as u64;
Some(((key, account), hash))
} else {
// If we don't flush, we have to remove the entry from the
// index, since it's equivalent to purging
purged_slot_pubkeys.insert((slot, *key));
pubkey_to_slot_set.push((*key, slot));
None
}
})
.unzip();
let aligned_total_size = self.page_align(total_size);
// This ensures that all updates are written to an AppendVec, before any
// updates to the index happen, so anybody that sees a real entry in the index,
// will be able to find the account in storage
let flushed_store =
self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache");
self.store_accounts_custom(
let is_dead_slot = accounts.is_empty();
// Remove the account index entries from earlier roots that are outdated by later roots.
// Safe because queries to the index will be reading updates from later roots.
self.purge_slot_cache_pubkeys(
slot,
&accounts,
&hashes,
Some(Box::new(move |_, _| flushed_store.clone())),
None,
false,
);
// If the above sizing function is correct, just one AppendVec is enough to hold
// all the data for the slot
assert_eq!(
self.storage
.get_slot_stores(slot)
.unwrap()
.read()
.unwrap()
.len(),
1
purged_slot_pubkeys,
pubkey_to_slot_set,
is_dead_slot,
);
if !is_dead_slot {
let aligned_total_size = self.page_align(total_size);
// This ensures that all updates are written to an AppendVec, before any
// updates to the index happen, so anybody that sees a real entry in the index,
// will be able to find the account in storage
let flushed_store =
self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache");
self.store_accounts_custom(
slot,
&accounts,
&hashes,
Some(Box::new(move |_, _| flushed_store.clone())),
None,
false,
);
// If the above sizing function is correct, just one AppendVec is enough to hold
// all the data for the slot
assert_eq!(
self.storage
.get_slot_stores(slot)
.unwrap()
.read()
.unwrap()
.len(),
1
);
}
// Remove this slot from the cache, which to AccountsDb readers should look like an
// atomic switch from the cache to storage
assert!(self.accounts_cache.remove_slot(slot).is_some());
true
} else {
false
}
}
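
The `filter_map` in `flush_slot_cache` above performs a single-pass partition; a standalone sketch with stand-in types (the real code splits cache entries into (accounts, hashes) to flush versus pubkeys to purge):

fn partition_flush<'a>(
    entries: Vec<(&'a str, u64)>, // (pubkey, account) stand-ins
    mut should_flush: impl FnMut(&str) -> bool,
) -> (Vec<(&'a str, u64)>, Vec<&'a str>) {
    let mut purged = Vec::new();
    let flushed: Vec<_> = entries
        .into_iter()
        .filter_map(|(key, account)| {
            if should_flush(key) {
                Some((key, account))
            } else {
                // Skipping the flush is equivalent to purging the entry
                purged.push(key);
                None
            }
        })
        .collect();
    (flushed, purged)
}

fn main() {
    let (flushed, purged) = partition_flush(vec![("a", 1), ("b", 2)], |key| key == "a");
    assert_eq!(flushed, vec![("a", 1)]);
    assert_eq!(purged, vec!["b"]);
}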
@ -6959,7 +7103,7 @@ pub mod tests {
// No root was added yet, requires an ancestor to find
// the account
db.force_flush_accounts_cache();
db.flush_accounts_cache(true, None);
let ancestors = vec![(slot, 1)].into_iter().collect();
assert_eq!(
db.load_slow(&ancestors, &key),
@ -6968,7 +7112,7 @@ pub mod tests {
// Add root then flush
db.add_root(slot);
db.force_flush_accounts_cache();
db.flush_accounts_cache(true, None);
assert_eq!(db.load_slow(&HashMap::new(), &key), Some((account0, slot)));
}
@ -6999,7 +7143,7 @@ pub mod tests {
db.load_slow(&ancestors, &unrooted_key),
Some((account0.clone(), unrooted_slot))
);
db.force_flush_accounts_cache();
db.flush_accounts_cache(true, None);
// After the flush, the unrooted slot is still in the cache
assert!(db.load_slow(&ancestors, &unrooted_key).is_some());
@ -7048,7 +7192,7 @@ pub mod tests {
}
}
db.flush_accounts_cache_if_needed();
db.flush_accounts_cache(false, None);
let total_slots = num_roots + num_unrooted;
// If there's <= the max size, then nothing will be flushed from the slot
@ -7110,7 +7254,7 @@ pub mod tests {
// Flush, then clean again. Should not need another root to initiate the cleaning
// because `accounts_index.uncleaned_roots` should be correct
db.force_flush_accounts_cache();
db.flush_accounts_cache(true, None);
db.clean_accounts(None);
assert!(db
.do_load(&Ancestors::default(), &account_key, Some(0))
@ -7187,7 +7331,7 @@ pub mod tests {
db.add_root(2);
// Flush the cache, slot 1 should remain in the cache, everything else should be flushed
db.force_flush_accounts_cache();
db.flush_accounts_cache(true, None);
assert_eq!(db.accounts_cache.num_slots(), 1);
assert!(db.accounts_cache.slot_cache(1).is_some());
@ -7234,7 +7378,7 @@ pub mod tests {
}
accounts_db.add_root(slot);
accounts_db.force_flush_accounts_cache();
accounts_db.flush_accounts_cache(true, None);
let mut storage_maps: Vec<Arc<AccountStorageEntry>> = accounts_db
.storage
@ -7268,4 +7412,320 @@ pub mod tests {
assert_eq!(before_size, after_size + account.stored_size);
}
}
fn setup_accounts_db_cache_clean(num_slots: usize) -> (AccountsDB, Vec<Pubkey>, Vec<Slot>) {
let caching_enabled = true;
let accounts_db = AccountsDB::new_with_config(
Vec::new(),
&ClusterType::Development,
HashSet::new(),
caching_enabled,
);
let slots: Vec<_> = (0..num_slots as Slot).into_iter().collect();
let keys: Vec<Pubkey> = std::iter::repeat_with(Pubkey::new_unique)
.take(num_slots)
.collect();
// Store some subset of the keys in slots 0..num_slots
for slot in &slots {
for key in &keys[*slot as usize..] {
accounts_db.store_cached(*slot, &[(key, &Account::new(1, 0, &Pubkey::default()))]);
}
accounts_db.add_root(*slot as Slot);
}
// If there's <= MAX_CACHE_SLOTS, no slots should be flushed
if accounts_db.accounts_cache.num_slots() <= MAX_CACHE_SLOTS {
accounts_db.flush_accounts_cache(false, None);
assert_eq!(accounts_db.accounts_cache.num_slots(), num_slots);
}
(accounts_db, keys, slots)
}
#[test]
fn test_accounts_db_cache_clean_dead_slots() {
let num_slots = 10;
let (accounts_db, keys, mut slots) = setup_accounts_db_cache_clean(num_slots);
let last_dead_slot = (num_slots - 1) as Slot;
assert_eq!(*slots.last().unwrap(), last_dead_slot);
let alive_slot = last_dead_slot as Slot + 1;
slots.push(alive_slot);
for key in &keys {
// Store a slot that overwrites all previous keys, rendering all previous keys dead
accounts_db.store_cached(
alive_slot,
&[(key, &Account::new(1, 0, &Pubkey::default()))],
);
accounts_db.add_root(alive_slot);
}
// Before the flush, we can find entries in the database for slots < alive_slot if we specify
// a smaller max root
for key in &keys {
assert!(accounts_db
.do_load(&Ancestors::default(), key, Some(last_dead_slot))
.is_some());
}
// If no `max_clean_root` is specified, cleaning should purge all flushed slots
accounts_db.flush_accounts_cache(true, None);
assert_eq!(accounts_db.accounts_cache.num_slots(), 0);
let mut uncleaned_roots = accounts_db
.accounts_index
.clear_uncleaned_roots(None)
.into_iter()
.collect::<Vec<_>>();
uncleaned_roots.sort_unstable();
assert_eq!(uncleaned_roots, slots);
assert_eq!(
accounts_db.accounts_cache.fetch_max_flush_root(),
alive_slot,
);
// Specifying a max_root < alive_slot should not return any more entries,
// as those have been purged from the accounts index for the dead slots.
for key in &keys {
assert!(accounts_db
.do_load(&Ancestors::default(), key, Some(last_dead_slot))
.is_none());
}
// Each slot should only have one entry in the storage, since all other accounts were
// cleaned due to later updates
for slot in &slots {
if let ScanStorageResult::Stored(slot_accounts) = accounts_db.scan_account_storage(
*slot as Slot,
|_| Some(0),
|slot_accounts: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
slot_accounts.insert(*loaded_account.pubkey());
},
) {
if *slot == alive_slot {
assert_eq!(slot_accounts.len(), keys.len());
} else {
assert!(slot_accounts.is_empty());
}
} else {
panic!("Expected slot to be in storage, not cache");
}
}
}
#[test]
fn test_accounts_db_cache_clean() {
let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(10);
// If no `max_clean_root` is specified, cleaning should purge all flushed slots
accounts_db.flush_accounts_cache(true, None);
assert_eq!(accounts_db.accounts_cache.num_slots(), 0);
let mut uncleaned_roots = accounts_db
.accounts_index
.clear_uncleaned_roots(None)
.into_iter()
.collect::<Vec<_>>();
uncleaned_roots.sort_unstable();
assert_eq!(uncleaned_roots, slots);
assert_eq!(
accounts_db.accounts_cache.fetch_max_flush_root(),
*slots.last().unwrap()
);
// Each slot should only have one entry in the storage, since all other accounts were
// cleaned due to later updates
for slot in &slots {
if let ScanStorageResult::Stored(slot_account) = accounts_db.scan_account_storage(
*slot as Slot,
|_| Some(0),
|slot_account: &Arc<RwLock<Pubkey>>, loaded_account: LoadedAccount| {
*slot_account.write().unwrap() = *loaded_account.pubkey();
},
) {
assert_eq!(*slot_account.read().unwrap(), keys[*slot as usize]);
} else {
panic!("Everything should have been flushed")
}
}
}
fn run_test_accounts_db_cache_clean_max_root(num_slots: usize, max_clean_root: Slot) {
assert!(max_clean_root < (num_slots as Slot));
let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(num_slots);
let is_cache_at_limit = num_slots - max_clean_root as usize - 1 > MAX_CACHE_SLOTS;
// If:
// 1) `max_clean_root` is specified,
// 2) not at the cache limit, i.e. `is_cache_at_limit == false`, then
// `flush_accounts_cache()` should clean and flush only slots <= max_clean_root,
accounts_db.flush_accounts_cache(true, Some(max_clean_root));
if !is_cache_at_limit {
// Should flush all slots between 0..=max_clean_root
assert_eq!(
accounts_db.accounts_cache.num_slots(),
slots.len() - max_clean_root as usize - 1
);
} else {
// Otherwise, if we are at the cache limit, all roots will be flushed
assert_eq!(accounts_db.accounts_cache.num_slots(), 0,);
}
let mut uncleaned_roots = accounts_db
.accounts_index
.clear_uncleaned_roots(None)
.into_iter()
.collect::<Vec<_>>();
uncleaned_roots.sort_unstable();
let expected_max_clean_root = if !is_cache_at_limit {
// Should flush all slots between 0..=max_clean_root
max_clean_root
} else {
// Otherwise, if we are at the cache limit, all roots will be flushed
num_slots as Slot - 1
};
assert_eq!(
uncleaned_roots,
slots[0..=expected_max_clean_root as usize].to_vec()
);
assert_eq!(
accounts_db.accounts_cache.fetch_max_flush_root(),
expected_max_clean_root,
);
// Updates from slots > max_clean_root should still be flushed to storage
for slot in &slots {
let slot_accounts = accounts_db.scan_account_storage(
*slot as Slot,
|loaded_account: LoadedAccount| {
if is_cache_at_limit {
panic!(
"When cache is at limit, all roots should have been flushed to storage"
);
}
assert!(*slot > max_clean_root);
Some(*loaded_account.pubkey())
},
|slot_accounts: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
slot_accounts.insert(*loaded_account.pubkey());
if !is_cache_at_limit {
// Only true when the limit hasn't been reached and there are still
// slots left in the cache
assert!(*slot <= max_clean_root);
}
},
);
let slot_accounts = match slot_accounts {
ScanStorageResult::Cached(slot_accounts) => {
slot_accounts.into_iter().collect::<HashSet<Pubkey>>()
}
ScanStorageResult::Stored(slot_accounts) => {
slot_accounts.into_iter().collect::<HashSet<Pubkey>>()
}
};
if *slot >= max_clean_root {
// 1) If slot > `max_clean_root`, then either:
// a) If `is_cache_at_limit == true`, still in the cache
// b) if `is_cache_at_limit == false`, were not cleaned before being flushed to storage.
//
// In both cases all the *original* updates at index `slot` were uncleaned and thus
// should be discoverable by this scan.
//
// 2) If slot == `max_clean_root`, the slot was not cleaned before being flushed to storage,
// so it also contains all the original updates.
assert_eq!(
slot_accounts,
keys[*slot as usize..]
.iter()
.cloned()
.collect::<HashSet<Pubkey>>()
);
} else {
// Slots less than `max_clean_root` were cleaned in the cache before being flushed
// to storage, should only contain one account
assert_eq!(
slot_accounts,
std::iter::once(keys[*slot as usize])
.into_iter()
.collect::<HashSet<Pubkey>>()
);
}
}
}
#[test]
fn test_accounts_db_cache_clean_max_root() {
let max_clean_root = 5;
run_test_accounts_db_cache_clean_max_root(10, max_clean_root);
}
#[test]
fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit() {
let max_clean_root = 5;
// Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots
// will be flushed
run_test_accounts_db_cache_clean_max_root(
MAX_CACHE_SLOTS + max_clean_root as usize + 2,
max_clean_root,
);
}
fn run_flush_rooted_accounts_cache(should_clean: bool) {
let num_slots = 10;
let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(num_slots);
let mut cleaned_bytes = 0;
let mut cleaned_accounts = 0;
let should_clean_tracker = if should_clean {
Some((&mut cleaned_bytes, &mut cleaned_accounts))
} else {
None
};
// If no cleaning is specified, then flush everything
accounts_db.flush_rooted_accounts_cache(None, should_clean_tracker);
for slot in &slots {
let slot_accounts = if let ScanStorageResult::Stored(slot_accounts) = accounts_db
.scan_account_storage(
*slot as Slot,
|_| Some(0),
|slot_account: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
slot_account.insert(*loaded_account.pubkey());
},
) {
slot_accounts.into_iter().collect::<HashSet<Pubkey>>()
} else {
panic!("All roots should have been flushed to storage");
};
if !should_clean || slot == slots.last().unwrap() {
// The slot was not cleaned before being flushed to storage,
// so it also contains all the original updates.
assert_eq!(
slot_accounts,
keys[*slot as usize..]
.iter()
.cloned()
.collect::<HashSet<Pubkey>>()
);
} else {
// If clean was specified, only the latest slot should have all the updates.
// All these other slots have been cleaned before flush
assert_eq!(
slot_accounts,
std::iter::once(keys[*slot as usize])
.into_iter()
.collect::<HashSet<Pubkey>>()
);
}
}
}
#[test]
fn test_flush_rooted_accounts_cache_with_clean() {
run_flush_rooted_accounts_cache(true);
}
#[test]
fn test_flush_rooted_accounts_cache_without_clean() {
run_flush_rooted_accounts_cache(false);
}
}


@ -956,6 +956,23 @@ impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
std::mem::replace(&mut w_roots_tracker.previous_uncleaned_roots, cleaned_roots)
}
#[cfg(test)]
pub fn clear_uncleaned_roots(&self, max_clean_root: Option<Slot>) -> HashSet<Slot> {
let mut cleaned_roots = HashSet::new();
let mut w_roots_tracker = self.roots_tracker.write().unwrap();
w_roots_tracker.uncleaned_roots.retain(|root| {
let is_cleaned = max_clean_root
.map(|max_clean_root| *root <= max_clean_root)
.unwrap_or(true);
if is_cleaned {
cleaned_roots.insert(*root);
}
// Only keep the slots that have yet to be cleaned
!is_cleaned
});
cleaned_roots
}
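
A standalone restatement of the retain-based split in `clear_uncleaned_roots` above, with toy roots to show the semantics: roots <= `max_clean_root` are drained and returned, the rest stay behind.

use std::collections::HashSet;

fn clear_uncleaned_roots(uncleaned: &mut HashSet<u64>, max_clean_root: Option<u64>) -> HashSet<u64> {
    let mut cleaned = HashSet::new();
    uncleaned.retain(|&root| {
        // None means "no bound": everything counts as cleaned
        let is_cleaned = max_clean_root.map_or(true, |max| root <= max);
        if is_cleaned {
            cleaned.insert(root);
        }
        !is_cleaned // keep only the roots that still need cleaning
    });
    cleaned
}

fn main() {
    let mut uncleaned: HashSet<u64> = vec![1, 4, 7].into_iter().collect();
    let cleaned = clear_uncleaned_roots(&mut uncleaned, Some(5));
    let expected_cleaned: HashSet<u64> = vec![1, 4].into_iter().collect();
    let expected_remaining: HashSet<u64> = vec![7].into_iter().collect();
    assert_eq!(cleaned, expected_cleaned);
    assert_eq!(uncleaned, expected_remaining);
}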
pub fn is_uncleaned_root(&self, slot: Slot) -> bool {
self.roots_tracker
.read()


@ -3909,14 +3909,17 @@ impl Bank {
}
pub fn force_flush_accounts_cache(&self) {
self.rc.accounts.accounts_db.force_flush_accounts_cache()
self.rc
.accounts
.accounts_db
.flush_accounts_cache(true, Some(self.slot()))
}
pub fn flush_accounts_cache_if_needed(&self) {
self.rc
.accounts
.accounts_db
.flush_accounts_cache_if_needed()
.flush_accounts_cache(false, Some(self.slot()))
}
fn store_account_and_update_capitalization(&self, pubkey: &Pubkey, new_account: &Account) {
@ -10241,7 +10244,8 @@ pub(crate) mod tests {
fn get_shrink_account_size() -> usize {
let (genesis_config, _mint_keypair) = create_genesis_config(1_000_000_000);
// Set root for bank 0, with caching enabled
// Set root for bank 0, with caching disabled so we can get the size
// of the storage for this slot
let mut bank0 = Arc::new(Bank::new_with_config(
&genesis_config,
HashSet::new(),
@ -10279,12 +10283,16 @@ pub(crate) mod tests {
bank0.restore_old_behavior_for_fragile_tests();
let pubkey0_size = get_shrink_account_size();
let account0 = Account::new(1000, pubkey0_size as usize, &Pubkey::new_unique());
bank0.store_account(&pubkey0, &account0);
goto_end_of_slot(Arc::<Bank>::get_mut(&mut bank0).unwrap());
bank0.freeze();
bank0.squash();
// Flush now so that accounts cache cleaning doesn't clean up bank 0 when later
// slots add updates to the cache
bank0.force_flush_accounts_cache();
// Store some lamports in bank 1
let some_lamports = 123;
@ -10292,6 +10300,11 @@ pub(crate) mod tests {
bank1.deposit(&pubkey1, some_lamports);
bank1.deposit(&pubkey2, some_lamports);
goto_end_of_slot(Arc::<Bank>::get_mut(&mut bank1).unwrap());
bank1.freeze();
bank1.squash();
// Flush now so that accounts cache cleaning doesn't clean up bank 1 when later
// slots add updates to the cache
bank1.force_flush_accounts_cache();
// Store some lamports for pubkey1 in bank 2, root bank 2
let mut bank2 = Arc::new(new_from_parent(&bank1));
@ -11809,43 +11822,45 @@ pub(crate) mod tests {
#[test]
fn test_store_scan_consistency_root() {
test_store_scan_consistency(
false,
|bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
let mut current_bank = bank0.clone();
let mut prev_bank = bank0;
loop {
let lamports_this_round = current_bank.slot() + starting_lamports + 1;
let account = Account::new(lamports_this_round, 0, &program_id);
for key in pubkeys_to_modify.iter() {
current_bank.store_account(key, &account);
for accounts_db_caching_enabled in &[false, true] {
test_store_scan_consistency(
*accounts_db_caching_enabled,
|bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| {
let mut current_bank = bank0.clone();
let mut prev_bank = bank0;
loop {
let lamports_this_round = current_bank.slot() + starting_lamports + 1;
let account = Account::new(lamports_this_round, 0, &program_id);
for key in pubkeys_to_modify.iter() {
current_bank.store_account(key, &account);
}
current_bank.freeze();
// Send the previous bank to the scan thread to perform the scan.
// Meanwhile this thread will squash and update roots immediately after
// so the roots will update while scanning.
//
// The capacity of the channel is 1 so that this thread will wait for the scan to finish before starting
// the next iteration, allowing the scan to stay in sync with these updates
// such that every scan will see this interruption.
if bank_to_scan_sender.send(prev_bank).is_err() {
// Channel was disconnected, exit
return;
}
current_bank.squash();
if current_bank.slot() % 2 == 0 {
current_bank.force_flush_accounts_cache();
current_bank.clean_accounts(true);
}
prev_bank = current_bank.clone();
current_bank = Arc::new(Bank::new_from_parent(
&current_bank,
&solana_sdk::pubkey::new_rand(),
current_bank.slot() + 1,
));
}
current_bank.freeze();
// Send the previous bank to the scan thread to perform the scan.
// Meanwhile this thread will squash and update roots immediately after
// so the roots will update while scanning.
//
// The capacity of the channel is 1 so that this thread will wait for the scan to finish before starting
// the next iteration, allowing the scan to stay in sync with these updates
// such that every scan will see this interruption.
if bank_to_scan_sender.send(prev_bank).is_err() {
// Channel was disconnected, exit
return;
}
current_bank.freeze();
current_bank.squash();
current_bank.force_flush_accounts_cache();
current_bank.clean_accounts(true);
prev_bank = current_bank.clone();
current_bank = Arc::new(Bank::new_from_parent(
&current_bank,
&solana_sdk::pubkey::new_rand(),
current_bank.slot() + 1,
));
}
},
);
},
);
}
}
#[test]