From 6842e3778057cf4c89aeb8688917f610ef0bb544 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 29 Jul 2020 06:06:34 +0900 Subject: [PATCH] Fix race condition between shrinking and cleaning (for 1.1) (#11234) * Fix race condition between shrinking and cleaning * Fix compile error... * fix ci * Update comments --- core/src/accounts_background_service.rs | 8 +- runtime/src/accounts_db.rs | 113 ++++++++++++++++++++---- runtime/src/bank.rs | 4 +- 3 files changed, 106 insertions(+), 19 deletions(-) diff --git a/core/src/accounts_background_service.rs b/core/src/accounts_background_service.rs index af8024df13..b1a2ad4bc5 100644 --- a/core/src/accounts_background_service.rs +++ b/core/src/accounts_background_service.rs @@ -31,7 +31,13 @@ impl AccountsBackgroundService { bank.process_dead_slots(); // Currently, given INTERVAL_MS, we process 1 slot/100 ms - bank.process_stale_slot(); + let shrunken_account_count = bank.process_stale_slot(); + if shrunken_account_count > 0 { + datapoint_info!( + "stale_slot_shrink", + ("accounts", shrunken_account_count, i64) + ); + } sleep(Duration::from_millis(INTERVAL_MS)); }) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index a465369b19..7d8bcbb612 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -50,7 +50,7 @@ use std::{ ops::RangeBounds, path::{Path, PathBuf}, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex, MutexGuard, RwLock}, }; use tempfile::TempDir; @@ -795,6 +795,11 @@ impl AccountsDB { // Only remove those accounts where the entire rooted history of the account // can be purged because there are no live append vecs in the ancestors pub fn clean_accounts(&self) { + // hold a lock to prevent slot shrinking from running because it might modify some rooted + // slot storages which can not happen as long as we're cleaning accounts because we're also + // modifying the rooted slot storages! + let _candidates = self.shrink_candidate_slots.lock().unwrap(); + self.report_store_stats(); let no_ancestors = HashMap::new(); @@ -951,7 +956,7 @@ impl AccountsDB { // Reads all accounts in given slot's AppendVecs and filter only to alive, // then create a minimum AppendVed filled with the alive. - fn shrink_stale_slot(&self, slot: Slot) { + fn do_shrink_stale_slot(&self, slot: Slot) -> usize { trace!("shrink_stale_slot: slot: {}", slot); let mut stored_accounts = vec![]; @@ -979,7 +984,7 @@ impl AccountsDB { alive_count, stored_accounts.len() ); - return; + return 0; } } } @@ -1022,7 +1027,7 @@ impl AccountsDB { let mut hashes = Vec::with_capacity(alive_accounts.len()); let mut write_versions = Vec::with_capacity(alive_accounts.len()); - for (pubkey, account, _size, _location, write_version) in alive_accounts { + for (pubkey, account, _size, _location, write_version) in &alive_accounts { accounts.push((pubkey, account)); hashes.push(account.hash); write_versions.push(*write_version); @@ -1049,28 +1054,39 @@ impl AccountsDB { slot_storage.retain(|_key, store| store.count() > 0); } } + + alive_accounts.len() + } + + fn shrink_stale_slot(&self, candidates: &mut MutexGuard>) -> usize { + if let Some(slot) = self.do_next_shrink_slot(candidates) { + self.do_shrink_stale_slot(slot) + } else { + 0 + } } // Infinitely returns rooted roots in cyclic order - fn next_shrink_slot(&self) -> Option { - let next = { - let mut candidates = self.shrink_candidate_slots.lock().unwrap(); - candidates.pop() - }; + fn do_next_shrink_slot(&self, candidates: &mut MutexGuard>) -> Option { + let next = candidates.pop(); if next.is_some() { next } else { let mut new_all_slots = self.all_root_slots_in_index(); let next = new_all_slots.pop(); - - let mut candidates = self.shrink_candidate_slots.lock().unwrap(); - *candidates = new_all_slots; + **candidates = new_all_slots; next } } + #[cfg(test)] + fn next_shrink_slot(&self) -> Option { + let mut candidates = self.shrink_candidate_slots.lock().unwrap(); + self.do_next_shrink_slot(&mut candidates) + } + fn all_root_slots_in_index(&self) -> Vec { let index = self.accounts_index.read().unwrap(); index.roots.iter().cloned().collect() @@ -1081,15 +1097,27 @@ impl AccountsDB { storage.0.keys().cloned().collect() } - pub fn process_stale_slot(&self) { - if let Some(slot) = self.next_shrink_slot() { - self.shrink_stale_slot(slot); + pub fn process_stale_slot(&self) -> usize { + let mut measure = Measure::start("stale_slot_shrink-ms"); + let candidates = self.shrink_candidate_slots.try_lock(); + if candidates.is_err() { + // skip and return immediately if locked by clean_accounts() + // the calling background thread will just retry later. + return 0; } + // hold this lock as long as this shrinking process is running to avoid conflicts + // with clean_accounts(). + let mut candidates = candidates.unwrap(); + + let count = self.shrink_stale_slot(&mut candidates); + measure.stop(); + inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize); + count } pub fn shrink_all_stale_slots(&self) { for slot in self.all_slots_in_storage() { - self.shrink_stale_slot(slot); + self.do_shrink_stale_slot(slot); } } @@ -4016,4 +4044,57 @@ pub mod tests { assert!(store_counts[&x] >= 1); } } + + #[test] + fn test_shrink_and_clean() { + solana_logger::setup(); + + // repeat the whole test scenario + for _ in 0..5 { + let accounts = Arc::new(AccountsDB::new_single()); + let accounts_for_shrink = accounts.clone(); + + // spawn the slot shrinking background thread + let exit = Arc::new(AtomicBool::default()); + let exit_for_shrink = exit.clone(); + let shrink_thread = std::thread::spawn(move || loop { + if exit_for_shrink.load(Ordering::Relaxed) { + break; + } + accounts_for_shrink.process_stale_slot(); + }); + + let mut alive_accounts = vec![]; + let owner = Pubkey::default(); + + // populate the AccountsDB with plenty of food for slot shrinking + // also this simulates realistic some heavy spike account updates in the wild + for current_slot in 0..1000 { + while alive_accounts.len() <= 10 { + alive_accounts.push(( + Pubkey::new_rand(), + Account::new(thread_rng().gen_range(0, 50), 0, &owner), + )); + } + + alive_accounts.retain(|(_pubkey, account)| account.lamports >= 1); + + for (pubkey, account) in alive_accounts.iter_mut() { + account.lamports -= 1; + accounts.store(current_slot, &[(&pubkey, &account)]); + } + accounts.add_root(current_slot); + } + + // let's dance. + for _ in 0..10 { + accounts.clean_accounts(); + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + // cleanup + exit.store(true, Ordering::Relaxed); + shrink_thread.join().unwrap(); + } + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index e3f214c5fe..e5bda635ca 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -2714,8 +2714,8 @@ impl Bank { self.rc.accounts.accounts_db.process_dead_slots(); } - pub fn process_stale_slot(&self) { - self.rc.accounts.accounts_db.process_stale_slot(); + pub fn process_stale_slot(&self) -> usize { + self.rc.accounts.accounts_db.process_stale_slot() } pub fn shrink_all_stale_slots(&self) {