Remove bloat from secondary indexes (#17048)

This commit is contained in:
carllin
2021-05-12 15:29:30 -07:00
committed by GitHub
parent 597373f5fa
commit 239ab8799c
3 changed files with 259 additions and 460 deletions

View File

@@ -1,157 +1,121 @@
use crate::contains::Contains;
use dashmap::{mapref::entry::Entry::Occupied, DashMap};
use log::*;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use solana_sdk::pubkey::Pubkey;
use std::{
borrow::Borrow,
collections::{hash_map, HashMap, HashSet},
collections::HashSet,
fmt::Debug,
sync::{Arc, RwLock},
sync::{
atomic::{AtomicU64, Ordering},
RwLock,
},
};
pub type SecondaryReverseIndexEntry = RwLock<HashMap<Slot, Pubkey>>;
// The only cases where an inner key should map to a different outer key is
// if the key had different account data for the indexed key across different
// slots. As this is rare, it should be ok to use a Vec here over a HashSet, even
// though we are running some key existence checks.
pub type SecondaryReverseIndexEntry = RwLock<Vec<Pubkey>>;
pub trait SecondaryIndexEntry: Debug {
fn get_or_create(&self, key: &Pubkey, f: &dyn Fn(&RwLock<HashSet<Slot>>));
fn get<T>(&self, key: &Pubkey, f: &dyn Fn(Option<&RwLock<HashSet<Slot>>>) -> T) -> T;
fn remove_key_if_empty(&self, key: &Pubkey);
fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64);
// Removes a value from the set. Returns whether the value was present in the set.
fn remove_inner_key(&self, key: &Pubkey) -> bool;
fn is_empty(&self) -> bool;
fn keys(&self) -> Vec<Pubkey>;
fn len(&self) -> usize;
}
#[derive(Debug, Default)]
pub struct SecondaryIndexStats {
last_report: AtomicU64,
num_inner_keys: AtomicU64,
}
#[derive(Debug, Default)]
pub struct DashMapSecondaryIndexEntry {
pubkey_to_slot_set: DashMap<Pubkey, RwLock<HashSet<Slot>>>,
account_keys: DashMap<Pubkey, ()>,
}
impl SecondaryIndexEntry for DashMapSecondaryIndexEntry {
fn get_or_create(&self, key: &Pubkey, f: &dyn Fn(&RwLock<HashSet<Slot>>)) {
let slot_set = self.pubkey_to_slot_set.get(key).unwrap_or_else(|| {
self.pubkey_to_slot_set
.entry(*key)
.or_insert(RwLock::new(HashSet::new()))
.downgrade()
});
f(&slot_set)
}
fn get<T>(&self, key: &Pubkey, f: &dyn Fn(Option<&RwLock<HashSet<Slot>>>) -> T) -> T {
let slot_set = self.pubkey_to_slot_set.get(key);
f(slot_set.as_ref().map(|entry_ref| entry_ref.value()))
}
fn remove_key_if_empty(&self, key: &Pubkey) {
if let Occupied(key_entry) = self.pubkey_to_slot_set.entry(*key) {
// Delete the `key` if the slot set is empty
let slot_set = key_entry.get();
// Write lock on `key_entry` above through the `entry`
// means nobody else has access to this lock at this time,
// so this check for empty -> remove() is atomic
if slot_set.read().unwrap().is_empty() {
key_entry.remove();
}
fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64) {
if self.account_keys.get(key).is_none() {
self.account_keys.entry(*key).or_insert_with(|| {
inner_keys_count.fetch_add(1, Ordering::Relaxed);
});
}
}
fn remove_inner_key(&self, key: &Pubkey) -> bool {
self.account_keys.remove(key).is_some()
}
fn is_empty(&self) -> bool {
self.pubkey_to_slot_set.is_empty()
self.account_keys.is_empty()
}
fn keys(&self) -> Vec<Pubkey> {
self.pubkey_to_slot_set
self.account_keys
.iter()
.map(|entry_ref| *entry_ref.key())
.collect()
}
fn len(&self) -> usize {
self.pubkey_to_slot_set.len()
self.account_keys.len()
}
}
#[derive(Debug, Default)]
pub struct RwLockSecondaryIndexEntry {
pubkey_to_slot_set: RwLock<HashMap<Pubkey, Arc<RwLock<HashSet<Slot>>>>>,
account_keys: RwLock<HashSet<Pubkey>>,
}
impl SecondaryIndexEntry for RwLockSecondaryIndexEntry {
fn get_or_create(&self, key: &Pubkey, f: &dyn Fn(&RwLock<HashSet<Slot>>)) {
let slot_set = self.pubkey_to_slot_set.read().unwrap().get(key).cloned();
let slot_set = {
if let Some(slot_set) = slot_set {
slot_set
} else {
self.pubkey_to_slot_set
.write()
.unwrap()
.entry(*key)
.or_insert_with(|| Arc::new(RwLock::new(HashSet::new())))
.clone()
}
fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64) {
let exists = self.account_keys.read().unwrap().contains(key);
if !exists {
let mut w_account_keys = self.account_keys.write().unwrap();
w_account_keys.insert(*key);
inner_keys_count.fetch_add(1, Ordering::Relaxed);
};
f(&slot_set)
}
fn get<T>(&self, key: &Pubkey, f: &dyn Fn(Option<&RwLock<HashSet<Slot>>>) -> T) -> T {
let slot_set = self.pubkey_to_slot_set.read().unwrap().get(key).cloned();
f(slot_set.as_deref())
}
fn remove_key_if_empty(&self, key: &Pubkey) {
if let hash_map::Entry::Occupied(key_entry) =
self.pubkey_to_slot_set.write().unwrap().entry(*key)
{
// Delete the `key` if the slot set is empty
let slot_set = key_entry.get();
// Write lock on `key_entry` above through the `entry`
// means nobody else has access to this lock at this time,
// so this check for empty -> remove() is atomic
if slot_set.read().unwrap().is_empty() {
key_entry.remove();
}
}
fn remove_inner_key(&self, key: &Pubkey) -> bool {
self.account_keys.write().unwrap().remove(key)
}
fn is_empty(&self) -> bool {
self.pubkey_to_slot_set.read().unwrap().is_empty()
self.account_keys.read().unwrap().is_empty()
}
fn keys(&self) -> Vec<Pubkey> {
self.pubkey_to_slot_set
.read()
.unwrap()
.keys()
.cloned()
.collect()
self.account_keys.read().unwrap().iter().cloned().collect()
}
fn len(&self) -> usize {
self.pubkey_to_slot_set.read().unwrap().len()
self.account_keys.read().unwrap().len()
}
}
#[derive(Debug, Default)]
pub struct SecondaryIndex<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send> {
metrics_name: &'static str,
// Map from index keys to index values
pub index: DashMap<Pubkey, SecondaryIndexEntryType>,
// Map from index values back to index keys, used for cleanup.
// Alternative is to store Option<Pubkey> in each AccountInfo in the
// AccountsIndex if something is an SPL account with a mint, but then
// every AccountInfo would have to allocate `Option<Pubkey>`
pub reverse_index: DashMap<Pubkey, SecondaryReverseIndexEntry>,
stats: SecondaryIndexStats,
}
impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
SecondaryIndex<SecondaryIndexEntryType>
{
pub fn insert(&self, key: &Pubkey, inner_key: &Pubkey, slot: Slot) {
pub fn new(metrics_name: &'static str) -> Self {
Self {
metrics_name,
..Self::default()
}
}
pub fn insert(&self, key: &Pubkey, inner_key: &Pubkey) {
{
let pubkeys_map = self.index.get(key).unwrap_or_else(|| {
self.index
@@ -160,92 +124,70 @@ impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
.downgrade()
});
pubkeys_map.get_or_create(inner_key, &|slots_set: &RwLock<HashSet<Slot>>| {
let contains_key = slots_set.read().unwrap().contains(&slot);
if !contains_key {
slots_set.write().unwrap().insert(slot);
}
});
pubkeys_map.insert_if_not_exists(inner_key, &self.stats.num_inner_keys);
}
let prev_key = {
let slots_map = self.reverse_index.get(inner_key).unwrap_or_else(|| {
self.reverse_index
.entry(*inner_key)
.or_insert(RwLock::new(HashMap::new()))
.downgrade()
});
let should_insert = {
// Most of the time, key should already exist and match
// the one in the update
if let Some(existing_key) = slots_map.read().unwrap().get(&slot) {
existing_key != key
} else {
// If there is no key yet, then insert
true
}
};
if should_insert {
slots_map.write().unwrap().insert(slot, *key)
} else {
None
}
};
let outer_keys = self.reverse_index.get(inner_key).unwrap_or_else(|| {
self.reverse_index
.entry(*inner_key)
.or_insert(RwLock::new(Vec::with_capacity(1)))
.downgrade()
});
if let Some(prev_key) = prev_key {
// If the inner key was moved to a different primary key, remove
// the previous index entry.
// Check is necessary because another thread's writes could feasibly be
// interleaved between `should_insert = { ... slots_map.get(...) ... }` and
// `prev_key = { ... slots_map.insert(...) ... }`
// Currently this isn't possible due to current AccountsIndex's (pubkey, slot)-per-thread
// exclusive-locking, but check is here for future-proofing a more relaxed implementation
if prev_key != *key {
self.remove_index_entries(&prev_key, inner_key, &[slot]);
let should_insert = !outer_keys.read().unwrap().contains(&key);
if should_insert {
let mut w_outer_keys = outer_keys.write().unwrap();
if !w_outer_keys.contains(&key) {
w_outer_keys.push(*key);
}
}
let now = solana_sdk::timing::timestamp();
let last = self.stats.last_report.load(Ordering::Relaxed);
let should_report = now.saturating_sub(last) > 1000
&& self.stats.last_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last);
if should_report {
datapoint_info!(
self.metrics_name,
("num_secondary_keys", self.index.len() as i64, i64),
(
"num_inner_keys",
self.stats.num_inner_keys.load(Ordering::Relaxed) as i64,
i64
),
(
"num_reverse_index_keys",
self.reverse_index.len() as i64,
i64
),
);
}
}
pub fn remove_index_entries(&self, key: &Pubkey, inner_key: &Pubkey, slots: &[Slot]) {
let is_key_empty = if let Some(inner_key_map) = self.index.get(&key) {
// Delete the slot from the slot set
let is_inner_key_empty =
inner_key_map.get(&inner_key, &|slot_set: Option<&RwLock<HashSet<Slot>>>| {
if let Some(slot_set) = slot_set {
let mut w_slot_set = slot_set.write().unwrap();
for slot in slots.iter() {
let is_present = w_slot_set.remove(slot);
if !is_present {
warn!("Reverse index is missing previous entry for key {}, inner_key: {}, slot: {}",
key, inner_key, slot);
}
}
w_slot_set.is_empty()
} else {
false
}
});
// Check if `key` is empty
if is_inner_key_empty {
// Write lock on `inner_key_entry` above through the `entry`
// means nobody else has access to this lock at this time,
// so this check for empty -> remove() is atomic
inner_key_map.remove_key_if_empty(inner_key);
inner_key_map.is_empty()
} else {
false
}
} else {
false
// Only safe to call from `remove_by_inner_key()` due to asserts
fn remove_index_entries(&self, outer_key: &Pubkey, removed_inner_key: &Pubkey) {
let is_outer_key_empty = {
let inner_key_map = self
.index
.get_mut(&outer_key)
.expect("If we're removing a key, then it must have an entry in the map");
// If we deleted a pubkey from the reverse_index, then the corresponding entry
// better exist in this index as well or the two indexes are out of sync!
assert!(inner_key_map.value().remove_inner_key(&removed_inner_key));
inner_key_map.is_empty()
};
// Delete the `key` if the set of inner keys is empty
if is_key_empty {
if is_outer_key_empty {
// Other threads may have interleaved writes to this `key`,
// so double-check again for its emptiness
if let Occupied(key_entry) = self.index.entry(*key) {
if let Occupied(key_entry) = self.index.entry(*outer_key) {
if key_entry.get().is_empty() {
key_entry.remove();
}
@@ -253,70 +195,31 @@ impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
}
}
// Specifying `slots_to_remove` == Some will only remove keys for those specific slots
// found for the `inner_key` in the reverse index. Otherwise, passing `None`
// will remove all keys that are found for the `inner_key` in the reverse index.
// Note passing `None` is dangerous unless you're sure there's no other competing threads
// writing updates to the index for this Pubkey at the same time!
pub fn remove_by_inner_key<'a, C>(&'a self, inner_key: &Pubkey, slots_to_remove: Option<&'a C>)
where
C: Contains<'a, Slot>,
{
pub fn remove_by_inner_key(&self, inner_key: &Pubkey) {
// Save off which keys in `self.index` had slots removed so we can remove them
// after we purge the reverse index
let mut key_to_removed_slots: HashMap<Pubkey, Vec<Slot>> = HashMap::new();
let mut removed_outer_keys: HashSet<Pubkey> = HashSet::new();
// Check if the entry for `inner_key` in the reverse index is empty
// and can be removed
let needs_remove = {
if let Some(slots_to_remove) = slots_to_remove {
self.reverse_index
.get(inner_key)
.map(|slots_map| {
// Ideally we use a concurrent map here as well to prevent clean
// from blocking writes, but memory usage of DashMap is high
let mut w_slots_map = slots_map.value().write().unwrap();
for slot in slots_to_remove.contains_iter() {
if let Some(removed_key) = w_slots_map.remove(slot.borrow()) {
key_to_removed_slots
.entry(removed_key)
.or_default()
.push(*slot.borrow());
}
}
w_slots_map.is_empty()
})
.unwrap_or(false)
} else {
if let Some((_, removed_slot_map)) = self.reverse_index.remove(inner_key) {
for (slot, removed_key) in removed_slot_map.into_inner().unwrap().into_iter() {
key_to_removed_slots
.entry(removed_key)
.or_default()
.push(slot);
}
}
// We just removed the key, no need to remove it again
false
}
};
if needs_remove {
// Other threads may have interleaved writes to this `inner_key`, between
// releasing the `self.reverse_index.get(inner_key)` lock and now,
// so double-check again for emptiness
if let Occupied(slot_map) = self.reverse_index.entry(*inner_key) {
if slot_map.get().read().unwrap().is_empty() {
slot_map.remove();
}
if let Some((_, outer_keys_set)) = self.reverse_index.remove(inner_key) {
for removed_outer_key in outer_keys_set.into_inner().unwrap().into_iter() {
removed_outer_keys.insert(removed_outer_key);
}
}
// Remove this value from those keys
for (key, slots) in key_to_removed_slots {
self.remove_index_entries(&key, inner_key, &slots);
for outer_key in &removed_outer_keys {
self.remove_index_entries(outer_key, inner_key);
}
// Safe to `fetch_sub()` here because a dead key cannot be removed more than once,
// and the `num_inner_keys` must have been incremented by exactly removed_outer_keys.len()
// in previous unique insertions of `inner_key` into `self.index` for each key
// in `removed_outer_keys`
self.stats
.num_inner_keys
.fetch_sub(removed_outer_keys.len() as u64, Ordering::Relaxed);
}
pub fn get(&self, key: &Pubkey) -> Vec<Pubkey> {