From fd68b4e7a8177a348a3c72c4c5d67c4a87a6391e Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Mon, 7 Jun 2021 09:37:55 +0000
Subject: [PATCH] Support out of band dumping of unrooted slots in AccountsDb
 (#17269) (#17777)

* Accounts dumping logic

* Add test for interaction between cache flush and remove_unrooted_slot()

* Update comments

* Rename

* renaming

* Add more comments

* Renaming

* Fixup test and bad check

(cherry picked from commit bbcdf073ba2e2e944f6abdb25a538f272944c738)

Co-authored-by: carllin <carl@solana.com>
---
 core/src/repair_service.rs |   2 +-
 runtime/src/accounts_db.rs | 442 ++++++++++++++++++++++++++++---------
 runtime/src/bank.rs        |   4 +-
 3 files changed, 344 insertions(+), 104 deletions(-)

diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 0bb96983bf..a916f62b23 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -576,7 +576,7 @@ impl RepairService {
             root_bank.clear_slot_signatures(*slot);
 
             // Clear the accounts for this slot
-            root_bank.remove_unrooted_slot(*slot);
+            root_bank.remove_unrooted_slots(&[*slot]);
 
             // Clear the slot-related data in blockstore. This will:
             // 1) Clear old shreds allowing new ones to be inserted
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 6695f3eecc..b4fdb39abd 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -63,7 +63,7 @@ use std::{
     ops::{Range, RangeBounds},
     path::{Path, PathBuf},
     sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
-    sync::{Arc, Mutex, MutexGuard, RwLock},
+    sync::{Arc, Condvar, Mutex, MutexGuard, RwLock},
    thread::Builder,
     time::Instant,
 };
@@ -755,6 +755,16 @@ impl RecycleStores {
     }
 }
 
+/// Removing unrooted slots in Accounts Background Service needs to be synchronized with flushing
+/// slots from the Accounts Cache. This keeps track of those slots and the Mutex + Condvar for
+/// synchronization.
+#[derive(Debug, Default)]
+struct RemoveUnrootedSlotsSynchronization {
+    // slots being flushed from the cache or being purged
+    slots_under_contention: Mutex<HashSet<Slot>>,
+    signal: Condvar,
+}
+
 // This structure handles the load/store of the accounts
 #[derive(Debug)]
 pub struct AccountsDb {
@@ -833,6 +843,11 @@ pub struct AccountsDb {
     load_limit: AtomicU64,
 
     is_bank_drop_callback_enabled: AtomicBool,
+
+    /// Set of slots currently being flushed by `flush_slot_cache()` or removed
+    /// by `remove_unrooted_slot()`. Used to ensure `remove_unrooted_slots(slots)`
+    /// can safely clear the set of unrooted slots `slots`.
+    remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization,
 }
 
 #[derive(Debug, Default)]
@@ -975,7 +990,6 @@ struct FlushStats {
     num_flushed: usize,
     num_purged: usize,
     total_size: u64,
-    did_flush: bool,
 }
 
 #[derive(Debug, Default)]
@@ -1264,6 +1278,7 @@ impl Default for AccountsDb {
             #[cfg(test)]
             load_limit: AtomicU64::default(),
             is_bank_drop_callback_enabled: AtomicBool::default(),
+            remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization::default(),
         }
     }
 }
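A note on the synchronization primitive introduced above: `RemoveUnrootedSlotsSynchronization` is a classic Mutex-plus-Condvar ownership set. Whichever thread touches a slot first, the cache flush or the out-of-band removal, inserts it into `slots_under_contention` and thereby owns it; the loser either skips the slot (the flush path) or sleeps on `signal` until the owner finishes (the removal path). The standalone sketch below models that protocol under simplified assumptions; the names `ContentionSet`, `try_reserve`, `reserve_blocking`, and `release` are illustrative and appear nowhere in this patch.

    use std::collections::HashSet;
    use std::sync::{Arc, Condvar, Mutex};
    use std::thread;

    struct ContentionSet {
        slots: Mutex<HashSet<u64>>,
        signal: Condvar,
    }

    impl ContentionSet {
        /// Take ownership of `slot` if nobody holds it; returns whether we won.
        fn try_reserve(&self, slot: u64) -> bool {
            self.slots.lock().unwrap().insert(slot)
        }

        /// Block until `slot` is free, then take ownership of it.
        fn reserve_blocking(&self, slot: u64) {
            let mut slots = self.slots.lock().unwrap();
            // Re-check after every wakeup: condvar wakeups may be spurious.
            while slots.contains(&slot) {
                slots = self.signal.wait(slots).unwrap();
            }
            slots.insert(slot);
        }

        /// Give up ownership and wake all waiters so they can re-check.
        fn release(&self, slot: u64) {
            assert!(self.slots.lock().unwrap().remove(&slot));
            self.signal.notify_all();
        }
    }

    fn main() {
        let set = Arc::new(ContentionSet {
            slots: Mutex::new(HashSet::new()),
            signal: Condvar::new(),
        });
        assert!(set.try_reserve(42)); // the "flush" wins slot 42 first

        let remover = {
            let set = Arc::clone(&set);
            thread::spawn(move || {
                set.reserve_blocking(42); // sleeps until the flush releases 42
                set.release(42); // purge done
            })
        };

        set.release(42); // flush finished: wakes the remover
        remover.join().unwrap();
    }

`release()` deliberately uses `notify_all()` rather than `notify_one()`: waiters may be parked on different slots, and each one needs to wake and re-evaluate its own predicate.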
@@ -3396,30 +3411,94 @@ impl AccountsDb {
             .report("external_purge_slots_stats", Some(1000));
     }
 
-    // TODO: This is currently:
-    // 1. Unsafe with scan because it can remove a slot in the middle
-    // of a scan.
-    // 2. Doesn't handle cache flushes that happen during the slot deletion (see comment below).
-    pub fn remove_unrooted_slot(&self, remove_slot: Slot) {
-        if self.accounts_index.is_root(remove_slot) {
-            panic!("Trying to remove accounts for rooted slot {}", remove_slot);
+    // TODO: This is currently unsafe with scan because it can remove a slot in the middle
+    // of a scan.
+    /// Remove the set of slots in `remove_slots` from both the cache and storage. This requires
+    /// we know the contents of the slot are either:
+    ///
+    /// 1) Completely in the cache
+    /// 2) Have been completely flushed from the cache
+    ///
+    /// in order to guarantee that when this function returns, the contents of the slot have
+    /// been completely and not only partially removed. Thus synchronization with
+    /// `flush_slot_cache()` through `self.remove_unrooted_slots_synchronization` is necessary.
+    pub fn remove_unrooted_slots(&self, remove_slots: &[Slot]) {
+        let rooted_slots = self
+            .accounts_index
+            .get_rooted_from_list(remove_slots.iter());
+        assert!(
+            rooted_slots.is_empty(),
+            "Trying to remove accounts for rooted slots {:?}",
+            rooted_slots
+        );
+
+        let RemoveUnrootedSlotsSynchronization {
+            slots_under_contention,
+            signal,
+        } = &self.remove_unrooted_slots_synchronization;
+
+        {
+            // Slots that are currently being flushed by flush_slot_cache()
+            let mut currently_contended_slots = slots_under_contention.lock().unwrap();
+
+            // Slots that are currently being flushed by flush_slot_cache() AND
+            // we want to remove in this function
+            let mut remaining_contended_flush_slots: Vec<Slot> = remove_slots
+                .iter()
+                .filter(|remove_slot| {
+                    let is_being_flushed = currently_contended_slots.contains(remove_slot);
+                    if !is_being_flushed {
+                        // Reserve the slots that we want to purge that aren't currently
+                        // being flushed to prevent cache from flushing those slots in
+                        // the future.
+                        //
+                        // Note that the single replay thread has to remove a specific slot `N`
+                        // before another version of the same slot can be replayed. This means
+                        // multiple threads should not call `remove_unrooted_slots()` simultaneously
+                        // with the same slot.
+                        currently_contended_slots.insert(**remove_slot);
+                    }
+                    // If the cache is currently flushing this slot, add it to the list
+                    is_being_flushed
+                })
+                .cloned()
+                .collect();
+
+            // Wait for cache flushes to finish
+            loop {
+                if !remaining_contended_flush_slots.is_empty() {
+                    // Wait for the signal that the cache has finished flushing a slot
+                    //
+                    // Don't wait if `remaining_contended_flush_slots` is empty, otherwise
+                    // we may never get a signal since there's no cache flush thread to
+                    // do the signaling
+                    currently_contended_slots = signal.wait(currently_contended_slots).unwrap();
+                } else {
+                    // There are no slots being flushed to wait on, so it's safe to proceed
+                    // to purging the slots we want to purge!
+                    break;
+                }
+
+                // For each slot whose cache flush has finished, mark that we're about to
+                // start purging it by reserving it in `currently_contended_slots`.
+                remaining_contended_flush_slots.retain(|flush_slot| {
+                    let is_being_flushed = currently_contended_slots.contains(flush_slot);
+                    if !is_being_flushed {
+                        // Mark that we're about to delete this slot now
+                        currently_contended_slots.insert(*flush_slot);
+                    }
+                    is_being_flushed
+                });
+            }
         }
 
-        // TODO: Handle if the slot was flushed to storage while we were removing the cached
-        // slot above, i.e. it's possible the storage contains partial version of the current
-        // slot.
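The block above generalizes a single condition-variable wait to a batch: phase one reserves every requested slot that no flush currently owns, so no future flush can pick those up; phase two sleeps until the still-contended slots drain, claiming each one the moment its flusher releases it. A condensed, self-contained model of just that control flow, with `u64` standing in for `Slot` (`reserve_all` is an illustrative name, not part of this patch):

    use std::collections::HashSet;
    use std::sync::{Condvar, Mutex};

    /// Reserve every slot in `wanted`, waiting out any slot another thread holds.
    fn reserve_all(held: &Mutex<HashSet<u64>>, signal: &Condvar, wanted: &[u64]) {
        let mut held = held.lock().unwrap();

        // Phase 1: grab everything currently free. `insert()` returning false
        // means some flush already owns that slot.
        let mut contended: Vec<u64> = wanted
            .iter()
            .copied()
            .filter(|slot| !held.insert(*slot))
            .collect();

        // Phase 2: sleep until the remaining slots free up, claiming each one
        // as its flush completes. Waiting only while `contended` is non-empty
        // is what prevents blocking on a signal that nobody will ever send.
        while !contended.is_empty() {
            held = signal.wait(held).unwrap();
            contended.retain(|slot| !held.insert(*slot));
        }
    }

    fn main() {
        let held = Mutex::new(HashSet::new());
        let signal = Condvar::new();
        // Nothing is contended, so this returns immediately with 5 and 7 reserved.
        reserve_all(&held, &signal, &[5, 7]);
        assert_eq!(held.lock().unwrap().len(), 2);
    }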
-        // slot. One way to handle this is to augment slots to contain a "version", That way,
-        // 1) We clean older versions via the natural clean() pipeline
-        // without having to call this function out of band.
-        // 2) This deletion doesn't have to block on scan
-        // Reads will then always read the latest version of a slot. Scans will also know
-        // which version their parents because banks will also be augmented with this version,
-        // which handles cases where a deletion of one version happens in the middle of the scan.
         let remove_unrooted_purge_stats = PurgeStats::default();
-        self.purge_slots_from_cache_and_store(
-            std::iter::once(&remove_slot),
-            &remove_unrooted_purge_stats,
-        );
+        self.purge_slots_from_cache_and_store(remove_slots.iter(), &remove_unrooted_purge_stats);
         remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", Some(0));
+
+        let mut currently_contended_slots = slots_under_contention.lock().unwrap();
+        for slot in remove_slots {
+            assert!(currently_contended_slots.remove(slot));
+        }
     }
 
     pub fn hash_stored_account(slot: Slot, account: &StoredAccountMeta) -> Hash {
@@ -3772,7 +3851,7 @@ impl AccountsDb {
                 should_flush_f.as_mut()
             };
 
-            if self.flush_slot_cache(root, should_flush_f).did_flush {
+            if self.flush_slot_cache(root, should_flush_f).is_some() {
                 num_roots_flushed += 1;
             }
 
@@ -3794,101 +3873,134 @@ impl AccountsDb {
         (num_new_roots, num_roots_flushed)
     }
 
-    // `should_flush_f` is an optional closure that determines whether a given
-    // account should be flushed. Passing `None` will by default flush all
-    // accounts
-    fn flush_slot_cache(
+    fn do_flush_slot_cache(
         &self,
         slot: Slot,
+        slot_cache: &SlotCache,
         mut should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>,
     ) -> FlushStats {
         let mut num_purged = 0;
         let mut total_size = 0;
         let mut num_flushed = 0;
-        let did_flush = if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
-            let iter_items: Vec<_> = slot_cache.iter().collect();
-            let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
-            let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
-            let (accounts, hashes): (Vec<(&Pubkey, &AccountSharedData)>, Vec<Hash>) = iter_items
-                .iter()
-                .filter_map(|iter_item| {
-                    let key = iter_item.key();
-                    let account = &iter_item.value().account;
-                    let should_flush = should_flush_f
-                        .as_mut()
-                        .map(|should_flush_f| should_flush_f(key, account))
-                        .unwrap_or(true);
-                    if should_flush {
-                        let hash = iter_item.value().hash();
-                        total_size += (account.data().len() + STORE_META_OVERHEAD) as u64;
-                        num_flushed += 1;
-                        Some(((key, account), hash))
-                    } else {
-                        // If we don't flush, we have to remove the entry from the
-                        // index, since it's equivalent to purging
-                        purged_slot_pubkeys.insert((slot, *key));
-                        pubkey_to_slot_set.push((*key, slot));
-                        num_purged += 1;
-                        None
-                    }
-                })
-                .unzip();
+        let iter_items: Vec<_> = slot_cache.iter().collect();
+        let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
+        let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
+        let (accounts, hashes): (Vec<(&Pubkey, &AccountSharedData)>, Vec<Hash>) = iter_items
+            .iter()
+            .filter_map(|iter_item| {
+                let key = iter_item.key();
+                let account = &iter_item.value().account;
+                let should_flush = should_flush_f
+                    .as_mut()
+                    .map(|should_flush_f| should_flush_f(key, account))
+                    .unwrap_or(true);
+                if should_flush {
+                    let hash = iter_item.value().hash();
+                    total_size += (account.data().len() + STORE_META_OVERHEAD) as u64;
+                    num_flushed += 1;
+                    Some(((key, account), hash))
+                } else {
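One detail worth pausing on in `do_flush_slot_cache()`, whose diff continues below: the `assert_eq!` that exactly one AppendVec holds the slot is only sound because the flush pre-computes an upper bound on storage, charging each account `data().len() + STORE_META_OVERHEAD` bytes and rounding the sum up to a page boundary before creating the store. A toy version of that accounting follows; the constant values here are illustrative stand-ins, not the crate's actual definitions.

    // Illustrative constants; the real STORE_META_OVERHEAD and page size are
    // defined in accounts_db.rs / append_vec.rs.
    const STORE_META_OVERHEAD: u64 = 256;
    const PAGE_SIZE: u64 = 4096;

    /// Round `size` up to the next multiple of PAGE_SIZE (a power of two).
    fn page_align(size: u64) -> u64 {
        (size + (PAGE_SIZE - 1)) & !(PAGE_SIZE - 1)
    }

    fn main() {
        let account_data_lens = [0u64, 165, 10_240];
        let total: u64 = account_data_lens
            .iter()
            .map(|len| len + STORE_META_OVERHEAD)
            .sum();
        // An AppendVec sized to this bound can hold every flushed account.
        assert_eq!(page_align(total), 12_288);
    }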
+                    // If we don't flush, we have to remove the entry from the
+                    // index, since it's equivalent to purging
+                    purged_slot_pubkeys.insert((slot, *key));
+                    pubkey_to_slot_set.push((*key, slot));
+                    num_purged += 1;
+                    None
+                }
+            })
+            .unzip();
 
-            let is_dead_slot = accounts.is_empty();
-            // Remove the account index entries from earlier roots that are outdated by later roots.
-            // Safe because queries to the index will be reading updates from later roots.
-            self.purge_slot_cache_pubkeys(
-                slot,
-                purged_slot_pubkeys,
-                pubkey_to_slot_set,
-                is_dead_slot,
-            );
+        let is_dead_slot = accounts.is_empty();
+        // Remove the account index entries from earlier roots that are outdated by later roots.
+        // Safe because queries to the index will be reading updates from later roots.
+        self.purge_slot_cache_pubkeys(slot, purged_slot_pubkeys, pubkey_to_slot_set, is_dead_slot);
 
-            if !is_dead_slot {
-                let aligned_total_size = self.page_align(total_size);
-                // This ensures that all updates are written to an AppendVec, before any
-                // updates to the index happen, so anybody that sees a real entry in the index,
-                // will be able to find the account in storage
-                let flushed_store =
-                    self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache");
-                self.store_accounts_frozen(
-                    slot,
-                    &accounts,
-                    Some(&hashes),
-                    Some(Box::new(move |_, _| flushed_store.clone())),
-                    None,
-                );
-                // If the above sizing function is correct, just one AppendVec is enough to hold
-                // all the data for the slot
-                assert_eq!(
-                    self.storage
-                        .get_slot_stores(slot)
-                        .unwrap()
-                        .read()
-                        .unwrap()
-                        .len(),
-                    1
-                );
-            }
+        if !is_dead_slot {
+            let aligned_total_size = self.page_align(total_size);
+            // This ensures that all updates are written to an AppendVec, before any
+            // updates to the index happen, so anybody that sees a real entry in the index,
+            // will be able to find the account in storage
+            let flushed_store =
+                self.create_and_insert_store(slot, aligned_total_size, "flush_slot_cache");
+            self.store_accounts_frozen(
+                slot,
+                &accounts,
+                Some(&hashes),
+                Some(Box::new(move |_, _| flushed_store.clone())),
+                None,
+            );
+            // If the above sizing function is correct, just one AppendVec is enough to hold
+            // all the data for the slot
+            assert_eq!(
+                self.storage
+                    .get_slot_stores(slot)
+                    .unwrap()
+                    .read()
+                    .unwrap()
+                    .len(),
+                1
+            );
+        }
 
-            // Remove this slot from the cache, which will to AccountsDb's new readers should look like an
-            // atomic switch from the cache to storage.
-            // There is some racy condition for existing readers who just has read exactly while
-            // flushing. That case is handled by retry_to_get_account_accessor()
-            assert!(self.accounts_cache.remove_slot(slot).is_some());
-            true
-        } else {
-            false
-        };
+        // Remove this slot from the cache, which to AccountsDb's new readers should look
+        // like an atomic switch from the cache to storage.
+        // There is a race condition for existing readers whose read overlaps exactly with
+        // the flush. That case is handled by retry_to_get_account_accessor()
+        assert!(self.accounts_cache.remove_slot(slot).is_some());
 
         FlushStats {
             slot,
             num_flushed,
             num_purged,
             total_size,
-            did_flush,
         }
     }
 
+    /// `should_flush_f` is an optional closure that determines whether a given
+    /// account should be flushed.
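The race test added further below coordinates its trials with a pair of crossbeam channels: one acts as a start signal for the dedicated flushing thread, the other as a completion barrier, and dropping the sender doubles as the shutdown signal. Stripped of the AccountsDb specifics, the harness shape is roughly the following sketch (it assumes the `crossbeam-channel` crate, from which the test module already imports `unbounded`):

    use crossbeam_channel::unbounded;
    use std::thread;

    fn main() {
        let (start_tx, start_rx) = unbounded::<()>();
        let (done_tx, done_rx) = unbounded::<()>();

        let worker = thread::spawn(move || loop {
            // recv() errors once every sender is dropped: the shutdown signal.
            if start_rx.recv().is_err() {
                return;
            }
            // ... one trial's worth of work, e.g. flush every cached slot ...
            done_tx.send(()).unwrap();
        });

        for _trial in 0..3 {
            start_tx.send(()).unwrap(); // kick off a trial
            // ... race against the worker here ...
            done_rx.recv().unwrap(); // barrier: wait for the worker to finish
        }

        drop(start_tx); // worker's recv() now returns Err, ending its loop
        worker.join().unwrap();
    }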
+    /// Passing `None` will by default flush all
+    /// accounts
+    fn flush_slot_cache(
+        &self,
+        slot: Slot,
+        should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>,
+    ) -> Option<FlushStats> {
+        self.accounts_cache.slot_cache(slot).and_then(|slot_cache| {
+            let is_being_purged = {
+                let mut slots_under_contention = self
+                    .remove_unrooted_slots_synchronization
+                    .slots_under_contention
+                    .lock()
+                    .unwrap();
+                // If we're purging this slot, don't flush it here
+                if slots_under_contention.contains(&slot) {
+                    true
+                } else {
+                    slots_under_contention.insert(slot);
+                    false
+                }
+            };
+            if !is_being_purged {
+                let flush_stats = self.do_flush_slot_cache(slot, &slot_cache, should_flush_f);
+                // Nobody else should have been purging this slot, so it should not have been
+                // removed from `self.remove_unrooted_slots_synchronization`.
+                assert!(self
+                    .remove_unrooted_slots_synchronization
+                    .slots_under_contention
+                    .lock()
+                    .unwrap()
+                    .remove(&slot));
+
+                // Signal to any threads blocked on `remove_unrooted_slots(slot)` that we have
+                // finished flushing
+                self.remove_unrooted_slots_synchronization
+                    .signal
+                    .notify_all();
+                Some(flush_stats)
+            } else {
+                None
+            }
+        })
+    }
+
     fn write_accounts_to_cache(
         &self,
         slot: Slot,
@@ -6360,7 +6472,7 @@ pub mod tests {
         assert_load_account(&db, unrooted_slot, key, 1);
 
         // Purge the slot
-        db.remove_unrooted_slot(unrooted_slot);
+        db.remove_unrooted_slots(&[unrooted_slot]);
         assert!(db.load_without_fixed_root(&ancestors, &key).is_none());
         assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none());
         assert!(db.accounts_cache.slot_cache(unrooted_slot).is_none());
@@ -6397,7 +6509,7 @@ pub mod tests {
         db.store_uncached(unrooted_slot, &[(&key, &account0)]);
 
         // Purge the slot
-        db.remove_unrooted_slot(unrooted_slot);
+        db.remove_unrooted_slots(&[unrooted_slot]);
 
         // Add a new root
         let key2 = solana_sdk::pubkey::new_rand();
@@ -10466,6 +10578,134 @@ pub mod tests {
         do_test_load_account_and_shrink_race(false);
     }
 
+    #[test]
+    fn test_cache_flush_remove_unrooted_race() {
+        let caching_enabled = true;
+        let db = AccountsDb::new_with_config(
+            Vec::new(),
+            &ClusterType::Development,
+            AccountSecondaryIndexes::default(),
+            caching_enabled,
+        );
+        let db = Arc::new(db);
+        let num_cached_slots = 100;
+
+        let num_trials = 100;
+        let (new_trial_start_sender, new_trial_start_receiver) = unbounded();
+        let (flush_done_sender, flush_done_receiver) = unbounded();
+        // Start up a thread to flush the accounts cache
+        let t_flush_cache = {
+            let db = db.clone();
+
+            std::thread::Builder::new()
+                .name("account-cache-flush".to_string())
+                .spawn(move || loop {
+                    // Wait for the signal to start a trial
+                    if new_trial_start_receiver.recv().is_err() {
+                        return;
+                    }
+                    for slot in 0..num_cached_slots {
+                        db.flush_slot_cache(slot, None::<&mut fn(&_, &_) -> bool>);
+                    }
+                    flush_done_sender.send(()).unwrap();
+                })
+                .unwrap()
+        };
+
+        let exit = Arc::new(AtomicBool::new(false));
+
+        let t_spurious_signal = {
+            let db = db.clone();
+            let exit = exit.clone();
+            std::thread::Builder::new()
+                .name("account-cache-flush".to_string())
+                .spawn(move || loop {
+                    if exit.load(Ordering::Relaxed) {
+                        return;
+                    }
+                    // Simulate spurious wake-ups that can happen, but are too rare
+                    // to otherwise depend on in tests.
+                    db.remove_unrooted_slots_synchronization.signal.notify_all();
+                })
+                .unwrap()
+        };
+
+        // Run multiple trials. Has the added benefit of rewriting the same slots after we've
+        // dumped them in previous trials.
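Before the body of the test, one remark on the Bank-level change in the final hunk of this patch: the wrapper is pluralized to match, so the migration for callers is mechanical, but batching matters, since one call now reserves, waits out, and purges an entire set of slots under a single pass of the synchronization above. A hypothetical before/after, using a minimal stand-in `Bank` type rather than the real `runtime::bank::Bank`:

    type Slot = u64;

    struct Bank;
    impl Bank {
        fn remove_unrooted_slots(&self, slots: &[Slot]) {
            // The real method forwards to AccountsDb::remove_unrooted_slots().
            println!("purging {} unrooted slots", slots.len());
        }
    }

    fn main() {
        let bank = Bank;
        // Before this patch: bank.remove_unrooted_slot(10); bank.remove_unrooted_slot(11);
        // After: one call per batch of slots to dump.
        bank.remove_unrooted_slots(&[10, 11]);
    }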
+        for _ in 0..num_trials {
+            // Store an account
+            let lamports = 42;
+            let mut account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
+            account.set_lamports(lamports);
+
+            // Pick a random 50% of the slots to pass to `remove_unrooted_slots()`
+            let mut all_slots: Vec<Slot> = (0..num_cached_slots).collect();
+            all_slots.shuffle(&mut rand::thread_rng());
+            let slots_to_dump = &all_slots[0..num_cached_slots as usize / 2];
+            let slots_to_keep = &all_slots[num_cached_slots as usize / 2..];
+
+            // Set up one account per slot across many different slots, and track
+            // which pubkey was stored in each slot.
+            let slot_to_pubkey_map: HashMap<Slot, Pubkey> = (0..num_cached_slots)
+                .map(|slot| {
+                    let pubkey = Pubkey::new_unique();
+                    db.store_cached(slot, &[(&pubkey, &account)]);
+                    (slot, pubkey)
+                })
+                .collect();
+
+            // Signal the flushing thread to start flushing
+            new_trial_start_sender.send(()).unwrap();
+
+            // Here we want to test both:
+            // 1) Flush thread starts flushing a slot before we try dumping it.
+            // 2) Flushing thread trying to flush while/after we're trying to dump the slot,
+            // in which case flush should ignore/move past the slot to be dumped
+            //
+            // Hence, we split into chunks to get the dumping of each chunk to race with the
+            // flushes. If we were to dump the entire set at once, the dump would reserve all
+            // of its slots up front, making it less likely that a flush gets to a slot first.
+            for chunks in slots_to_dump.chunks(slots_to_dump.len() / 2) {
+                db.remove_unrooted_slots(chunks);
+            }
+
+            // Check that all the slots in `slots_to_dump` were completely removed from the
+            // cache, storage, and index
+            for slot in slots_to_dump {
+                assert!(db.storage.get_slot_storage_entries(*slot).is_none());
+                assert!(db.accounts_cache.slot_cache(*slot).is_none());
+                let account_in_slot = slot_to_pubkey_map[slot];
+                assert!(db
+                    .accounts_index
+                    .get_account_read_entry(&account_in_slot)
+                    .is_none());
+            }
+
+            // Wait for the flush to finish before starting the next trial
+            flush_done_receiver.recv().unwrap();
+
+            for slot in slots_to_keep {
+                let account_in_slot = slot_to_pubkey_map[slot];
+                assert!(db
+                    .load(
+                        &Ancestors::from(vec![(*slot, 0)]),
+                        &account_in_slot,
+                        LoadHint::FixedMaxRoot
+                    )
+                    .is_some());
+                // Clear for next iteration so that
+                // `assert!(self.storage.get_slot_stores(purged_slot).is_none());`
+                // in `purge_slot_pubkeys()` doesn't trigger
+                db.remove_unrooted_slots(&[*slot]);
+            }
+        }
+
+        exit.store(true, Ordering::Relaxed);
+        drop(new_trial_start_sender);
+        t_flush_cache.join().unwrap();
+        t_spurious_signal.join().unwrap();
+    }
+
     #[test]
     fn test_collect_uncleaned_slots_up_to_slot() {
         solana_logger::setup();
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index 2471f5e29f..e59d61926d 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -2652,8 +2652,8 @@ impl Bank {
         }
     }
 
-    pub fn remove_unrooted_slot(&self, slot: Slot) {
-        self.rc.accounts.accounts_db.remove_unrooted_slot(slot)
+    pub fn remove_unrooted_slots(&self, slots: &[Slot]) {
+        self.rc.accounts.accounts_db.remove_unrooted_slots(slots)
     }
 
     pub fn set_shrink_paths(&self, paths: Vec<PathBuf>) {