Fix race in remove_unrooted_race and flush_slot_cache (#18785)
This commit is contained in:
@ -3691,6 +3691,7 @@ impl AccountsDb {
|
|||||||
|
|
||||||
{
|
{
|
||||||
// Slots that are currently being flushed by flush_slot_cache()
|
// Slots that are currently being flushed by flush_slot_cache()
|
||||||
|
|
||||||
let mut currently_contended_slots = slots_under_contention.lock().unwrap();
|
let mut currently_contended_slots = slots_under_contention.lock().unwrap();
|
||||||
|
|
||||||
// Slots that are currently being flushed by flush_slot_cache() AND
|
// Slots that are currently being flushed by flush_slot_cache() AND
|
||||||
@ -4229,42 +4230,53 @@ impl AccountsDb {
|
|||||||
slot: Slot,
|
slot: Slot,
|
||||||
should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>,
|
should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>,
|
||||||
) -> Option<FlushStats> {
|
) -> Option<FlushStats> {
|
||||||
self.accounts_cache.slot_cache(slot).and_then(|slot_cache| {
|
let is_being_purged = {
|
||||||
let is_being_purged = {
|
let mut slots_under_contention = self
|
||||||
let mut slots_under_contention = self
|
.remove_unrooted_slots_synchronization
|
||||||
.remove_unrooted_slots_synchronization
|
.slots_under_contention
|
||||||
.slots_under_contention
|
.lock()
|
||||||
.lock()
|
.unwrap();
|
||||||
.unwrap();
|
// If we're purging this slot, don't flush it here
|
||||||
// If we're purging this slot, don't flush it here
|
if slots_under_contention.contains(&slot) {
|
||||||
if slots_under_contention.contains(&slot) {
|
true
|
||||||
true
|
|
||||||
} else {
|
|
||||||
slots_under_contention.insert(slot);
|
|
||||||
false
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if !is_being_purged {
|
|
||||||
let flush_stats = self.do_flush_slot_cache(slot, &slot_cache, should_flush_f);
|
|
||||||
// Nobody else should have been purging this slot, so should not have been removed
|
|
||||||
// from `self.remove_unrooted_slots_synchronization`.
|
|
||||||
assert!(self
|
|
||||||
.remove_unrooted_slots_synchronization
|
|
||||||
.slots_under_contention
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.remove(&slot));
|
|
||||||
|
|
||||||
// Signal to any threads blocked on `remove_unrooted_slots(slot)` that we have finished
|
|
||||||
// flushing
|
|
||||||
self.remove_unrooted_slots_synchronization
|
|
||||||
.signal
|
|
||||||
.notify_all();
|
|
||||||
Some(flush_stats)
|
|
||||||
} else {
|
} else {
|
||||||
None
|
slots_under_contention.insert(slot);
|
||||||
|
false
|
||||||
}
|
}
|
||||||
})
|
};
|
||||||
|
|
||||||
|
if !is_being_purged {
|
||||||
|
let flush_stats = self.accounts_cache.slot_cache(slot).map(|slot_cache| {
|
||||||
|
#[cfg(test)]
|
||||||
|
{
|
||||||
|
// Give some time for cache flushing to occur here for unit tests
|
||||||
|
sleep(Duration::from_millis(self.load_delay));
|
||||||
|
}
|
||||||
|
// Since we added the slot to `slots_under_contention` AND this slot
|
||||||
|
// still exists in the cache, we know the slot cannot be removed
|
||||||
|
// by any other threads past this point. We are now responsible for
|
||||||
|
// flushing this slot.
|
||||||
|
self.do_flush_slot_cache(slot, &slot_cache, should_flush_f)
|
||||||
|
});
|
||||||
|
|
||||||
|
// Nobody else should have been purging this slot, so should not have been removed
|
||||||
|
// from `self.remove_unrooted_slots_synchronization`.
|
||||||
|
assert!(self
|
||||||
|
.remove_unrooted_slots_synchronization
|
||||||
|
.slots_under_contention
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.remove(&slot));
|
||||||
|
|
||||||
|
// Signal to any threads blocked on `remove_unrooted_slots(slot)` that we have finished
|
||||||
|
// flushing
|
||||||
|
self.remove_unrooted_slots_synchronization
|
||||||
|
.signal
|
||||||
|
.notify_all();
|
||||||
|
flush_stats
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_accounts_to_cache(
|
fn write_accounts_to_cache(
|
||||||
@ -11516,7 +11528,79 @@ pub mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_cache_flush_remove_unrooted_race() {
|
fn test_cache_flush_delayed_remove_unrooted_race() {
|
||||||
|
let caching_enabled = true;
|
||||||
|
let mut db = AccountsDb::new_with_config(
|
||||||
|
Vec::new(),
|
||||||
|
&ClusterType::Development,
|
||||||
|
AccountSecondaryIndexes::default(),
|
||||||
|
caching_enabled,
|
||||||
|
AccountShrinkThreshold::default(),
|
||||||
|
);
|
||||||
|
db.load_delay = RACY_SLEEP_MS;
|
||||||
|
let db = Arc::new(db);
|
||||||
|
let slot = 10;
|
||||||
|
let bank_id = 10;
|
||||||
|
|
||||||
|
let lamports = 42;
|
||||||
|
let mut account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
|
||||||
|
account.set_lamports(lamports);
|
||||||
|
|
||||||
|
// Start up a thread to flush the accounts cache
|
||||||
|
let (flush_trial_start_sender, flush_trial_start_receiver) = unbounded();
|
||||||
|
let (flush_done_sender, flush_done_receiver) = unbounded();
|
||||||
|
let t_flush_cache = {
|
||||||
|
let db = db.clone();
|
||||||
|
std::thread::Builder::new()
|
||||||
|
.name("account-cache-flush".to_string())
|
||||||
|
.spawn(move || loop {
|
||||||
|
// Wait for the signal to start a trial
|
||||||
|
if flush_trial_start_receiver.recv().is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
db.flush_slot_cache(10, None::<&mut fn(&_, &_) -> bool>);
|
||||||
|
flush_done_sender.send(()).unwrap();
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Start up a thread remove the slot
|
||||||
|
let (remove_trial_start_sender, remove_trial_start_receiver) = unbounded();
|
||||||
|
let (remove_done_sender, remove_done_receiver) = unbounded();
|
||||||
|
let t_remove = {
|
||||||
|
let db = db.clone();
|
||||||
|
std::thread::Builder::new()
|
||||||
|
.name("account-remove".to_string())
|
||||||
|
.spawn(move || loop {
|
||||||
|
// Wait for the signal to start a trial
|
||||||
|
if remove_trial_start_receiver.recv().is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
db.remove_unrooted_slots(&[(slot, bank_id)]);
|
||||||
|
remove_done_sender.send(()).unwrap();
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
};
|
||||||
|
|
||||||
|
let num_trials = 10;
|
||||||
|
for _ in 0..num_trials {
|
||||||
|
let pubkey = Pubkey::new_unique();
|
||||||
|
db.store_cached(slot, &[(&pubkey, &account)]);
|
||||||
|
// Wait for both threads to finish
|
||||||
|
flush_trial_start_sender.send(()).unwrap();
|
||||||
|
remove_trial_start_sender.send(()).unwrap();
|
||||||
|
let _ = flush_done_receiver.recv();
|
||||||
|
let _ = remove_done_receiver.recv();
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(flush_trial_start_sender);
|
||||||
|
drop(remove_trial_start_sender);
|
||||||
|
t_flush_cache.join().unwrap();
|
||||||
|
t_remove.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_cache_flush_remove_unrooted_race_multiple_slots() {
|
||||||
let caching_enabled = true;
|
let caching_enabled = true;
|
||||||
let db = AccountsDb::new_with_config(
|
let db = AccountsDb::new_with_config(
|
||||||
Vec::new(),
|
Vec::new(),
|
||||||
@ -11606,15 +11690,17 @@ pub mod tests {
|
|||||||
// in which case flush should ignore/move past the slot to be dumped
|
// in which case flush should ignore/move past the slot to be dumped
|
||||||
//
|
//
|
||||||
// Hence, we split into chunks to get the dumping of each chunk to race with the
|
// Hence, we split into chunks to get the dumping of each chunk to race with the
|
||||||
// flushes. If we were to dump the entire chunk at once, then this lessens the possibility
|
// flushes. If we were to dump the entire chunk at once, then this reduces the possibility
|
||||||
// of the flush occurring first since the dumping logic reserves all the slots it's about
|
// of the flush occurring first since the dumping logic reserves all the slots it's about
|
||||||
// to dump immediately.
|
// to dump immediately.
|
||||||
|
|
||||||
for chunks in slots_to_dump.chunks(slots_to_dump.len() / 2) {
|
for chunks in slots_to_dump.chunks(slots_to_dump.len() / 2) {
|
||||||
db.remove_unrooted_slots(chunks);
|
db.remove_unrooted_slots(chunks);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that all the slots in `slots_to_dump` were completely removed from the
|
// Check that all the slots in `slots_to_dump` were completely removed from the
|
||||||
// cache, storage, and index
|
// cache, storage, and index
|
||||||
|
|
||||||
for (slot, _) in slots_to_dump {
|
for (slot, _) in slots_to_dump {
|
||||||
assert!(db.storage.get_slot_storage_entries(*slot).is_none());
|
assert!(db.storage.get_slot_storage_entries(*slot).is_none());
|
||||||
assert!(db.accounts_cache.slot_cache(*slot).is_none());
|
assert!(db.accounts_cache.slot_cache(*slot).is_none());
|
||||||
@ -11626,6 +11712,7 @@ pub mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wait for flush to finish before starting next trial
|
// Wait for flush to finish before starting next trial
|
||||||
|
|
||||||
flush_done_receiver.recv().unwrap();
|
flush_done_receiver.recv().unwrap();
|
||||||
|
|
||||||
for (slot, bank_id) in slots_to_keep {
|
for (slot, bank_id) in slots_to_keep {
|
||||||
@ -11646,6 +11733,7 @@ pub mod tests {
|
|||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
drop(new_trial_start_sender);
|
drop(new_trial_start_sender);
|
||||||
t_flush_cache.join().unwrap();
|
t_flush_cache.join().unwrap();
|
||||||
|
|
||||||
t_spurious_signal.join().unwrap();
|
t_spurious_signal.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user