Cache account stores, flush from AccountsBackgroundService (#13140)

This commit is contained in:
carllin
2021-01-11 17:00:23 -08:00
committed by GitHub
parent 4a66e3eddc
commit 6dfad0652f
25 changed files with 2604 additions and 833 deletions

View File

@@ -34,6 +34,22 @@ pub type AccountMap<K, V> = BTreeMap<K, V>;
type AccountMapEntry<T> = Arc<AccountMapEntryInner<T>>;
pub trait IsCached {
fn is_cached(&self) -> bool;
}
impl IsCached for bool {
fn is_cached(&self) -> bool {
false
}
}
impl IsCached for u64 {
fn is_cached(&self) -> bool {
false
}
}
enum ScanTypes<R: RangeBounds<Pubkey>> {
Unindexed(Option<R>),
Indexed(IndexKey),
@@ -220,7 +236,7 @@ pub struct AccountsIndex<T> {
ongoing_scan_roots: RwLock<BTreeMap<Slot, u64>>,
}
impl<T: 'static + Clone> AccountsIndex<T> {
impl<T: 'static + Clone + IsCached> AccountsIndex<T> {
fn iter<R>(&self, range: Option<R>) -> AccountsIndexIterator<T>
where
R: RangeBounds<Pubkey>,
@@ -592,22 +608,24 @@ impl<T: 'static + Clone> AccountsIndex<T> {
pub fn purge_exact(
&self,
pubkey: &Pubkey,
slots: HashSet<Slot>,
slots_to_purge: &HashSet<Slot>,
reclaims: &mut SlotList<T>,
account_indexes: &HashSet<AccountIndex>,
) -> (SlotList<T>, bool) {
) -> bool {
let res = {
let mut write_account_map_entry = self.get_account_write_entry(pubkey).unwrap();
write_account_map_entry.slot_list_mut(|slot_list| {
let reclaims = slot_list
.iter()
.filter(|(slot, _)| slots.contains(&slot))
.cloned()
.collect();
slot_list.retain(|(slot, _)| !slots.contains(slot));
(reclaims, slot_list.is_empty())
slot_list.retain(|(slot, item)| {
let should_purge = slots_to_purge.contains(&slot);
if should_purge {
reclaims.push((*slot, item.clone()));
}
!should_purge
});
slot_list.is_empty()
})
};
self.purge_secondary_indexes_by_inner_key(pubkey, Some(&slots), account_indexes);
self.purge_secondary_indexes_by_inner_key(pubkey, Some(&slots_to_purge), account_indexes);
res
}
@@ -820,7 +838,7 @@ impl<T: 'static + Clone> AccountsIndex<T> {
let mut purged_slots: HashSet<Slot> = HashSet::new();
list.retain(|(slot, value)| {
let should_purge = Self::can_purge(max_root, *slot);
let should_purge = Self::can_purge(max_root, *slot) && !value.is_cached();
if should_purge {
reclaims.push((*slot, value.clone()));
purged_slots.insert(*slot);
@@ -831,6 +849,7 @@ impl<T: 'static + Clone> AccountsIndex<T> {
self.purge_secondary_indexes_by_inner_key(pubkey, Some(&purged_slots), account_indexes);
}
// `is_cached` closure is needed to work around the generic (`T`) indexed type.
pub fn clean_rooted_entries(
&self,
pubkey: &Pubkey,
@@ -851,28 +870,6 @@ impl<T: 'static + Clone> AccountsIndex<T> {
}
}
pub fn clean_unrooted_entries_by_slot(
&self,
purge_slot: Slot,
pubkey: &Pubkey,
reclaims: &mut SlotList<T>,
account_indexes: &HashSet<AccountIndex>,
) {
if let Some(mut locked_entry) = self.get_account_write_entry(pubkey) {
locked_entry.slot_list_mut(|slot_list| {
slot_list.retain(|(slot, entry)| {
if *slot == purge_slot {
reclaims.push((*slot, entry.clone()));
}
*slot != purge_slot
});
});
}
let purge_slot: HashSet<Slot> = vec![purge_slot].into_iter().collect();
self.purge_secondary_indexes_by_inner_key(pubkey, Some(&purge_slot), account_indexes);
}
pub fn can_purge(max_root: Slot, slot: Slot) -> bool {
slot < max_root
}
@@ -881,11 +878,24 @@ impl<T: 'static + Clone> AccountsIndex<T> {
self.roots_tracker.read().unwrap().roots.contains(&slot)
}
pub fn add_root(&self, slot: Slot) {
pub fn add_root(&self, slot: Slot, caching_enabled: bool) {
let mut w_roots_tracker = self.roots_tracker.write().unwrap();
w_roots_tracker.roots.insert(slot);
w_roots_tracker.uncleaned_roots.insert(slot);
w_roots_tracker.max_root = std::cmp::max(slot, w_roots_tracker.max_root);
// we delay cleaning until flushing!
if !caching_enabled {
w_roots_tracker.uncleaned_roots.insert(slot);
}
// `AccountsDb::flush_accounts_cache()` relies on roots being added in order
assert!(slot >= w_roots_tracker.max_root);
w_roots_tracker.max_root = slot;
}
pub fn add_uncleaned_roots<I>(&self, roots: I)
where
I: IntoIterator<Item = Slot>,
{
let mut w_roots_tracker = self.roots_tracker.write().unwrap();
w_roots_tracker.uncleaned_roots.extend(roots);
}
fn max_root(&self) -> Slot {
@@ -895,10 +905,29 @@ impl<T: 'static + Clone> AccountsIndex<T> {
/// Remove the slot when the storage for the slot is freed
/// Accounts no longer reference this slot.
pub fn clean_dead_slot(&self, slot: Slot) {
let mut w_roots_tracker = self.roots_tracker.write().unwrap();
w_roots_tracker.roots.remove(&slot);
w_roots_tracker.uncleaned_roots.remove(&slot);
w_roots_tracker.previous_uncleaned_roots.remove(&slot);
if self.is_root(slot) {
let (roots_len, uncleaned_roots_len, previous_uncleaned_roots_len) = {
let mut w_roots_tracker = self.roots_tracker.write().unwrap();
w_roots_tracker.roots.remove(&slot);
w_roots_tracker.uncleaned_roots.remove(&slot);
w_roots_tracker.previous_uncleaned_roots.remove(&slot);
(
w_roots_tracker.roots.len(),
w_roots_tracker.uncleaned_roots.len(),
w_roots_tracker.previous_uncleaned_roots.len(),
)
};
datapoint_info!(
"accounts_index_roots_len",
("roots_len", roots_len as i64, i64),
("uncleaned_roots_len", uncleaned_roots_len as i64, i64),
(
"previous_uncleaned_roots_len",
previous_uncleaned_roots_len as i64,
i64
),
);
}
}
pub fn reset_uncleaned_roots(&self, max_clean_root: Option<Slot>) -> HashSet<Slot> {
@@ -1136,7 +1165,7 @@ pub mod tests {
);
}
index.add_root(root_slot);
index.add_root(root_slot, false);
(index, pubkeys)
}
@@ -1272,7 +1301,7 @@ pub mod tests {
fn test_is_root() {
let index = AccountsIndex::<bool>::default();
assert!(!index.is_root(0));
index.add_root(0);
index.add_root(0, false);
assert!(index.is_root(0));
}
@@ -1292,7 +1321,7 @@ pub mod tests {
);
assert!(gc.is_empty());
index.add_root(0);
index.add_root(0, false);
let (list, idx) = index.get(&key.pubkey(), None, None).unwrap();
assert_eq!(list.slot_list()[idx], (0, true));
}
@@ -1300,8 +1329,8 @@ pub mod tests {
#[test]
fn test_clean_first() {
let index = AccountsIndex::<bool>::default();
index.add_root(0);
index.add_root(1);
index.add_root(0, false);
index.add_root(1, false);
index.clean_dead_slot(0);
assert!(index.is_root(1));
assert!(!index.is_root(0));
@@ -1311,8 +1340,8 @@ pub mod tests {
fn test_clean_last() {
//this behavior might be undefined, clean up should only occur on older slots
let index = AccountsIndex::<bool>::default();
index.add_root(0);
index.add_root(1);
index.add_root(0, false);
index.add_root(1, false);
index.clean_dead_slot(1);
assert!(!index.is_root(1));
assert!(index.is_root(0));
@@ -1322,8 +1351,8 @@ pub mod tests {
fn test_clean_and_unclean_slot() {
let index = AccountsIndex::<bool>::default();
assert_eq!(0, index.roots_tracker.read().unwrap().uncleaned_roots.len());
index.add_root(0);
index.add_root(1);
index.add_root(0, false);
index.add_root(1, false);
assert_eq!(2, index.roots_tracker.read().unwrap().uncleaned_roots.len());
assert_eq!(
@@ -1348,8 +1377,8 @@ pub mod tests {
.len()
);
index.add_root(2);
index.add_root(3);
index.add_root(2, false);
index.add_root(3, false);
assert_eq!(4, index.roots_tracker.read().unwrap().roots.len());
assert_eq!(2, index.roots_tracker.read().unwrap().uncleaned_roots.len());
assert_eq!(
@@ -1500,9 +1529,9 @@ pub mod tests {
true,
&mut gc,
);
index.add_root(0);
index.add_root(1);
index.add_root(3);
index.add_root(0, false);
index.add_root(1, false);
index.add_root(3, false);
index.upsert(
4,
&key.pubkey(),
@@ -1559,7 +1588,7 @@ pub mod tests {
let purges = index.purge_roots(&key.pubkey());
assert_eq!(purges, (vec![], false));
index.add_root(1);
index.add_root(1, false);
let purges = index.purge_roots(&key.pubkey());
assert_eq!(purges, (vec![(1, 10)], true));
@@ -1584,7 +1613,7 @@ pub mod tests {
assert!(index.latest_slot(None, &slot_slice, None).is_none());
// Given a root, should return the root
index.add_root(5);
index.add_root(5, false);
assert_eq!(index.latest_slot(None, &slot_slice, None).unwrap(), 1);
// Given a max_root == root, should still return the root
@@ -1666,7 +1695,12 @@ pub mod tests {
slots.len()
);
index.purge_exact(&account_key, slots.into_iter().collect(), account_index);
index.purge_exact(
&account_key,
&slots.into_iter().collect(),
&mut vec![],
account_index,
);
assert!(secondary_index.index.is_empty());
assert!(secondary_index.reverse_index.is_empty());
@@ -1716,9 +1750,9 @@ pub mod tests {
// Add a later root, earlier slots should be reclaimed
slot_list = vec![(1, true), (2, true), (5, true), (9, true)];
index.add_root(1);
index.add_root(1, false);
// Note 2 is not a root
index.add_root(5);
index.add_root(5, false);
reclaims = vec![];
index.purge_older_root_entries(
&Pubkey::default(),
@@ -1732,7 +1766,7 @@ pub mod tests {
// Add a later root that is not in the list, should not affect the outcome
slot_list = vec![(1, true), (2, true), (5, true), (9, true)];
index.add_root(6);
index.add_root(6, false);
reclaims = vec![];
index.purge_older_root_entries(
&Pubkey::default(),
@@ -1977,7 +2011,7 @@ pub mod tests {
assert!(secondary_index.get(&secondary_key1).is_empty());
assert_eq!(secondary_index.get(&secondary_key2), vec![account_key]);
// If another fork reintroduces secondary_key1, then it should be readded to the
// If another fork reintroduces secondary_key1, then it should be re-added to the
// index
let fork = slot + 1;
index.upsert(
@@ -1993,7 +2027,7 @@ pub mod tests {
// If we set a root at fork, and clean, then the secondary_key1 should no longer
// be findable
index.add_root(fork);
index.add_root(fork, false);
index
.get_account_write_entry(&account_key)
.unwrap()