handle race condition with flush and delete (#20255)

This commit is contained in:
Jeff Washington (jwash)
2021-09-27 16:55:15 -05:00
committed by GitHub
parent bbd2f9672d
commit f6812523cd

View File

@ -199,6 +199,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
// then they think the item is still in the index and can make modifications. // then they think the item is still in the index and can make modifications.
// We have to have a write lock to the map here, which means nobody else can get // We have to have a write lock to the map here, which means nobody else can get
// the arc, but someone may already have retreived a clone of it. // the arc, but someone may already have retreived a clone of it.
// account index in_mem flushing is one such possibility
self.delete_disk_key(occupied.key()); self.delete_disk_key(occupied.key());
self.stats().insert_or_delete_mem(false, self.bin); self.stats().insert_or_delete_mem(false, self.bin);
occupied.remove(); occupied.remove();
@ -617,6 +618,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
let mut updates = Vec::default(); let mut updates = Vec::default();
let m = Measure::start("flush_scan"); let m = Measure::start("flush_scan");
let mut flush_entries_updated_on_disk = 0;
// scan and update loop // scan and update loop
{ {
let map = self.map().read().unwrap(); let map = self.map().read().unwrap();
@ -636,25 +638,24 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
removes.push(*k); removes.push(*k);
} }
} }
} Self::update_time_stat(&self.stats().flush_scan_us, m);
Self::update_time_stat(&self.stats().flush_scan_us, m);
let mut flush_entries_updated_on_disk = 0; // happens inside of lock on in-mem cache. This is because of deleting items
// happens outside of lock on in-mem cache // it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
// it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok. let m = Measure::start("flush_update");
let m = Measure::start("flush_update"); for (k, v) in updates.into_iter() {
for (k, v) in updates.into_iter() { if v.dirty() {
if v.dirty() { continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed
continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed }
flush_entries_updated_on_disk += 1;
disk.insert(&k, (&v.slot_list.read().unwrap(), v.ref_count()));
} }
flush_entries_updated_on_disk += 1; Self::update_time_stat(&self.stats().flush_update_us, m);
disk.insert(&k, (&v.slot_list.read().unwrap(), v.ref_count())); Self::update_stat(
&self.stats().flush_entries_updated_on_disk,
flush_entries_updated_on_disk,
);
} }
Self::update_time_stat(&self.stats().flush_update_us, m);
Self::update_stat(
&self.stats().flush_entries_updated_on_disk,
flush_entries_updated_on_disk,
);
let m = Measure::start("flush_remove"); let m = Measure::start("flush_remove");
if !self.flush_remove_from_cache(removes, current_age, startup) { if !self.flush_remove_from_cache(removes, current_age, startup) {