use crate::contains::Contains; use dashmap::{mapref::entry::Entry::Occupied, DashMap}; use log::*; use solana_sdk::{clock::Slot, pubkey::Pubkey}; use std::{ borrow::Borrow, collections::{hash_map, HashMap, HashSet}, fmt::Debug, sync::{Arc, RwLock}, }; pub type SecondaryReverseIndexEntry = RwLock>; pub trait SecondaryIndexEntry: Debug { fn get_or_create(&self, key: &Pubkey, f: &dyn Fn(&RwLock>)); fn get(&self, key: &Pubkey, f: &dyn Fn(Option<&RwLock>>) -> T) -> T; fn remove_key_if_empty(&self, key: &Pubkey); fn is_empty(&self) -> bool; fn keys(&self) -> Vec; fn len(&self) -> usize; } #[derive(Debug, Default)] pub struct DashMapSecondaryIndexEntry { pubkey_to_slot_set: DashMap>>, } impl SecondaryIndexEntry for DashMapSecondaryIndexEntry { fn get_or_create(&self, key: &Pubkey, f: &dyn Fn(&RwLock>)) { let slot_set = self.pubkey_to_slot_set.get(key).unwrap_or_else(|| { self.pubkey_to_slot_set .entry(*key) .or_insert(RwLock::new(HashSet::new())) .downgrade() }); f(&slot_set) } fn get(&self, key: &Pubkey, f: &dyn Fn(Option<&RwLock>>) -> T) -> T { let slot_set = self.pubkey_to_slot_set.get(key); f(slot_set.as_ref().map(|entry_ref| entry_ref.value())) } fn remove_key_if_empty(&self, key: &Pubkey) { if let Occupied(key_entry) = self.pubkey_to_slot_set.entry(*key) { // Delete the `key` if the slot set is empty let slot_set = key_entry.get(); // Write lock on `key_entry` above through the `entry` // means nobody else has access to this lock at this time, // so this check for empty -> remove() is atomic if slot_set.read().unwrap().is_empty() { key_entry.remove(); } } } fn is_empty(&self) -> bool { self.pubkey_to_slot_set.is_empty() } fn keys(&self) -> Vec { self.pubkey_to_slot_set .iter() .map(|entry_ref| *entry_ref.key()) .collect() } fn len(&self) -> usize { self.pubkey_to_slot_set.len() } } #[derive(Debug, Default)] pub struct RwLockSecondaryIndexEntry { pubkey_to_slot_set: RwLock>>>>, } impl SecondaryIndexEntry for RwLockSecondaryIndexEntry { fn get_or_create(&self, key: &Pubkey, f: &dyn Fn(&RwLock>)) { let slot_set = self.pubkey_to_slot_set.read().unwrap().get(key).cloned(); let slot_set = { if let Some(slot_set) = slot_set { slot_set } else { self.pubkey_to_slot_set .write() .unwrap() .entry(*key) .or_insert_with(|| Arc::new(RwLock::new(HashSet::new()))) .clone() } }; f(&slot_set) } fn get(&self, key: &Pubkey, f: &dyn Fn(Option<&RwLock>>) -> T) -> T { let slot_set = self.pubkey_to_slot_set.read().unwrap().get(key).cloned(); f(slot_set.as_deref()) } fn remove_key_if_empty(&self, key: &Pubkey) { if let hash_map::Entry::Occupied(key_entry) = self.pubkey_to_slot_set.write().unwrap().entry(*key) { // Delete the `key` if the slot set is empty let slot_set = key_entry.get(); // Write lock on `key_entry` above through the `entry` // means nobody else has access to this lock at this time, // so this check for empty -> remove() is atomic if slot_set.read().unwrap().is_empty() { key_entry.remove(); } } } fn is_empty(&self) -> bool { self.pubkey_to_slot_set.read().unwrap().is_empty() } fn keys(&self) -> Vec { self.pubkey_to_slot_set .read() .unwrap() .keys() .cloned() .collect() } fn len(&self) -> usize { self.pubkey_to_slot_set.read().unwrap().len() } } #[derive(Debug, Default)] pub struct SecondaryIndex { // Map from index keys to index values pub index: DashMap, // Map from index values back to index keys, used for cleanup. // Alternative is to store Option in each AccountInfo in the // AccountsIndex if something is an SPL account with a mint, but then // every AccountInfo would have to allocate `Option` pub reverse_index: DashMap, } impl SecondaryIndex { pub fn insert(&self, key: &Pubkey, inner_key: &Pubkey, slot: Slot) { { let pubkeys_map = self.index.get(key).unwrap_or_else(|| { self.index .entry(*key) .or_insert(SecondaryIndexEntryType::default()) .downgrade() }); pubkeys_map.get_or_create(inner_key, &|slots_set: &RwLock>| { let contains_key = slots_set.read().unwrap().contains(&slot); if !contains_key { slots_set.write().unwrap().insert(slot); } }); } let prev_key = { let slots_map = self.reverse_index.get(inner_key).unwrap_or_else(|| { self.reverse_index .entry(*inner_key) .or_insert(RwLock::new(HashMap::new())) .downgrade() }); let should_insert = { // Most of the time, key should already exist and match // the one in the update if let Some(existing_key) = slots_map.read().unwrap().get(&slot) { existing_key != key } else { // If there is no key yet, then insert true } }; if should_insert { slots_map.write().unwrap().insert(slot, *key) } else { None } }; if let Some(prev_key) = prev_key { // If the inner key was moved to a different primary key, remove // the previous index entry. // Check is necessary because another thread's writes could feasibly be // interleaved between `should_insert = { ... slots_map.get(...) ... }` and // `prev_key = { ... slots_map.insert(...) ... }` // Currently this isn't possible due to current AccountsIndex's (pubkey, slot)-per-thread // exclusive-locking, but check is here for future-proofing a more relaxed implementation if prev_key != *key { self.remove_index_entries(&prev_key, inner_key, &[slot]); } } } pub fn remove_index_entries(&self, key: &Pubkey, inner_key: &Pubkey, slots: &[Slot]) { let is_key_empty = if let Some(inner_key_map) = self.index.get(&key) { // Delete the slot from the slot set let is_inner_key_empty = inner_key_map.get(&inner_key, &|slot_set: Option<&RwLock>>| { if let Some(slot_set) = slot_set { let mut w_slot_set = slot_set.write().unwrap(); for slot in slots.iter() { let is_present = w_slot_set.remove(slot); if !is_present { warn!("Reverse index is missing previous entry for key {}, inner_key: {}, slot: {}", key, inner_key, slot); } } w_slot_set.is_empty() } else { false } }); // Check if `key` is empty if is_inner_key_empty { // Write lock on `inner_key_entry` above through the `entry` // means nobody else has access to this lock at this time, // so this check for empty -> remove() is atomic inner_key_map.remove_key_if_empty(inner_key); inner_key_map.is_empty() } else { false } } else { false }; // Delete the `key` if the set of inner keys is empty if is_key_empty { // Other threads may have interleaved writes to this `key`, // so double-check again for its emptiness if let Occupied(key_entry) = self.index.entry(*key) { if key_entry.get().is_empty() { key_entry.remove(); } } } } // Specifying `slots_to_remove` == Some will only remove keys for those specific slots // found for the `inner_key` in the reverse index. Otherwise, passing `None` // will remove all keys that are found for the `inner_key` in the reverse index. // Note passing `None` is dangerous unless you're sure there's no other competing threads // writing updates to the index for this Pubkey at the same time! pub fn remove_by_inner_key<'a, C>(&'a self, inner_key: &Pubkey, slots_to_remove: Option<&'a C>) where C: Contains<'a, Slot>, { // Save off which keys in `self.index` had slots removed so we can remove them // after we purge the reverse index let mut key_to_removed_slots: HashMap> = HashMap::new(); // Check if the entry for `inner_key` in the reverse index is empty // and can be removed let needs_remove = { if let Some(slots_to_remove) = slots_to_remove { self.reverse_index .get(inner_key) .map(|slots_map| { // Ideally we use a concurrent map here as well to prevent clean // from blocking writes, but memory usage of DashMap is high let mut w_slots_map = slots_map.value().write().unwrap(); for slot in slots_to_remove.contains_iter() { if let Some(removed_key) = w_slots_map.remove(slot.borrow()) { key_to_removed_slots .entry(removed_key) .or_default() .push(*slot.borrow()); } } w_slots_map.is_empty() }) .unwrap_or(false) } else { if let Some((_, removed_slot_map)) = self.reverse_index.remove(inner_key) { for (slot, removed_key) in removed_slot_map.into_inner().unwrap().into_iter() { key_to_removed_slots .entry(removed_key) .or_default() .push(slot); } } // We just removed the key, no need to remove it again false } }; if needs_remove { // Other threads may have interleaved writes to this `inner_key`, between // releasing the `self.reverse_index.get(inner_key)` lock and now, // so double-check again for emptiness if let Occupied(slot_map) = self.reverse_index.entry(*inner_key) { if slot_map.get().read().unwrap().is_empty() { slot_map.remove(); } } } // Remove this value from those keys for (key, slots) in key_to_removed_slots { self.remove_index_entries(&key, inner_key, &slots); } } pub fn get(&self, key: &Pubkey) -> Vec { if let Some(inner_keys_map) = self.index.get(key) { inner_keys_map.keys() } else { vec![] } } }