Snapshots missing slots from accounts cache clean optimization (#14852) (#14878)

Co-authored-by: Carl Lin <carl@solana.com>
mergify[bot] authored on 2021-01-26 22:20:07 -08:00, committed by GitHub
parent 676da0a836
commit c276670a94
2 changed files with 245 additions and 111 deletions

@@ -100,8 +100,23 @@ impl SnapshotRequestHandler {
let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
if accounts_db_caching_enabled {
// Force flush all the roots from the cache so that the snapshot can be taken.
// Forced cache flushing MUST flush all roots <= snapshot_root_bank.slot().
// That's because `snapshot_root_bank.slot()` must be root at this point,
// and contains relevant updates because each bank has at least 1 account update due
// to sysvar maintenance. Otherwise, this would cause missing storages in the snapshot
snapshot_root_bank.force_flush_accounts_cache();
// Ensure all roots <= `snapshot_root_bank.slot()` have been flushed.
// Note `max_flush_root` could be larger than `snapshot_root_bank.slot()` if there are
// `> MAX_CACHE_SLOTS` cached and rooted slots which triggered earlier flushes.
assert!(
snapshot_root_bank.slot()
<= snapshot_root_bank
.rc
.accounts
.accounts_db
.accounts_cache
.fetch_max_flush_root()
);
}
flush_accounts_cache_time.stop();
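
For readers unfamiliar with `fetch_max_flush_root`, here is a minimal, self-contained sketch of the kind of bookkeeping the new assert relies on, assuming the cache tracks its highest flushed root in an `AtomicU64` updated with `fetch_max`. The `FlushRootTracker` type and its methods are invented for illustration; they are not the real `AccountsCache` API.

use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical stand-in for the cache's flush-root bookkeeping.
struct FlushRootTracker {
    max_flush_root: AtomicU64,
}

impl FlushRootTracker {
    fn new() -> Self {
        Self {
            max_flush_root: AtomicU64::new(0),
        }
    }

    // Called whenever a rooted slot is flushed; keeps the running maximum.
    fn set_max_flush_root(&self, root: u64) {
        self.max_flush_root.fetch_max(root, Ordering::Relaxed);
    }

    fn fetch_max_flush_root(&self) -> u64 {
        self.max_flush_root.load(Ordering::Relaxed)
    }
}

fn main() {
    let tracker = FlushRootTracker::new();
    // A forced flush walks every cached root <= the snapshot root (and possibly more).
    for flushed_root in [3, 5, 7] {
        tracker.set_max_flush_root(flushed_root);
    }
    // The invariant the snapshot path asserts: the snapshot root bank's slot
    // must not exceed the highest root the cache has flushed.
    let snapshot_root_slot = 6;
    assert!(snapshot_root_slot <= tracker.fetch_max_flush_root());
}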

@@ -2996,27 +2996,31 @@ impl AccountsDB {
self.accounts_cache.report_size();
}
// `force_flush` flushes all the cached roots `<= max_clean_root`. It also then
// `force_flush` flushes all the cached roots `<= requested_flush_root`. It also then
// flushes:
// 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache,
// 2) If there are still > MAX_CACHE_SLOTS remaining slots in the cache, the excess
// unrooted slots
pub fn flush_accounts_cache(&self, force_flush: bool, max_clean_root: Option<Slot>) {
pub fn flush_accounts_cache(&self, force_flush: bool, requested_flush_root: Option<Slot>) {
#[cfg(not(test))]
assert!(max_clean_root.is_some());
assert!(requested_flush_root.is_some());
if !force_flush && self.accounts_cache.num_slots() <= MAX_CACHE_SLOTS {
return;
}
// Flush only the roots <= max_clean_root, so that snapshotting has all
// Flush only the roots <= requested_flush_root, so that snapshotting has all
// the relevant roots in storage.
let mut flush_roots_elapsed = Measure::start("flush_roots_elapsed");
let mut account_bytes_saved = 0;
let mut num_accounts_saved = 0;
// Note even if force_flush is false, we will still flush all roots <= the
// given `requested_flush_root`, even if some of the later roots cannot be used for
// cleaning due to an ongoing scan
let (total_new_cleaned_roots, num_cleaned_roots_flushed) = self
.flush_rooted_accounts_cache(
max_clean_root,
requested_flush_root,
Some((&mut account_bytes_saved, &mut num_accounts_saved)),
);
flush_roots_elapsed.stop();
@@ -3030,7 +3034,7 @@ impl AccountsDB {
if self.accounts_cache.num_slots() > MAX_CACHE_SLOTS {
// Start by flushing the roots
//
// Cannot do any cleaning on roots past `max_clean_root` because future
// Cannot do any cleaning on roots past `requested_flush_root` because future
// snapshots may need updates from those later slots, hence we pass `None`
// for `should_clean`.
self.flush_rooted_accounts_cache(None, None)
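
As a rough illustration of the flush policy the comments above describe, the sketch below models the cache as a plain BTreeSet of rooted slots: flush everything <= `requested_flush_root`, then flush any excess without cleaning. The `CacheSketch` type and the `MAX_CACHE_SLOTS` value here are assumptions for the example, not the real `AccountsDB` internals.

use std::collections::BTreeSet;

// Illustrative value only; the real constant lives in AccountsDB.
const MAX_CACHE_SLOTS: usize = 300;

struct CacheSketch {
    rooted: BTreeSet<u64>,
}

impl CacheSketch {
    fn flush_accounts_cache(&mut self, force_flush: bool, requested_flush_root: Option<u64>) {
        if !force_flush && self.rooted.len() <= MAX_CACHE_SLOTS {
            return;
        }
        // Always flush roots <= `requested_flush_root` so a snapshot taken at that
        // root finds every relevant slot in storage.
        if let Some(max_root) = requested_flush_root {
            let keep = self.rooted.split_off(&(max_root + 1));
            let flushed = std::mem::replace(&mut self.rooted, keep);
            println!("flushed rooted slots {:?}", flushed);
        }
        // If the cache is still over the limit, flush the remaining roots too, but
        // without cleaning, since later snapshots may still need their updates.
        if self.rooted.len() > MAX_CACHE_SLOTS {
            let rest = std::mem::take(&mut self.rooted);
            println!("flushed {} excess roots without cleaning", rest.len());
        }
    }
}

fn main() {
    let mut cache = CacheSketch {
        rooted: (0..10u64).collect(),
    };
    // Force flush everything <= root 5; roots 6..=9 stay cached (under the limit).
    cache.flush_accounts_cache(true, Some(5));
    assert_eq!(cache.rooted.len(), 4);
}
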
@@ -3093,16 +3097,14 @@ impl AccountsDB {
fn flush_rooted_accounts_cache(
&self,
mut max_flush_root: Option<Slot>,
requested_flush_root: Option<Slot>,
should_clean: Option<(&mut usize, &mut usize)>,
) -> (usize, usize) {
if should_clean.is_some() {
max_flush_root = self.max_clean_root(max_flush_root);
}
// If there is a long running scan going on, this could prevent any cleaning
// past `max_flush_root`.
let cached_roots: BTreeSet<Slot> = self.accounts_cache.clear_roots(max_flush_root);
let max_clean_root = should_clean.as_ref().and_then(|_| {
// If there is a long running scan going on, this could prevent any cleaning
// based on updates from slots > `max_clean_root`.
self.max_clean_root(requested_flush_root)
});
// Use HashMap because HashSet doesn't provide Entry api
let mut written_accounts = HashMap::new();
@@ -3112,7 +3114,6 @@ impl AccountsDB {
let mut should_flush_f = should_clean.map(|(account_bytes_saved, num_accounts_saved)| {
move |&pubkey: &Pubkey, account: &Account| {
use std::collections::hash_map::Entry::{Occupied, Vacant};
let should_flush = match written_accounts.entry(pubkey) {
Vacant(vacant_entry) => {
vacant_entry.insert(());
@@ -3130,11 +3131,27 @@ impl AccountsDB {
}
});
// Always flush up to `requested_flush_root`, which is necessary for things like snapshotting.
let cached_roots: BTreeSet<Slot> = self.accounts_cache.clear_roots(requested_flush_root);
// Iterate from highest to lowest so that we don't need to flush outdated
// updates from earlier roots
let mut num_roots_flushed = 0;
for &root in cached_roots.iter().rev() {
if self.flush_slot_cache(root, should_flush_f.as_mut()) {
let should_flush_f = if let Some(max_clean_root) = max_clean_root {
if root > max_clean_root {
// Only if the root is greater than the `max_clean_root` do we
// have to prevent cleaning; otherwise, just default to `should_flush_f`
// for any slots <= `max_clean_root`
None
} else {
should_flush_f.as_mut()
}
} else {
should_flush_f.as_mut()
};
if self.flush_slot_cache(root, should_flush_f) {
num_roots_flushed += 1;
}
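
The per-root decision above is the core of the change: every cached root up to `requested_flush_root` gets flushed, but the cleaning filter is applied only to roots <= `max_clean_root` (the requested root, further capped by any ongoing scan's minimum root). The sketch below restates that selection with invented names and a simplified per-root `FnMut(u64) -> bool` filter in place of the real per-account closure.

use std::collections::BTreeSet;

// Simplified restatement of the root-flushing loop; `should_flush_f` here is a
// per-root stand-in for the real per-account closure.
fn flush_roots(
    cached_roots: &BTreeSet<u64>,
    max_clean_root: Option<u64>,
    mut should_flush_f: Option<impl FnMut(u64) -> bool>,
) -> usize {
    let mut num_roots_flushed = 0;
    // Highest to lowest, so newer updates are seen first and older duplicates
    // in earlier roots can be skipped by the filter.
    for &root in cached_roots.iter().rev() {
        let filter = match max_clean_root {
            // Roots newer than `max_clean_root` must not be cleaned: a later
            // snapshot or an ongoing scan may still need their updates.
            Some(max_clean_root) if root > max_clean_root => None,
            _ => should_flush_f.as_mut(),
        };
        let flushed = match filter {
            Some(f) => f(root),
            // No filter: flush the root unconditionally, with no cleaning.
            None => true,
        };
        if flushed {
            num_roots_flushed += 1;
        }
    }
    num_roots_flushed
}

fn main() {
    let cached_roots: BTreeSet<u64> = (0..6).collect();
    // Pretend the filter keeps everything; only the clean/no-clean split matters here.
    let flushed = flush_roots(&cached_roots, Some(3), Some(|_root: u64| true));
    assert_eq!(flushed, 6);
}
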
@@ -4390,7 +4407,7 @@ pub mod tests {
use std::{
iter::FromIterator,
str::FromStr,
thread::{sleep, Builder},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
};
@@ -7431,54 +7448,37 @@ pub mod tests {
.is_none());
}
#[test]
fn test_scan_flush_accounts_cache_then_clean_drop() {
let caching_enabled = true;
let db = Arc::new(AccountsDB::new_with_config(
Vec::new(),
&ClusterType::Development,
HashSet::new(),
caching_enabled,
));
let db_ = db.clone();
let account_key = Pubkey::new_unique();
let account_key2 = Pubkey::new_unique();
let zero_lamport_account = Account::new(0, 0, &Account::default().owner);
let slot1_account = Account::new(1, 1, &Account::default().owner);
let slot2_account = Account::new(2, 1, &Account::default().owner);
struct ScanTracker {
t_scan: JoinHandle<()>,
exit: Arc<AtomicBool>,
}
impl ScanTracker {
fn exit(self) -> thread::Result<()> {
self.exit.store(true, Ordering::Relaxed);
self.t_scan.join()
}
}
fn setup_scan(
db: Arc<AccountsDB>,
scan_ancestors: Arc<Ancestors>,
stall_key: Pubkey,
) -> ScanTracker {
let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone();
let ready = Arc::new(AtomicBool::new(false));
let ready_ = ready.clone();
/*
Store zero lamport account into slots 0, 1, 2 where
root slots are 0, 2, and slot 1 is unrooted.
       0 (root)
      /        \
     1          2 (root)
*/
db.store_cached(0, &[(&account_key, &zero_lamport_account)]);
db.store_cached(1, &[(&account_key, &slot1_account)]);
db.store_cached(2, &[(&account_key, &slot2_account)]);
// Fodder for the scan so that the lock on `account_key` is not held
db.store_cached(2, &[(&account_key2, &slot2_account)]);
db.get_accounts_delta_hash(0);
db.add_root(0);
let max_scan_root = 0;
let scan_ancestors: Arc<Ancestors> = Arc::new(vec![(0, 1), (1, 1)].into_iter().collect());
let scan_ancestors_ = scan_ancestors.clone();
let t_scan = Builder::new()
.name("scan".to_string())
.spawn(move || {
db_.scan_accounts(
&scan_ancestors_,
db.scan_accounts(
&scan_ancestors,
|_collector: &mut Vec<(Pubkey, Account)>, maybe_account| {
ready_.store(true, Ordering::Relaxed);
if let Some((pubkey, _, _)) = maybe_account {
// Do the wait on account_key2, because clean is happening
// on account_key's index and we don't want to block the clean.
if *pubkey == account_key2 {
if *pubkey == stall_key {
loop {
if exit_.load(Ordering::Relaxed) {
break;
@@ -7498,15 +7498,68 @@ pub mod tests {
sleep(Duration::from_millis(10));
}
// Add a new root 2
db.get_accounts_delta_hash(2);
db.add_root(2);
ScanTracker { t_scan, exit }
}
// Flush the cache, slot 1 should remain in the cache, everything else should be flushed
db.flush_accounts_cache(true, None);
#[test]
fn test_scan_flush_accounts_cache_then_clean_drop() {
let caching_enabled = true;
let db = Arc::new(AccountsDB::new_with_config(
Vec::new(),
&ClusterType::Development,
HashSet::new(),
caching_enabled,
));
let account_key = Pubkey::new_unique();
let account_key2 = Pubkey::new_unique();
let zero_lamport_account = Account::new(0, 0, &Account::default().owner);
let slot1_account = Account::new(1, 1, &Account::default().owner);
let slot2_account = Account::new(2, 1, &Account::default().owner);
/*
Store zero lamport account into slots 0, 1, 2 where
root slots are 0, 2, and slot 1 is unrooted.
       0 (root)
      /        \
     1          2 (root)
*/
db.store_cached(0, &[(&account_key, &zero_lamport_account)]);
db.store_cached(1, &[(&account_key, &slot1_account)]);
// Fodder for the scan so that the lock on `account_key` is not held
db.store_cached(1, &[(&account_key2, &slot1_account)]);
db.store_cached(2, &[(&account_key, &slot2_account)]);
db.get_accounts_delta_hash(0);
let max_scan_root = 0;
db.add_root(max_scan_root);
let scan_ancestors: Arc<Ancestors> = Arc::new(vec![(0, 1), (1, 1)].into_iter().collect());
let scan_tracker = setup_scan(db.clone(), scan_ancestors.clone(), account_key2);
// Add a new root 2
let new_root = 2;
db.get_accounts_delta_hash(new_root);
db.add_root(new_root);
// Check that the scan is properly set up
assert_eq!(
db.accounts_index.min_ongoing_scan_root().unwrap(),
max_scan_root
);
// If we specify a requested_flush_root == 2, then slot 2 will be flushed because
// `2 <= requested_flush_root`, even though `2 > max_scan_root`. The unrooted slot 1
// should remain in the cache
db.flush_accounts_cache(true, Some(new_root));
assert_eq!(db.accounts_cache.num_slots(), 1);
assert!(db.accounts_cache.slot_cache(1).is_some());
// Because of the ongoing scan, intra-cache cleaning should not clean the entry for
// `account_key` at slot 0, even though it was updated in slot 2
let account = db
.do_load(&Ancestors::default(), &account_key, Some(0))
.unwrap();
assert_eq!(account.0.lamports, zero_lamport_account.lamports);
// Run clean; the unrooted slot 1 should not be purged and should still be readable
// from the cache, because we're still doing a scan on it.
db.clean_accounts(None);
@@ -7517,8 +7570,7 @@ pub mod tests {
// When the scan is over, clean should not panic and should not purge something
// still in the cache.
exit.store(true, Ordering::Relaxed);
t_scan.join().unwrap();
scan_tracker.exit().unwrap();
db.clean_accounts(None);
let account = db
.do_load(&scan_ancestors, &account_key, Some(max_scan_root))
@@ -7585,40 +7637,63 @@ pub mod tests {
}
}
fn setup_accounts_db_cache_clean(num_slots: usize) -> (AccountsDB, Vec<Pubkey>, Vec<Slot>) {
fn setup_accounts_db_cache_clean(
num_slots: usize,
scan_slot: Option<Slot>,
) -> (Arc<AccountsDB>, Vec<Pubkey>, Vec<Slot>, Option<ScanTracker>) {
let caching_enabled = true;
let accounts_db = AccountsDB::new_with_config(
let accounts_db = Arc::new(AccountsDB::new_with_config(
Vec::new(),
&ClusterType::Development,
HashSet::new(),
caching_enabled,
);
));
let slots: Vec<_> = (0..num_slots as Slot).into_iter().collect();
let stall_slot = num_slots as Slot;
let scan_stall_key = Pubkey::new_unique();
let keys: Vec<Pubkey> = std::iter::repeat_with(Pubkey::new_unique)
.take(num_slots)
.collect();
if scan_slot.is_some() {
accounts_db.store_cached(
// Store it in a slot that isn't returned in `slots`
stall_slot,
&[(&scan_stall_key, &Account::new(1, 0, &Pubkey::default()))],
);
}
// Store some subset of the keys in slots 0..num_slots
let mut scan_tracker = None;
for slot in &slots {
for key in &keys[*slot as usize..] {
accounts_db.store_cached(*slot, &[(key, &Account::new(1, 0, &Pubkey::default()))]);
}
accounts_db.add_root(*slot as Slot);
if Some(*slot) == scan_slot {
let ancestors = Arc::new(vec![(stall_slot, 1), (*slot, 1)].into_iter().collect());
scan_tracker = Some(setup_scan(accounts_db.clone(), ancestors, scan_stall_key));
assert_eq!(
accounts_db.accounts_index.min_ongoing_scan_root().unwrap(),
*slot
);
}
}
accounts_db.accounts_cache.remove_slot(stall_slot);
// If there are <= MAX_CACHE_SLOTS slots in the cache, no slots should be flushed
if accounts_db.accounts_cache.num_slots() <= MAX_CACHE_SLOTS {
accounts_db.flush_accounts_cache(false, None);
assert_eq!(accounts_db.accounts_cache.num_slots(), num_slots);
}
(accounts_db, keys, slots)
(accounts_db, keys, slots, scan_tracker)
}
#[test]
fn test_accounts_db_cache_clean_dead_slots() {
let num_slots = 10;
let (accounts_db, keys, mut slots) = setup_accounts_db_cache_clean(num_slots);
let (accounts_db, keys, mut slots, _) = setup_accounts_db_cache_clean(num_slots, None);
let last_dead_slot = (num_slots - 1) as Slot;
assert_eq!(*slots.last().unwrap(), last_dead_slot);
let alive_slot = last_dead_slot as Slot + 1;
@@ -7685,7 +7760,7 @@ pub mod tests {
#[test]
fn test_accounts_db_cache_clean() {
let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(10);
let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(10, None);
// If no `max_clean_root` is specified, cleaning should purge all flushed slots
accounts_db.flush_accounts_cache(true, None);
@@ -7719,21 +7794,27 @@ pub mod tests {
}
}
fn run_test_accounts_db_cache_clean_max_root(num_slots: usize, max_clean_root: Slot) {
assert!(max_clean_root < (num_slots as Slot));
let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(num_slots);
let is_cache_at_limit = num_slots - max_clean_root as usize - 1 > MAX_CACHE_SLOTS;
fn run_test_accounts_db_cache_clean_max_root(
num_slots: usize,
requested_flush_root: Slot,
scan_root: Option<Slot>,
) {
assert!(requested_flush_root < (num_slots as Slot));
let (accounts_db, keys, slots, scan_tracker) =
setup_accounts_db_cache_clean(num_slots, scan_root);
let is_cache_at_limit = num_slots - requested_flush_root as usize - 1 > MAX_CACHE_SLOTS;
// If:
// 1) `max_clean_root` is specified,
// 1) `requested_flush_root` is specified,
// 2) not at the cache limit, i.e. `is_cache_at_limit == false`, then
// `flush_accounts_cache()` should clean and flushed only slots < max_clean_root,
accounts_db.flush_accounts_cache(true, Some(max_clean_root));
// `flush_accounts_cache()` should clean and flush only slots <= requested_flush_root,
accounts_db.flush_accounts_cache(true, Some(requested_flush_root));
if !is_cache_at_limit {
// Should flush all slots between 0..=max_clean_root
// Should flush all slots between 0..=requested_flush_root
assert_eq!(
accounts_db.accounts_cache.num_slots(),
slots.len() - max_clean_root as usize - 1
slots.len() - requested_flush_root as usize - 1
);
} else {
// Otherwise, if we are at the cache limit, all roots will be flushed
@@ -7747,9 +7828,9 @@ pub mod tests {
.collect::<Vec<_>>();
uncleaned_roots.sort_unstable();
let expected_max_clean_root = if !is_cache_at_limit {
// Should flush all slots between 0..=max_clean_root
max_clean_root
let expected_max_flushed_root = if !is_cache_at_limit {
// Should flush all slots between 0..=requested_flush_root
requested_flush_root
} else {
// Otherwise, if we are at the cache limit, all roots will be flushed
num_slots as Slot - 1
@@ -7757,14 +7838,13 @@ pub mod tests {
assert_eq!(
uncleaned_roots,
slots[0..=expected_max_clean_root as usize].to_vec()
slots[0..=expected_max_flushed_root as usize].to_vec()
);
assert_eq!(
accounts_db.accounts_cache.fetch_max_flush_root(),
expected_max_clean_root,
expected_max_flushed_root,
);
// Updates from slots > max_clean_root should still be flushed to storage
for slot in &slots {
let slot_accounts = accounts_db.scan_account_storage(
*slot as Slot,
@@ -7774,7 +7854,9 @@ pub mod tests {
"When cache is at limit, all roots should have been flushed to storage"
);
}
assert!(*slot > max_clean_root);
// All slots <= requested_flush_root should have been flushed, regardless
// of ongoing scans
assert!(*slot > requested_flush_root);
Some(*loaded_account.pubkey())
},
|slot_accounts: &DashSet<Pubkey>, loaded_account: LoadedAccount| {
@@ -7782,7 +7864,7 @@ pub mod tests {
if !is_cache_at_limit {
// Only true when the limit hasn't been reached and there are still
// slots left in the cache
assert!(*slot <= max_clean_root);
assert!(*slot <= requested_flush_root);
}
},
);
@@ -7795,56 +7877,93 @@ pub mod tests {
slot_accounts.into_iter().collect::<HashSet<Pubkey>>()
}
};
if *slot >= max_clean_root {
// 1) If slot > `max_clean_root`, then either:
// a) If `is_cache_at_limit == true`, still in the cache
// b) if `is_cache_at_limit == false`, were not cleaned before being flushed to storage.
//
// In both cases all the *original* updates at index `slot` were uncleaned and thus
// should be discoverable by this scan.
//
// 2) If slot == `max_clean_root`, the slot was not cleaned before being flushed to storage,
// so it also contains all the original updates.
assert_eq!(
slot_accounts,
let expected_accounts =
if *slot >= requested_flush_root || *slot >= scan_root.unwrap_or(Slot::MAX) {
// 1) If slot > `requested_flush_root`, then either:
// a) If `is_cache_at_limit == false`, still in the cache
// b) if `is_cache_at_limit == true`, were not cleaned before being flushed to storage.
//
// In both cases all the *original* updates at index `slot` were uncleaned and thus
// should be discoverable by this scan.
//
// 2) If slot == `requested_flush_root`, the slot was not cleaned before being flushed to storage,
// so it also contains all the original updates.
//
// 3) If *slot >= scan_root, then we should not clean it either
keys[*slot as usize..]
.iter()
.cloned()
.collect::<HashSet<Pubkey>>()
);
} else {
// Slots less than `max_clean_root` were cleaned in the cache before being flushed
// to storage, should only contain one account
assert_eq!(
slot_accounts,
} else {
// Slots less than `requested_flush_root` and `scan_root` were cleaned in the cache before being flushed
// to storage, should only contain one account
std::iter::once(keys[*slot as usize])
.into_iter()
.collect::<HashSet<Pubkey>>()
);
}
};
assert_eq!(slot_accounts, expected_accounts);
}
if let Some(scan_tracker) = scan_tracker {
scan_tracker.exit().unwrap();
}
}
#[test]
fn test_accounts_db_cache_clean_max_root() {
let max_clean_root = 5;
run_test_accounts_db_cache_clean_max_root(10, max_clean_root);
let requested_flush_root = 5;
run_test_accounts_db_cache_clean_max_root(10, requested_flush_root, None);
}
#[test]
fn test_accounts_db_cache_clean_max_root_with_scan() {
let requested_flush_root = 5;
run_test_accounts_db_cache_clean_max_root(
10,
requested_flush_root,
Some(requested_flush_root - 1),
);
run_test_accounts_db_cache_clean_max_root(
10,
requested_flush_root,
Some(requested_flush_root + 1),
);
}
#[test]
fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit() {
let max_clean_root = 5;
let requested_flush_root = 5;
// Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots
// will be flushed
run_test_accounts_db_cache_clean_max_root(
MAX_CACHE_SLOTS + max_clean_root as usize + 2,
max_clean_root,
MAX_CACHE_SLOTS + requested_flush_root as usize + 2,
requested_flush_root,
None,
);
}
#[test]
fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit_and_scan() {
let requested_flush_root = 5;
// Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots
// will be flushed
run_test_accounts_db_cache_clean_max_root(
MAX_CACHE_SLOTS + requested_flush_root as usize + 2,
requested_flush_root,
Some(requested_flush_root - 1),
);
run_test_accounts_db_cache_clean_max_root(
MAX_CACHE_SLOTS + requested_flush_root as usize + 2,
requested_flush_root,
Some(requested_flush_root + 1),
);
}
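
The cache-limit tests above pick `MAX_CACHE_SLOTS + requested_flush_root + 2` slots so the cache is still over the limit after the requested flush. A quick standalone check of that arithmetic follows, with an assumed value for `MAX_CACHE_SLOTS` since the real constant is defined elsewhere in accounts_db.

// Illustrative value only; the real MAX_CACHE_SLOTS is defined in accounts_db.
const MAX_CACHE_SLOTS: usize = 300;

fn main() {
    let requested_flush_root: u64 = 5;
    // Same expression the tests use: MAX_CACHE_SLOTS + requested_flush_root + 2 slots.
    let num_slots = MAX_CACHE_SLOTS + requested_flush_root as usize + 2;
    // Flushing roots 0..=requested_flush_root removes requested_flush_root + 1 slots,
    // leaving this many rooted slots in the cache...
    let remaining = num_slots - requested_flush_root as usize - 1;
    // ...which is strictly greater than MAX_CACHE_SLOTS, so `is_cache_at_limit` is
    // true and the remaining roots get flushed (uncleaned) as well.
    assert!(remaining > MAX_CACHE_SLOTS);
    println!("{} cached roots remain > limit of {}", remaining, MAX_CACHE_SLOTS);
}
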
fn run_flush_rooted_accounts_cache(should_clean: bool) {
let num_slots = 10;
let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(num_slots);
let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(num_slots, None);
let mut cleaned_bytes = 0;
let mut cleaned_accounts = 0;
let should_clean_tracker = if should_clean {