Fix race condition between shrinking and cleaning (for 1.1) (#11234)
* Fix race condition between shrinking and cleaning * Fix compile error... * fix ci * Update comments
This commit is contained in:
@ -31,7 +31,13 @@ impl AccountsBackgroundService {
|
|||||||
bank.process_dead_slots();
|
bank.process_dead_slots();
|
||||||
|
|
||||||
// Currently, given INTERVAL_MS, we process 1 slot/100 ms
|
// Currently, given INTERVAL_MS, we process 1 slot/100 ms
|
||||||
bank.process_stale_slot();
|
let shrunken_account_count = bank.process_stale_slot();
|
||||||
|
if shrunken_account_count > 0 {
|
||||||
|
datapoint_info!(
|
||||||
|
"stale_slot_shrink",
|
||||||
|
("accounts", shrunken_account_count, i64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
sleep(Duration::from_millis(INTERVAL_MS));
|
sleep(Duration::from_millis(INTERVAL_MS));
|
||||||
})
|
})
|
||||||
|
@ -50,7 +50,7 @@ use std::{
|
|||||||
ops::RangeBounds,
|
ops::RangeBounds,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
|
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
|
||||||
sync::{Arc, Mutex, RwLock},
|
sync::{Arc, Mutex, MutexGuard, RwLock},
|
||||||
};
|
};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
||||||
@ -795,6 +795,11 @@ impl AccountsDB {
|
|||||||
// Only remove those accounts where the entire rooted history of the account
|
// Only remove those accounts where the entire rooted history of the account
|
||||||
// can be purged because there are no live append vecs in the ancestors
|
// can be purged because there are no live append vecs in the ancestors
|
||||||
pub fn clean_accounts(&self) {
|
pub fn clean_accounts(&self) {
|
||||||
|
// hold a lock to prevent slot shrinking from running because it might modify some rooted
|
||||||
|
// slot storages which can not happen as long as we're cleaning accounts because we're also
|
||||||
|
// modifying the rooted slot storages!
|
||||||
|
let _candidates = self.shrink_candidate_slots.lock().unwrap();
|
||||||
|
|
||||||
self.report_store_stats();
|
self.report_store_stats();
|
||||||
|
|
||||||
let no_ancestors = HashMap::new();
|
let no_ancestors = HashMap::new();
|
||||||
@ -951,7 +956,7 @@ impl AccountsDB {
|
|||||||
|
|
||||||
// Reads all accounts in given slot's AppendVecs and filter only to alive,
|
// Reads all accounts in given slot's AppendVecs and filter only to alive,
|
||||||
// then create a minimum AppendVed filled with the alive.
|
// then create a minimum AppendVed filled with the alive.
|
||||||
fn shrink_stale_slot(&self, slot: Slot) {
|
fn do_shrink_stale_slot(&self, slot: Slot) -> usize {
|
||||||
trace!("shrink_stale_slot: slot: {}", slot);
|
trace!("shrink_stale_slot: slot: {}", slot);
|
||||||
|
|
||||||
let mut stored_accounts = vec![];
|
let mut stored_accounts = vec![];
|
||||||
@ -979,7 +984,7 @@ impl AccountsDB {
|
|||||||
alive_count,
|
alive_count,
|
||||||
stored_accounts.len()
|
stored_accounts.len()
|
||||||
);
|
);
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1022,7 +1027,7 @@ impl AccountsDB {
|
|||||||
let mut hashes = Vec::with_capacity(alive_accounts.len());
|
let mut hashes = Vec::with_capacity(alive_accounts.len());
|
||||||
let mut write_versions = Vec::with_capacity(alive_accounts.len());
|
let mut write_versions = Vec::with_capacity(alive_accounts.len());
|
||||||
|
|
||||||
for (pubkey, account, _size, _location, write_version) in alive_accounts {
|
for (pubkey, account, _size, _location, write_version) in &alive_accounts {
|
||||||
accounts.push((pubkey, account));
|
accounts.push((pubkey, account));
|
||||||
hashes.push(account.hash);
|
hashes.push(account.hash);
|
||||||
write_versions.push(*write_version);
|
write_versions.push(*write_version);
|
||||||
@ -1049,28 +1054,39 @@ impl AccountsDB {
|
|||||||
slot_storage.retain(|_key, store| store.count() > 0);
|
slot_storage.retain(|_key, store| store.count() > 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
alive_accounts.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn shrink_stale_slot(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize {
|
||||||
|
if let Some(slot) = self.do_next_shrink_slot(candidates) {
|
||||||
|
self.do_shrink_stale_slot(slot)
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Infinitely returns rooted roots in cyclic order
|
// Infinitely returns rooted roots in cyclic order
|
||||||
fn next_shrink_slot(&self) -> Option<Slot> {
|
fn do_next_shrink_slot(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> Option<Slot> {
|
||||||
let next = {
|
let next = candidates.pop();
|
||||||
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
|
|
||||||
candidates.pop()
|
|
||||||
};
|
|
||||||
|
|
||||||
if next.is_some() {
|
if next.is_some() {
|
||||||
next
|
next
|
||||||
} else {
|
} else {
|
||||||
let mut new_all_slots = self.all_root_slots_in_index();
|
let mut new_all_slots = self.all_root_slots_in_index();
|
||||||
let next = new_all_slots.pop();
|
let next = new_all_slots.pop();
|
||||||
|
**candidates = new_all_slots;
|
||||||
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
|
|
||||||
*candidates = new_all_slots;
|
|
||||||
|
|
||||||
next
|
next
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
fn next_shrink_slot(&self) -> Option<Slot> {
|
||||||
|
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
|
||||||
|
self.do_next_shrink_slot(&mut candidates)
|
||||||
|
}
|
||||||
|
|
||||||
fn all_root_slots_in_index(&self) -> Vec<Slot> {
|
fn all_root_slots_in_index(&self) -> Vec<Slot> {
|
||||||
let index = self.accounts_index.read().unwrap();
|
let index = self.accounts_index.read().unwrap();
|
||||||
index.roots.iter().cloned().collect()
|
index.roots.iter().cloned().collect()
|
||||||
@ -1081,15 +1097,27 @@ impl AccountsDB {
|
|||||||
storage.0.keys().cloned().collect()
|
storage.0.keys().cloned().collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn process_stale_slot(&self) {
|
pub fn process_stale_slot(&self) -> usize {
|
||||||
if let Some(slot) = self.next_shrink_slot() {
|
let mut measure = Measure::start("stale_slot_shrink-ms");
|
||||||
self.shrink_stale_slot(slot);
|
let candidates = self.shrink_candidate_slots.try_lock();
|
||||||
|
if candidates.is_err() {
|
||||||
|
// skip and return immediately if locked by clean_accounts()
|
||||||
|
// the calling background thread will just retry later.
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
// hold this lock as long as this shrinking process is running to avoid conflicts
|
||||||
|
// with clean_accounts().
|
||||||
|
let mut candidates = candidates.unwrap();
|
||||||
|
|
||||||
|
let count = self.shrink_stale_slot(&mut candidates);
|
||||||
|
measure.stop();
|
||||||
|
inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize);
|
||||||
|
count
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn shrink_all_stale_slots(&self) {
|
pub fn shrink_all_stale_slots(&self) {
|
||||||
for slot in self.all_slots_in_storage() {
|
for slot in self.all_slots_in_storage() {
|
||||||
self.shrink_stale_slot(slot);
|
self.do_shrink_stale_slot(slot);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4016,4 +4044,57 @@ pub mod tests {
|
|||||||
assert!(store_counts[&x] >= 1);
|
assert!(store_counts[&x] >= 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_shrink_and_clean() {
|
||||||
|
solana_logger::setup();
|
||||||
|
|
||||||
|
// repeat the whole test scenario
|
||||||
|
for _ in 0..5 {
|
||||||
|
let accounts = Arc::new(AccountsDB::new_single());
|
||||||
|
let accounts_for_shrink = accounts.clone();
|
||||||
|
|
||||||
|
// spawn the slot shrinking background thread
|
||||||
|
let exit = Arc::new(AtomicBool::default());
|
||||||
|
let exit_for_shrink = exit.clone();
|
||||||
|
let shrink_thread = std::thread::spawn(move || loop {
|
||||||
|
if exit_for_shrink.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
accounts_for_shrink.process_stale_slot();
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut alive_accounts = vec![];
|
||||||
|
let owner = Pubkey::default();
|
||||||
|
|
||||||
|
// populate the AccountsDB with plenty of food for slot shrinking
|
||||||
|
// also this simulates realistic some heavy spike account updates in the wild
|
||||||
|
for current_slot in 0..1000 {
|
||||||
|
while alive_accounts.len() <= 10 {
|
||||||
|
alive_accounts.push((
|
||||||
|
Pubkey::new_rand(),
|
||||||
|
Account::new(thread_rng().gen_range(0, 50), 0, &owner),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
alive_accounts.retain(|(_pubkey, account)| account.lamports >= 1);
|
||||||
|
|
||||||
|
for (pubkey, account) in alive_accounts.iter_mut() {
|
||||||
|
account.lamports -= 1;
|
||||||
|
accounts.store(current_slot, &[(&pubkey, &account)]);
|
||||||
|
}
|
||||||
|
accounts.add_root(current_slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
// let's dance.
|
||||||
|
for _ in 0..10 {
|
||||||
|
accounts.clean_accounts();
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanup
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
shrink_thread.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2714,8 +2714,8 @@ impl Bank {
|
|||||||
self.rc.accounts.accounts_db.process_dead_slots();
|
self.rc.accounts.accounts_db.process_dead_slots();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn process_stale_slot(&self) {
|
pub fn process_stale_slot(&self) -> usize {
|
||||||
self.rc.accounts.accounts_db.process_stale_slot();
|
self.rc.accounts.accounts_db.process_stale_slot()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn shrink_all_stale_slots(&self) {
|
pub fn shrink_all_stale_slots(&self) {
|
||||||
|
Reference in New Issue
Block a user