Add secondary indexes (#14212) (#14382)

(cherry picked from commit 5affd8aa72)

Co-authored-by: carllin <wumu727@gmail.com>
This commit is contained in:
mergify[bot]
2021-01-01 07:42:47 +00:00
committed by GitHub
parent 46f9822d62
commit f6de92c346
20 changed files with 1875 additions and 234 deletions

View File

@@ -19,7 +19,7 @@
//! commit for each slot entry would be indexed.
use crate::{
accounts_index::{AccountsIndex, Ancestors, SlotList, SlotSlice},
accounts_index::{AccountIndex, AccountsIndex, Ancestors, IndexKey, SlotList, SlotSlice},
append_vec::{AppendVec, StoredAccount, StoredMeta},
};
use blake3::traits::digest::Digest;
@@ -41,7 +41,7 @@ use solana_sdk::{
use std::convert::TryFrom;
use std::{
boxed::Box,
collections::{HashMap, HashSet},
collections::{BTreeMap, HashMap, HashSet},
convert::TryInto,
io::{Error as IOError, Result as IOResult},
ops::RangeBounds,
@@ -467,6 +467,8 @@ pub struct AccountsDB {
stats: AccountsStats,
pub cluster_type: Option<ClusterType>,
pub account_indexes: HashSet<AccountIndex>,
}
#[derive(Debug, Default)]
@@ -545,17 +547,27 @@ impl Default for AccountsDB {
frozen_accounts: HashMap::new(),
stats: AccountsStats::default(),
cluster_type: None,
account_indexes: HashSet::new(),
}
}
}
impl AccountsDB {
pub fn new(paths: Vec<PathBuf>, cluster_type: &ClusterType) -> Self {
AccountsDB::new_with_indexes(paths, cluster_type, HashSet::new())
}
pub fn new_with_indexes(
paths: Vec<PathBuf>,
cluster_type: &ClusterType,
account_indexes: HashSet<AccountIndex>,
) -> Self {
let new = if !paths.is_empty() {
Self {
paths,
temp_paths: None,
cluster_type: Some(*cluster_type),
account_indexes,
..Self::default()
}
} else {
@@ -566,6 +578,7 @@ impl AccountsDB {
paths,
temp_paths: Some(temp_dirs),
cluster_type: Some(*cluster_type),
account_indexes,
..Self::default()
}
};
@@ -638,6 +651,7 @@ impl AccountsDB {
&pubkey,
&mut reclaims,
max_clean_root,
&self.account_indexes,
);
}
reclaims
@@ -745,7 +759,9 @@ impl AccountsDB {
let mut dead_keys = Vec::new();
for (pubkey, slots_set) in pubkey_to_slot_set {
let (new_reclaims, is_empty) = self.accounts_index.purge_exact(&pubkey, slots_set);
let (new_reclaims, is_empty) =
self.accounts_index
.purge_exact(&pubkey, slots_set, &self.account_indexes);
if is_empty {
dead_keys.push(pubkey);
}
@@ -918,7 +934,8 @@ impl AccountsDB {
let (reclaims, dead_keys) = self.purge_keys_exact(pubkey_to_slot_set);
self.accounts_index.handle_dead_keys(&dead_keys);
self.accounts_index
.handle_dead_keys(&dead_keys, &self.account_indexes);
self.handle_reclaims(&reclaims, None, false, None);
@@ -1389,6 +1406,30 @@ impl AccountsDB {
collector
}
pub fn index_scan_accounts<F, A>(
&self,
ancestors: &Ancestors,
index_key: IndexKey,
scan_func: F,
) -> A
where
F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>),
A: Default,
{
let mut collector = A::default();
self.accounts_index.index_scan_accounts(
ancestors,
index_key,
|pubkey, (account_info, slot)| {
let account_slot = self
.get_account_from_storage(slot, account_info)
.map(|account| (pubkey, account, slot));
scan_func(&mut collector, account_slot)
},
);
collector
}
/// Scan a specific slot through all the account storage in parallel
pub fn scan_account_storage<F, B>(&self, slot: Slot, scan_func: F) -> Vec<B>
where
@@ -1414,8 +1455,8 @@ impl AccountsDB {
.map(|storage| {
let accounts = storage.accounts.accounts(0);
let mut retval = B::default();
accounts.iter().for_each(|stored_account| {
scan_func(stored_account, storage.append_vec_id(), &mut retval)
accounts.into_iter().for_each(|stored_account| {
scan_func(&stored_account, storage.append_vec_id(), &mut retval)
});
retval
})
@@ -1847,6 +1888,7 @@ impl AccountsDB {
remove_slot,
pubkey,
&mut reclaims,
&self.account_indexes,
);
}
}
@@ -2512,8 +2554,15 @@ impl AccountsDB {
let mut reclaims = SlotList::<AccountInfo>::with_capacity(infos.len() * 2);
for (info, pubkey_account) in infos.into_iter().zip(accounts.iter()) {
let pubkey = pubkey_account.0;
self.accounts_index
.upsert(slot, pubkey, info, &mut reclaims);
self.accounts_index.upsert(
slot,
pubkey,
&pubkey_account.1.owner,
&pubkey_account.1.data,
&self.account_indexes,
info,
&mut reclaims,
);
}
reclaims
}
@@ -2926,6 +2975,8 @@ impl AccountsDB {
}
pub fn generate_index(&self) {
type AccountsMap<'a> =
DashMap<Pubkey, Mutex<BTreeMap<u64, (AppendVecId, StoredAccount<'a>)>>>;
let mut slots = self.storage.all_slots();
#[allow(clippy::stable_sort_primitive)]
slots.sort();
@@ -2937,52 +2988,68 @@ impl AccountsDB {
info!("generating index: {}/{} slots...", index, slots.len());
last_log_update = now;
}
let accumulator: Vec<HashMap<Pubkey, Vec<(u64, AccountInfo)>>> = self
.scan_account_storage_inner(
*slot,
|stored_account: &StoredAccount,
store_id: AppendVecId,
accum: &mut HashMap<Pubkey, Vec<(u64, AccountInfo)>>| {
let accounts_map: AccountsMap = AccountsMap::new();
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_stores(*slot)
.map(|res| res.read().unwrap().values().cloned().collect())
.unwrap_or_default();
self.thread_pool.install(|| {
storage_maps.par_iter().for_each(|storage| {
let accounts = storage.accounts.accounts(0);
accounts.into_iter().for_each(|stored_account| {
let entry = accounts_map
.get(&stored_account.meta.pubkey)
.unwrap_or_else(|| {
accounts_map
.entry(stored_account.meta.pubkey)
.or_insert(Mutex::new(BTreeMap::new()))
.downgrade()
});
assert!(
// There should only be one update per write version for a specific slot
// and account
entry
.lock()
.unwrap()
.insert(
stored_account.meta.write_version,
(storage.append_vec_id(), stored_account)
)
.is_none()
);
})
});
});
// Need to restore indexes even with older write versions which may
// be shielding other accounts. When they are then purged, the
// original non-shielded account value will be visible when the account
// is restored from the append-vec
if !accounts_map.is_empty() {
let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
for (pubkey, account_infos) in accounts_map.into_iter() {
for (_, (store_id, stored_account)) in
account_infos.into_inner().unwrap().into_iter()
{
let account_info = AccountInfo {
store_id,
offset: stored_account.offset,
lamports: stored_account.account_meta.lamports,
};
let entry = accum
.entry(stored_account.meta.pubkey)
.or_insert_with(Vec::new);
entry.push((stored_account.meta.write_version, account_info));
},
);
let mut accounts_map: HashMap<Pubkey, Vec<(u64, AccountInfo)>> = HashMap::new();
for accumulator_entry in accumulator.iter() {
for (pubkey, storage_entry) in accumulator_entry {
let entry = accounts_map.entry(*pubkey).or_insert_with(Vec::new);
entry.extend(storage_entry.iter().cloned());
}
}
// Need to restore indexes even with older write versions which may
// be shielding other accounts. When they are then purged, the
// original non-shielded account value will be visible when the account
// is restored from the append-vec
if !accumulator.is_empty() {
let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
for (pubkey, account_infos) in accounts_map.iter_mut() {
account_infos.sort_by(|a, b| a.0.cmp(&b.0));
for (_, account_info) in account_infos {
self.accounts_index.upsert(
*slot,
pubkey,
account_info.clone(),
&pubkey,
&stored_account.account_meta.owner,
&stored_account.data,
&self.account_indexes,
account_info,
&mut _reclaims,
);
}
}
}
}
// Need to add these last, otherwise older updates will be cleaned
for slot in slots {
self.accounts_index.add_root(slot);
@@ -3085,9 +3152,13 @@ impl AccountsDB {
pub mod tests {
// TODO: all the bank tests are bank specific, issue: 2194
use super::*;
use crate::{accounts_index::RefCount, append_vec::AccountMeta};
use crate::{
accounts_index::tests::*, accounts_index::RefCount, append_vec::AccountMeta,
inline_spl_token_v2_0,
};
use assert_matches::assert_matches;
use rand::{thread_rng, Rng};
use solana_sdk::pubkey::PUBKEY_BYTES;
use solana_sdk::{account::Account, hash::HASH_BYTES};
use std::{fs, iter::FromIterator, str::FromStr};
@@ -3884,11 +3955,26 @@ pub mod tests {
fn test_clean_old_with_both_normal_and_zero_lamport_accounts() {
solana_logger::setup();
let accounts = AccountsDB::new(Vec::new(), &ClusterType::Development);
let accounts = AccountsDB::new_with_indexes(
Vec::new(),
&ClusterType::Development,
spl_token_mint_index_enabled(),
);
let pubkey1 = solana_sdk::pubkey::new_rand();
let pubkey2 = solana_sdk::pubkey::new_rand();
let normal_account = Account::new(1, 0, &Account::default().owner);
let zero_account = Account::new(0, 0, &Account::default().owner);
// Set up account to be added to secondary index
let mint_key = Pubkey::new_unique();
let mut account_data_with_mint =
vec![0; inline_spl_token_v2_0::state::Account::get_packed_len()];
account_data_with_mint[..PUBKEY_BYTES].clone_from_slice(&(mint_key.clone().to_bytes()));
let mut normal_account = Account::new(1, 0, &Account::default().owner);
normal_account.owner = inline_spl_token_v2_0::id();
normal_account.data = account_data_with_mint.clone();
let mut zero_account = Account::new(0, 0, &Account::default().owner);
zero_account.owner = inline_spl_token_v2_0::id();
zero_account.data = account_data_with_mint;
//store an account
accounts.store(0, &[(&pubkey1, &normal_account)]);
@@ -3907,6 +3993,19 @@ pub mod tests {
assert_eq!(accounts.alive_account_count_in_store(1), 1);
assert_eq!(accounts.alive_account_count_in_store(2), 1);
// Secondary index should still find both pubkeys
let mut found_accounts = HashSet::new();
accounts.accounts_index.index_scan_accounts(
&HashMap::new(),
IndexKey::SplTokenMint(mint_key),
|key, _| {
found_accounts.insert(*key);
},
);
assert_eq!(found_accounts.len(), 2);
assert!(found_accounts.contains(&pubkey1));
assert!(found_accounts.contains(&pubkey2));
accounts.clean_accounts(None);
//both zero lamport and normal accounts are cleaned up
@@ -3917,9 +4016,18 @@ pub mod tests {
assert_eq!(accounts.alive_account_count_in_store(1), 0);
assert_eq!(accounts.alive_account_count_in_store(2), 1);
// Pubkey 1, a zero lamport account, should no longer exist in accounts index
// because it has been removed
// `pubkey1`, a zero lamport account, should no longer exist in accounts index
// because it has been removed by the clean
assert!(accounts.accounts_index.get(&pubkey1, None, None).is_none());
// Secondary index should have purged `pubkey1` as well
let mut found_accounts = vec![];
accounts.accounts_index.index_scan_accounts(
&HashMap::new(),
IndexKey::SplTokenMint(mint_key),
|key, _| found_accounts.push(*key),
);
assert_eq!(found_accounts, vec![pubkey2]);
}
#[test]
@@ -4505,7 +4613,8 @@ pub mod tests {
let account2 = Account::new(3, 0, &key);
db.store(2, &[(&key1, &account2)]);
db.accounts_index.handle_dead_keys(&dead_keys);
db.accounts_index
.handle_dead_keys(&dead_keys, &HashSet::new());
db.print_accounts_stats("post");
let ancestors = vec![(2, 0)].into_iter().collect();
@@ -5468,12 +5577,60 @@ pub mod tests {
lamports: 0,
};
let mut reclaims = vec![];
accounts_index.upsert(0, &key0, info0, &mut reclaims);
accounts_index.upsert(1, &key0, info1.clone(), &mut reclaims);
accounts_index.upsert(1, &key1, info1, &mut reclaims);
accounts_index.upsert(2, &key1, info2.clone(), &mut reclaims);
accounts_index.upsert(2, &key2, info2, &mut reclaims);
accounts_index.upsert(3, &key2, info3, &mut reclaims);
accounts_index.upsert(
0,
&key0,
&Pubkey::default(),
&[],
&HashSet::new(),
info0,
&mut reclaims,
);
accounts_index.upsert(
1,
&key0,
&Pubkey::default(),
&[],
&HashSet::new(),
info1.clone(),
&mut reclaims,
);
accounts_index.upsert(
1,
&key1,
&Pubkey::default(),
&[],
&HashSet::new(),
info1,
&mut reclaims,
);
accounts_index.upsert(
2,
&key1,
&Pubkey::default(),
&[],
&HashSet::new(),
info2.clone(),
&mut reclaims,
);
accounts_index.upsert(
2,
&key2,
&Pubkey::default(),
&[],
&HashSet::new(),
info2,
&mut reclaims,
);
accounts_index.upsert(
3,
&key2,
&Pubkey::default(),
&[],
&HashSet::new(),
info3,
&mut reclaims,
);
accounts_index.add_root(0);
accounts_index.add_root(1);
accounts_index.add_root(2);