From c276670a9418d923d0a911c2f0c6bcb390edbbd6 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 26 Jan 2021 22:20:07 -0800 Subject: [PATCH] Snapshots missing slots from accounts cache clean optimization (#14852) (#14878) Co-authored-by: Carl Lin --- runtime/src/accounts_background_service.rs | 17 +- runtime/src/accounts_db.rs | 339 ++++++++++++++------- 2 files changed, 245 insertions(+), 111 deletions(-) diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index dd5245fba9..3342c2205d 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -100,8 +100,23 @@ impl SnapshotRequestHandler { let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time"); if accounts_db_caching_enabled { - // Force flush all the roots from the cache so that the snapshot can be taken. + // Forced cache flushing MUST flush all roots <= snapshot_root_bank.slot(). + // That's because `snapshot_root_bank.slot()` must be root at this point, + // and contains relevant updates because each bank has at least 1 account update due + // to sysvar maintenance. Otherwise, this would cause missing storages in the snapshot snapshot_root_bank.force_flush_accounts_cache(); + // Ensure all roots <= `self.slot()` have been flushed. + // Note `max_flush_root` could be larger than self.slot() if there are + // `> MAX_CACHE_SLOT` cached and rooted slots which triggered earlier flushes. + assert!( + snapshot_root_bank.slot() + <= snapshot_root_bank + .rc + .accounts + .accounts_db + .accounts_cache + .fetch_max_flush_root() + ); } flush_accounts_cache_time.stop(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 7aed7bc5af..5b089ab279 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -2996,27 +2996,31 @@ impl AccountsDB { self.accounts_cache.report_size(); } - // `force_flush` flushes all the cached roots `<= max_clean_root`. It also then + // `force_flush` flushes all the cached roots `<= requested_flush_root`. It also then // flushes: // 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache, // 2) It there are still > MAX_CACHE_SLOTS remaining slots in the cache, the excess // unrooted slots - pub fn flush_accounts_cache(&self, force_flush: bool, max_clean_root: Option) { + pub fn flush_accounts_cache(&self, force_flush: bool, requested_flush_root: Option) { #[cfg(not(test))] - assert!(max_clean_root.is_some()); + assert!(requested_flush_root.is_some()); if !force_flush && self.accounts_cache.num_slots() <= MAX_CACHE_SLOTS { return; } - // Flush only the roots <= max_clean_root, so that snapshotting has all + // Flush only the roots <= requested_flush_root, so that snapshotting has all // the relevant roots in storage. let mut flush_roots_elapsed = Measure::start("flush_roots_elapsed"); let mut account_bytes_saved = 0; let mut num_accounts_saved = 0; + + // Note even if force_flush is false, we will still flush all roots <= the + // given `requested_flush_root`, even if some of the later roots cannot be used for + // cleaning due to an ongoing scan let (total_new_cleaned_roots, num_cleaned_roots_flushed) = self .flush_rooted_accounts_cache( - max_clean_root, + requested_flush_root, Some((&mut account_bytes_saved, &mut num_accounts_saved)), ); flush_roots_elapsed.stop(); @@ -3030,7 +3034,7 @@ impl AccountsDB { if self.accounts_cache.num_slots() > MAX_CACHE_SLOTS { // Start by flushing the roots // - // Cannot do any cleaning on roots past `max_clean_root` because future + // Cannot do any cleaning on roots past `requested_flush_root` because future // snapshots may need updates from those later slots, hence we pass `None` // for `should_clean`. self.flush_rooted_accounts_cache(None, None) @@ -3093,16 +3097,14 @@ impl AccountsDB { fn flush_rooted_accounts_cache( &self, - mut max_flush_root: Option, + requested_flush_root: Option, should_clean: Option<(&mut usize, &mut usize)>, ) -> (usize, usize) { - if should_clean.is_some() { - max_flush_root = self.max_clean_root(max_flush_root); - } - - // If there is a long running scan going on, this could prevent any cleaning - // past `max_flush_root`. - let cached_roots: BTreeSet = self.accounts_cache.clear_roots(max_flush_root); + let max_clean_root = should_clean.as_ref().and_then(|_| { + // If there is a long running scan going on, this could prevent any cleaning + // based on updates from slots > `max_clean_root`. + self.max_clean_root(requested_flush_root) + }); // Use HashMap because HashSet doesn't provide Entry api let mut written_accounts = HashMap::new(); @@ -3112,7 +3114,6 @@ impl AccountsDB { let mut should_flush_f = should_clean.map(|(account_bytes_saved, num_accounts_saved)| { move |&pubkey: &Pubkey, account: &Account| { use std::collections::hash_map::Entry::{Occupied, Vacant}; - let should_flush = match written_accounts.entry(pubkey) { Vacant(vacant_entry) => { vacant_entry.insert(()); @@ -3130,11 +3131,27 @@ impl AccountsDB { } }); + // Always flush up to `requested_flush_root`, which is necessary for things like snapshotting. + let cached_roots: BTreeSet = self.accounts_cache.clear_roots(requested_flush_root); + // Iterate from highest to lowest so that we don't need to flush earlier // outdated updates in earlier roots let mut num_roots_flushed = 0; for &root in cached_roots.iter().rev() { - if self.flush_slot_cache(root, should_flush_f.as_mut()) { + let should_flush_f = if let Some(max_clean_root) = max_clean_root { + if root > max_clean_root { + // Only if the root is greater than the `max_clean_root` do we + // have to prevent cleaning, otherwise, just default to `should_flush_f` + // for any slots <= `max_clean_root` + None + } else { + should_flush_f.as_mut() + } + } else { + should_flush_f.as_mut() + }; + + if self.flush_slot_cache(root, should_flush_f) { num_roots_flushed += 1; } @@ -4390,7 +4407,7 @@ pub mod tests { use std::{ iter::FromIterator, str::FromStr, - thread::{sleep, Builder}, + thread::{self, sleep, Builder, JoinHandle}, time::Duration, }; @@ -7431,54 +7448,37 @@ pub mod tests { .is_none()); } - #[test] - fn test_scan_flush_accounts_cache_then_clean_drop() { - let caching_enabled = true; - let db = Arc::new(AccountsDB::new_with_config( - Vec::new(), - &ClusterType::Development, - HashSet::new(), - caching_enabled, - )); - let db_ = db.clone(); - let account_key = Pubkey::new_unique(); - let account_key2 = Pubkey::new_unique(); - let zero_lamport_account = Account::new(0, 0, &Account::default().owner); - let slot1_account = Account::new(1, 1, &Account::default().owner); - let slot2_account = Account::new(2, 1, &Account::default().owner); + struct ScanTracker { + t_scan: JoinHandle<()>, + exit: Arc, + } + + impl ScanTracker { + fn exit(self) -> thread::Result<()> { + self.exit.store(true, Ordering::Relaxed); + self.t_scan.join() + } + } + + fn setup_scan( + db: Arc, + scan_ancestors: Arc, + stall_key: Pubkey, + ) -> ScanTracker { let exit = Arc::new(AtomicBool::new(false)); let exit_ = exit.clone(); let ready = Arc::new(AtomicBool::new(false)); let ready_ = ready.clone(); - /* - Store zero lamport account into slots 0, 1, 2 where - root slots are 0, 2, and slot 1 is unrooted. - 0 (root) - / \ - 1 2 (root) - */ - db.store_cached(0, &[(&account_key, &zero_lamport_account)]); - db.store_cached(1, &[(&account_key, &slot1_account)]); - db.store_cached(2, &[(&account_key, &slot2_account)]); - // Fodder for the scan so that the lock on `account_key` is not held - db.store_cached(2, &[(&account_key2, &slot2_account)]); - db.get_accounts_delta_hash(0); - db.add_root(0); - let max_scan_root = 0; - let scan_ancestors: Arc = Arc::new(vec![(0, 1), (1, 1)].into_iter().collect()); - let scan_ancestors_ = scan_ancestors.clone(); let t_scan = Builder::new() .name("scan".to_string()) .spawn(move || { - db_.scan_accounts( - &scan_ancestors_, + db.scan_accounts( + &scan_ancestors, |_collector: &mut Vec<(Pubkey, Account)>, maybe_account| { ready_.store(true, Ordering::Relaxed); if let Some((pubkey, _, _)) = maybe_account { - // Do the wait on account_key2, because clean is happening - // on account_key1's index and we don't want to block the clean. - if *pubkey == account_key2 { + if *pubkey == stall_key { loop { if exit_.load(Ordering::Relaxed) { break; @@ -7498,15 +7498,68 @@ pub mod tests { sleep(Duration::from_millis(10)); } - // Add a new root 2 - db.get_accounts_delta_hash(2); - db.add_root(2); + ScanTracker { t_scan, exit } + } - // Flush the cache, slot 1 should remain in the cache, everything else should be flushed - db.flush_accounts_cache(true, None); + #[test] + fn test_scan_flush_accounts_cache_then_clean_drop() { + let caching_enabled = true; + let db = Arc::new(AccountsDB::new_with_config( + Vec::new(), + &ClusterType::Development, + HashSet::new(), + caching_enabled, + )); + let account_key = Pubkey::new_unique(); + let account_key2 = Pubkey::new_unique(); + let zero_lamport_account = Account::new(0, 0, &Account::default().owner); + let slot1_account = Account::new(1, 1, &Account::default().owner); + let slot2_account = Account::new(2, 1, &Account::default().owner); + + /* + Store zero lamport account into slots 0, 1, 2 where + root slots are 0, 2, and slot 1 is unrooted. + 0 (root) + / \ + 1 2 (root) + */ + db.store_cached(0, &[(&account_key, &zero_lamport_account)]); + db.store_cached(1, &[(&account_key, &slot1_account)]); + // Fodder for the scan so that the lock on `account_key` is not held + db.store_cached(1, &[(&account_key2, &slot1_account)]); + db.store_cached(2, &[(&account_key, &slot2_account)]); + db.get_accounts_delta_hash(0); + + let max_scan_root = 0; + db.add_root(max_scan_root); + let scan_ancestors: Arc = Arc::new(vec![(0, 1), (1, 1)].into_iter().collect()); + let scan_tracker = setup_scan(db.clone(), scan_ancestors.clone(), account_key2); + + // Add a new root 2 + let new_root = 2; + db.get_accounts_delta_hash(new_root); + db.add_root(new_root); + + // Check that the scan is properly set up + assert_eq!( + db.accounts_index.min_ongoing_scan_root().unwrap(), + max_scan_root + ); + + // If we specify a requested_flush_root == 2, then `slot 2 <= max_flush_slot` will + // be flushed even though `slot 2 > max_scan_root`. The unrooted slot 1 should + // remain in the cache + db.flush_accounts_cache(true, Some(new_root)); assert_eq!(db.accounts_cache.num_slots(), 1); assert!(db.accounts_cache.slot_cache(1).is_some()); + // Intra cache cleaning should not clean the entry for `account_key` from slot 0, + // even though it was updated in slot `2` because of the ongoing scan + let account = db + .do_load(&Ancestors::default(), &account_key, Some(0)) + .unwrap(); + assert_eq!(account.0.lamports, zero_lamport_account.lamports); + // Run clean, unrooted slot 1 should not be purged, and still readable from the cache, // because we're still doing a scan on it. db.clean_accounts(None); @@ -7517,8 +7570,7 @@ pub mod tests { // When the scan is over, clean should not panic and should not purge something // still in the cache. - exit.store(true, Ordering::Relaxed); - t_scan.join().unwrap(); + scan_tracker.exit().unwrap(); db.clean_accounts(None); let account = db .do_load(&scan_ancestors, &account_key, Some(max_scan_root)) @@ -7585,40 +7637,63 @@ pub mod tests { } } - fn setup_accounts_db_cache_clean(num_slots: usize) -> (AccountsDB, Vec, Vec) { + fn setup_accounts_db_cache_clean( + num_slots: usize, + scan_slot: Option, + ) -> (Arc, Vec, Vec, Option) { let caching_enabled = true; - let accounts_db = AccountsDB::new_with_config( + let accounts_db = Arc::new(AccountsDB::new_with_config( Vec::new(), &ClusterType::Development, HashSet::new(), caching_enabled, - ); + )); let slots: Vec<_> = (0..num_slots as Slot).into_iter().collect(); + let stall_slot = num_slots as Slot; + let scan_stall_key = Pubkey::new_unique(); let keys: Vec = std::iter::repeat_with(Pubkey::new_unique) .take(num_slots) .collect(); + if scan_slot.is_some() { + accounts_db.store_cached( + // Store it in a slot that isn't returned in `slots` + stall_slot, + &[(&scan_stall_key, &Account::new(1, 0, &Pubkey::default()))], + ); + } // Store some subset of the keys in slots 0..num_slots + let mut scan_tracker = None; for slot in &slots { for key in &keys[*slot as usize..] { accounts_db.store_cached(*slot, &[(key, &Account::new(1, 0, &Pubkey::default()))]); } accounts_db.add_root(*slot as Slot); + if Some(*slot) == scan_slot { + let ancestors = Arc::new(vec![(stall_slot, 1), (*slot, 1)].into_iter().collect()); + scan_tracker = Some(setup_scan(accounts_db.clone(), ancestors, scan_stall_key)); + assert_eq!( + accounts_db.accounts_index.min_ongoing_scan_root().unwrap(), + *slot + ); + } } + accounts_db.accounts_cache.remove_slot(stall_slot); + // If there's <= MAX_CACHE_SLOTS, no slots should be flushed if accounts_db.accounts_cache.num_slots() <= MAX_CACHE_SLOTS { accounts_db.flush_accounts_cache(false, None); assert_eq!(accounts_db.accounts_cache.num_slots(), num_slots); } - (accounts_db, keys, slots) + (accounts_db, keys, slots, scan_tracker) } #[test] fn test_accounts_db_cache_clean_dead_slots() { let num_slots = 10; - let (accounts_db, keys, mut slots) = setup_accounts_db_cache_clean(num_slots); + let (accounts_db, keys, mut slots, _) = setup_accounts_db_cache_clean(num_slots, None); let last_dead_slot = (num_slots - 1) as Slot; assert_eq!(*slots.last().unwrap(), last_dead_slot); let alive_slot = last_dead_slot as Slot + 1; @@ -7685,7 +7760,7 @@ pub mod tests { #[test] fn test_accounts_db_cache_clean() { - let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(10); + let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(10, None); // If no `max_clean_root` is specified, cleaning should purge all flushed slots accounts_db.flush_accounts_cache(true, None); @@ -7719,21 +7794,27 @@ pub mod tests { } } - fn run_test_accounts_db_cache_clean_max_root(num_slots: usize, max_clean_root: Slot) { - assert!(max_clean_root < (num_slots as Slot)); - let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(num_slots); - let is_cache_at_limit = num_slots - max_clean_root as usize - 1 > MAX_CACHE_SLOTS; + fn run_test_accounts_db_cache_clean_max_root( + num_slots: usize, + requested_flush_root: Slot, + scan_root: Option, + ) { + assert!(requested_flush_root < (num_slots as Slot)); + let (accounts_db, keys, slots, scan_tracker) = + setup_accounts_db_cache_clean(num_slots, scan_root); + let is_cache_at_limit = num_slots - requested_flush_root as usize - 1 > MAX_CACHE_SLOTS; + // If: - // 1) `max_clean_root` is specified, + // 1) `requested_flush_root` is specified, // 2) not at the cache limit, i.e. `is_cache_at_limit == false`, then - // `flush_accounts_cache()` should clean and flushed only slots < max_clean_root, - accounts_db.flush_accounts_cache(true, Some(max_clean_root)); + // `flush_accounts_cache()` should clean and flush only slots <= requested_flush_root, + accounts_db.flush_accounts_cache(true, Some(requested_flush_root)); if !is_cache_at_limit { - // Should flush all slots between 0..=max_clean_root + // Should flush all slots between 0..=requested_flush_root assert_eq!( accounts_db.accounts_cache.num_slots(), - slots.len() - max_clean_root as usize - 1 + slots.len() - requested_flush_root as usize - 1 ); } else { // Otherwise, if we are at the cache limit, all roots will be flushed @@ -7747,9 +7828,9 @@ pub mod tests { .collect::>(); uncleaned_roots.sort_unstable(); - let expected_max_clean_root = if !is_cache_at_limit { - // Should flush all slots between 0..=max_clean_root - max_clean_root + let expected_max_flushed_root = if !is_cache_at_limit { + // Should flush all slots between 0..=requested_flush_root + requested_flush_root } else { // Otherwise, if we are at the cache limit, all roots will be flushed num_slots as Slot - 1 @@ -7757,14 +7838,13 @@ pub mod tests { assert_eq!( uncleaned_roots, - slots[0..=expected_max_clean_root as usize].to_vec() + slots[0..=expected_max_flushed_root as usize].to_vec() ); assert_eq!( accounts_db.accounts_cache.fetch_max_flush_root(), - expected_max_clean_root, + expected_max_flushed_root, ); - // Updates from slots > max_clean_root should still be flushed to storage for slot in &slots { let slot_accounts = accounts_db.scan_account_storage( *slot as Slot, @@ -7774,7 +7854,9 @@ pub mod tests { "When cache is at limit, all roots should have been flushed to storage" ); } - assert!(*slot > max_clean_root); + // All slots <= requested_flush_root should have been flushed, regardless + // of ongoing scans + assert!(*slot > requested_flush_root); Some(*loaded_account.pubkey()) }, |slot_accounts: &DashSet, loaded_account: LoadedAccount| { @@ -7782,7 +7864,7 @@ pub mod tests { if !is_cache_at_limit { // Only true when the limit hasn't been reached and there are still // slots left in the cache - assert!(*slot <= max_clean_root); + assert!(*slot <= requested_flush_root); } }, ); @@ -7795,56 +7877,93 @@ pub mod tests { slot_accounts.into_iter().collect::>() } }; - if *slot >= max_clean_root { - // 1) If slot > `max_clean_root`, then either: - // a) If `is_cache_at_limit == true`, still in the cache - // b) if `is_cache_at_limit == false`, were not cleaned before being flushed to storage. - // - // In both cases all the *original* updates at index `slot` were uncleaned and thus - // should be discoverable by this scan. - // - // 2) If slot == `max_clean_root`, the slot was not cleaned before being flushed to storage, - // so it also contains all the original updates. - assert_eq!( - slot_accounts, + + let expected_accounts = + if *slot >= requested_flush_root || *slot >= scan_root.unwrap_or(Slot::MAX) { + // 1) If slot > `requested_flush_root`, then either: + // a) If `is_cache_at_limit == false`, still in the cache + // b) if `is_cache_at_limit == true`, were not cleaned before being flushed to storage. + // + // In both cases all the *original* updates at index `slot` were uncleaned and thus + // should be discoverable by this scan. + // + // 2) If slot == `requested_flush_root`, the slot was not cleaned before being flushed to storage, + // so it also contains all the original updates. + // + // 3) If *slot >= scan_root, then we should not clean it either keys[*slot as usize..] .iter() .cloned() .collect::>() - ); - } else { - // Slots less than `max_clean_root` were cleaned in the cache before being flushed - // to storage, should only contain one account - assert_eq!( - slot_accounts, + } else { + // Slots less than `requested_flush_root` and `scan_root` were cleaned in the cache before being flushed + // to storage, should only contain one account std::iter::once(keys[*slot as usize]) .into_iter() .collect::>() - ); - } + }; + + assert_eq!(slot_accounts, expected_accounts); + } + + if let Some(scan_tracker) = scan_tracker { + scan_tracker.exit().unwrap(); } } #[test] fn test_accounts_db_cache_clean_max_root() { - let max_clean_root = 5; - run_test_accounts_db_cache_clean_max_root(10, max_clean_root); + let requested_flush_root = 5; + run_test_accounts_db_cache_clean_max_root(10, requested_flush_root, None); + } + + #[test] + fn test_accounts_db_cache_clean_max_root_with_scan() { + let requested_flush_root = 5; + run_test_accounts_db_cache_clean_max_root( + 10, + requested_flush_root, + Some(requested_flush_root - 1), + ); + run_test_accounts_db_cache_clean_max_root( + 10, + requested_flush_root, + Some(requested_flush_root + 1), + ); } #[test] fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit() { - let max_clean_root = 5; + let requested_flush_root = 5; // Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots // will be flushed run_test_accounts_db_cache_clean_max_root( - MAX_CACHE_SLOTS + max_clean_root as usize + 2, - max_clean_root, + MAX_CACHE_SLOTS + requested_flush_root as usize + 2, + requested_flush_root, + None, + ); + } + + #[test] + fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit_and_scan() { + let requested_flush_root = 5; + // Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots + // will be flushed + run_test_accounts_db_cache_clean_max_root( + MAX_CACHE_SLOTS + requested_flush_root as usize + 2, + requested_flush_root, + Some(requested_flush_root - 1), + ); + run_test_accounts_db_cache_clean_max_root( + MAX_CACHE_SLOTS + requested_flush_root as usize + 2, + requested_flush_root, + Some(requested_flush_root + 1), ); } fn run_flush_rooted_accounts_cache(should_clean: bool) { let num_slots = 10; - let (accounts_db, keys, slots) = setup_accounts_db_cache_clean(num_slots); + let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(num_slots, None); let mut cleaned_bytes = 0; let mut cleaned_accounts = 0; let should_clean_tracker = if should_clean {