Fix pubkey refcount for shrink + clean (#14987) (#15108)

(cherry picked from commit e4d0d4bfae)

Co-authored-by: carllin <wumu727@gmail.com>
This commit is contained in:
mergify[bot]
2021-02-04 22:11:57 +00:00
committed by GitHub
parent 7af7d5f22c
commit fea0bd234c
2 changed files with 240 additions and 104 deletions

View File

@ -50,7 +50,7 @@ use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::{
borrow::Cow,
boxed::Box,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
convert::{TryFrom, TryInto},
io::{Error as IOError, Result as IOResult},
ops::RangeBounds,
@ -332,6 +332,11 @@ impl AccountStorage {
self.0.get(&slot).map(|result| result.value().clone())
}
/// Returns a snapshot of every storage entry currently recorded for `slot`,
/// or `None` when the slot has no storage map at all. The read lock on the
/// slot's store map is held only long enough to clone out the `Arc`s.
fn get_slot_storage_entries(&self, slot: Slot) -> Option<Vec<Arc<AccountStorageEntry>>> {
    let slot_stores = self.get_slot_stores(slot)?;
    let guard = slot_stores.read().unwrap();
    Some(guard.values().cloned().collect())
}
fn slot_store_count(&self, slot: Slot, store_id: AppendVecId) -> Option<usize> {
self.get_account_storage_entry(slot, store_id)
.map(|store| store.count())
@ -1663,61 +1668,83 @@ impl AccountsDB {
where
I: Iterator<Item = &'a Arc<AccountStorageEntry>>,
{
// Bookkeeping for one account discovered while scanning a slot's append
// vecs. When the same pubkey appears in multiple stores, only the entry
// with the highest `write_version` is kept (see the Entry::Occupied arm
// below).
struct FoundStoredAccount {
account: Account,
// Hash recorded alongside the account in the append vec.
account_hash: Hash,
// Size in bytes this entry occupies in its append vec (`next - start`).
account_size: usize,
// Id of the append vec the account was read from.
store_id: AppendVecId,
// Byte offset of the account within that append vec.
offset: usize,
// Monotonic store version; used to pick the newest duplicate.
write_version: u64,
}
debug!("do_shrink_slot_stores: slot: {}", slot);
let mut stored_accounts = vec![];
let mut stored_accounts: HashMap<Pubkey, FoundStoredAccount> = HashMap::new();
let mut original_bytes = 0;
for store in stores {
let mut start = 0;
original_bytes += store.total_bytes();
while let Some((account, next)) = store.accounts.get_account(start) {
stored_accounts.push((
account.meta.pubkey,
account.clone_account(),
*account.hash,
next - start,
(store.append_vec_id(), account.offset),
account.meta.write_version,
));
match stored_accounts.entry(account.meta.pubkey) {
Entry::Occupied(mut occupied_entry) => {
if account.meta.write_version > occupied_entry.get().write_version {
occupied_entry.insert(FoundStoredAccount {
account: account.clone_account(),
account_hash: *account.hash,
account_size: next - start,
store_id: store.append_vec_id(),
offset: account.offset,
write_version: account.meta.write_version,
});
}
}
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(FoundStoredAccount {
account: account.clone_account(),
account_hash: *account.hash,
account_size: next - start,
store_id: store.append_vec_id(),
offset: account.offset,
write_version: account.meta.write_version,
});
}
}
start = next;
}
}
let mut index_read_elapsed = Measure::start("index_read_elapsed");
let mut alive_total = 0;
let alive_accounts: Vec<_> = {
stored_accounts
.iter()
.filter(
|(
pubkey,
_account,
_account_hash,
_storage_size,
(store_id, offset),
_write_version,
)| {
if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None)
{
locked_entry
.slot_list()
.iter()
.any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset)
.filter(|(pubkey, stored_account)| {
let FoundStoredAccount {
account_size,
store_id,
offset,
..
} = stored_account;
if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None) {
let is_alive = locked_entry
.slot_list()
.iter()
.any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset);
if !is_alive {
// This pubkey was found in the storage, but no longer exists in the index.
// It would have had a ref to the storage from the initial store, but it will
// not exist in the re-written slot. Unref it to keep the index consistent with
// rewriting the storage entries.
locked_entry.unref()
} else {
false
alive_total += *account_size as u64;
}
},
)
is_alive
} else {
false
}
})
.collect()
};
index_read_elapsed.stop();
let alive_total: u64 = alive_accounts
.iter()
.map(
|(_pubkey, _account, _account_hash, account_size, _location, _write_version)| {
*account_size as u64
},
)
.sum();
let aligned_total: u64 = self.page_align(alive_total);
let total_starting_accounts = stored_accounts.len();
@ -1743,11 +1770,10 @@ impl AccountsDB {
let mut hashes = Vec::with_capacity(alive_accounts.len());
let mut write_versions = Vec::with_capacity(alive_accounts.len());
for (pubkey, account, account_hash, _size, _location, write_version) in &alive_accounts
{
accounts.push((pubkey, account));
hashes.push(*account_hash);
write_versions.push(*write_version);
for (pubkey, alive_account) in alive_accounts {
accounts.push((pubkey, &alive_account.account));
hashes.push(alive_account.account_hash);
write_versions.push(alive_account.write_version);
}
start.stop();
find_alive_elapsed = start.as_us();
@ -2095,8 +2121,7 @@ impl AccountsDB {
// the cache *after* we've finished flushing in `flush_slot_cache`.
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_stores(slot)
.map(|res| res.read().unwrap().values().cloned().collect())
.get_slot_storage_entries(slot)
.unwrap_or_default();
self.thread_pool.install(|| {
storage_maps
@ -4494,8 +4519,7 @@ impl AccountsDB {
}
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_stores(*slot)
.map(|res| res.read().unwrap().values().cloned().collect())
.get_slot_storage_entries(*slot)
.unwrap_or_default();
let num_accounts = storage_maps
.iter()
@ -4681,9 +4705,17 @@ impl AccountsDB {
// Requires all stores in the slot to be re-written otherwise the accounts_index
// store ref count could become incorrect.
fn do_shrink_slot_v1(&self, slot: Slot, forced: bool) -> usize {
// Bookkeeping for one account discovered while scanning a slot's append
// vecs during shrink. Duplicate pubkeys across stores are resolved by
// keeping the entry with the highest `write_version` (see the
// Entry::Occupied arm below).
struct FoundStoredAccount {
account: Account,
// Hash recorded alongside the account in the append vec.
account_hash: Hash,
// Size in bytes this entry occupies in its append vec (`next - start`).
account_size: usize,
// Id of the append vec the account was read from.
store_id: AppendVecId,
// Byte offset of the account within that append vec.
offset: usize,
// Monotonic store version; used to pick the newest duplicate.
write_version: u64,
}
trace!("shrink_stale_slot: slot: {}", slot);
let mut stored_accounts = vec![];
let mut stored_accounts: HashMap<Pubkey, FoundStoredAccount> = HashMap::new();
let mut storage_read_elapsed = Measure::start("storage_read_elapsed");
{
if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
@ -4723,14 +4755,30 @@ impl AccountsDB {
for store in stores.values() {
let mut start = 0;
while let Some((account, next)) = store.accounts.get_account(start) {
stored_accounts.push((
account.meta.pubkey,
account.clone_account(),
*account.hash,
next - start,
(store.append_vec_id(), account.offset),
account.meta.write_version,
));
match stored_accounts.entry(account.meta.pubkey) {
Entry::Occupied(mut occupied_entry) => {
if account.meta.write_version > occupied_entry.get().write_version {
occupied_entry.insert(FoundStoredAccount {
account: account.clone_account(),
account_hash: *account.hash,
account_size: next - start,
store_id: store.append_vec_id(),
offset: account.offset,
write_version: account.meta.write_version,
});
}
}
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(FoundStoredAccount {
account: account.clone_account(),
account_hash: *account.hash,
account_size: next - start,
store_id: store.append_vec_id(),
offset: account.offset,
write_version: account.meta.write_version,
});
}
}
start = next;
}
}
@ -4739,48 +4787,46 @@ impl AccountsDB {
storage_read_elapsed.stop();
let mut index_read_elapsed = Measure::start("index_read_elapsed");
let mut alive_total = 0;
let alive_accounts: Vec<_> = {
stored_accounts
.iter()
.filter(
|(
pubkey,
_account,
_account_hash,
_storage_size,
(store_id, offset),
_write_version,
)| {
if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None)
{
locked_entry
.slot_list()
.iter()
.any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset)
.filter(|(pubkey, stored_account)| {
let FoundStoredAccount {
account_size,
store_id,
offset,
..
} = stored_account;
if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None) {
let is_alive = locked_entry
.slot_list()
.iter()
.any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset);
if !is_alive {
// This pubkey was found in the storage, but no longer exists in the index.
// It would have had a ref to the storage from the initial store, but it will
// not exist in the re-written slot. Unref it to keep the index consistent with
// rewriting the storage entries.
locked_entry.unref()
} else {
false
alive_total += *account_size as u64;
}
},
)
is_alive
} else {
false
}
})
.collect()
};
index_read_elapsed.stop();
let alive_total: u64 = alive_accounts
.iter()
.map(
|(_pubkey, _account, _account_hash, account_size, _location, _write_verion)| {
*account_size as u64
},
)
.sum();
let aligned_total: u64 = self.page_align(alive_total);
let alive_accounts_len = alive_accounts.len();
debug!(
"shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})",
slot,
stored_accounts.len(),
alive_accounts.len(),
alive_accounts_len,
alive_total,
aligned_total
);
@ -4793,15 +4839,14 @@ impl AccountsDB {
let mut store_accounts_timing = StoreAccountsTiming::default();
if aligned_total > 0 {
let mut start = Measure::start("find_alive_elapsed");
let mut accounts = Vec::with_capacity(alive_accounts.len());
let mut hashes = Vec::with_capacity(alive_accounts.len());
let mut write_versions = Vec::with_capacity(alive_accounts.len());
let mut accounts = Vec::with_capacity(alive_accounts_len);
let mut hashes = Vec::with_capacity(alive_accounts_len);
let mut write_versions = Vec::with_capacity(alive_accounts_len);
for (pubkey, account, account_hash, _size, _location, write_version) in &alive_accounts
{
accounts.push((pubkey, account));
hashes.push(*account_hash);
write_versions.push(*write_version);
for (pubkey, alive_account) in alive_accounts {
accounts.push((pubkey, &alive_account.account));
hashes.push(alive_account.account_hash);
write_versions.push(alive_account.write_version);
}
start.stop();
find_alive_elapsed = start.as_us();
@ -4910,7 +4955,7 @@ impl AccountsDB {
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats.report();
alive_accounts.len()
alive_accounts_len
}
fn do_reset_uncleaned_roots_v1(
@ -8410,11 +8455,9 @@ pub mod tests {
}
fn slot_stores(db: &AccountsDB, slot: Slot) -> Vec<Arc<AccountStorageEntry>> {
if let Some(x) = db.storage.get_slot_stores(slot) {
x.read().unwrap().values().cloned().collect()
} else {
vec![]
}
db.storage
.get_slot_storage_entries(slot)
.unwrap_or_default()
}
#[test]
@ -8610,8 +8653,7 @@ pub mod tests {
let mut storage_maps: Vec<Arc<AccountStorageEntry>> = accounts_db
.storage
.get_slot_stores(slot)
.map(|res| res.read().unwrap().values().cloned().collect())
.get_slot_storage_entries(slot)
.unwrap_or_default();
// Flushing cache should only create one storage entry
@ -9024,6 +9066,90 @@ pub mod tests {
run_flush_rooted_accounts_cache(false);
}
// Verifies that shrink unrefs index entries for accounts it drops from a
// slot's storage, so that a later clean sees the correct ref count and can
// fully remove the dead slot's stores.
// `do_intra_cache_clean` toggles whether the cache is allowed to purge
// outdated entries before flush, exercising both ref-count paths.
fn run_test_shrink_unref(do_intra_cache_clean: bool) {
// Enable caching so that we use the straightforward implementation
// of shrink that will shrink all candidate slots
let caching_enabled = true;
let db = AccountsDB::new_with_config(
Vec::new(),
&ClusterType::Development,
HashSet::default(),
caching_enabled,
);
let account_key1 = Pubkey::new_unique();
let account_key2 = Pubkey::new_unique();
let account1 = Account::new(1, 0, &Account::default().owner);
// Store into slot 0
db.store_cached(0, &[(&account_key1, &account1)]);
db.store_cached(0, &[(&account_key2, &account1)]);
db.add_root(0);
if !do_intra_cache_clean {
// If we don't want the cache doing purges before flush,
// then we cannot flush multiple roots at once, otherwise the later
// roots will clean the earlier roots before they are stored.
// Thus flush the roots individually
db.flush_accounts_cache(true, None);
// Add an additional ref within the same slot to pubkey 1
db.store_uncached(0, &[(&account_key1, &account1)]);
}
// Make account_key1 in slot 0 outdated by updating in rooted slot 1
db.store_cached(1, &[(&account_key1, &account1)]);
db.add_root(1);
// Flushes all roots
db.flush_accounts_cache(true, None);
db.get_accounts_delta_hash(0);
db.get_accounts_delta_hash(1);
// Clean to remove outdated entry from slot 0
db.clean_accounts(Some(1));
// Shrink Slot 0
let mut slot0_stores = db.storage.get_slot_storage_entries(0).unwrap();
assert_eq!(slot0_stores.len(), 1);
let slot0_store = slot0_stores.pop().unwrap();
{
// Manually enqueue slot 0's single store as a shrink candidate,
// then run the shrink pass over it.
let mut shrink_candidate_slots = db.shrink_candidate_slots.lock().unwrap();
shrink_candidate_slots
.entry(0)
.or_default()
.insert(slot0_store.append_vec_id(), slot0_store);
}
db.shrink_candidate_slots();
// Make slot 0 dead by updating the remaining key
db.store_cached(2, &[(&account_key2, &account1)]);
db.add_root(2);
// Flushes all roots
db.flush_accounts_cache(true, None);
// Should be one store before clean for slot 0
assert_eq!(db.storage.get_slot_storage_entries(0).unwrap().len(), 1);
db.get_accounts_delta_hash(2);
db.clean_accounts(Some(2));
// No stores should exist for slot 0 after clean
assert!(db.storage.get_slot_storage_entries(0).is_none());
// Ref count for `account_key1` (account removed earlier by shrink)
// should be 1, since it was only stored in slot 0 and 1, and slot 0
// is now dead
assert_eq!(db.accounts_index.ref_count_from_storage(&account_key1), 1);
}
#[test]
// Shrink-unref scenario without intra-cache cleaning: roots are flushed
// individually and an extra same-slot store adds a second ref to pubkey 1.
fn test_shrink_unref() {
run_test_shrink_unref(false)
}
#[test]
// Shrink-unref scenario with intra-cache cleaning enabled: the cache may
// purge outdated entries before flush, exercising the other ref-count path.
fn test_shrink_unref_with_intra_slot_cleaning() {
run_test_shrink_unref(true)
}
#[test]
fn test_partial_clean() {
solana_logger::setup();

View File

@ -101,6 +101,10 @@ impl<T: Clone> ReadAccountMapEntry<T> {
/// Borrows the entry's storage reference counter — the number of storage
/// entries (append vecs) that currently hold a copy of this account.
pub fn ref_count(&self) -> &AtomicU64 {
&self.borrow_owned_entry_contents().ref_count
}
/// Decrements the storage ref count by one. Called when a storage entry
/// that referenced this account is rewritten without it (e.g. by shrink),
/// keeping the index consistent with the stores.
/// NOTE(review): uses Relaxed ordering, matching the existing
/// fetch_sub/fetch_add call sites on this counter elsewhere in the file;
/// no underflow guard — callers must hold a matching ref.
pub fn unref(&self) {
self.ref_count().fetch_sub(1, Ordering::Relaxed);
}
}
#[self_referencing]
@ -110,7 +114,7 @@ pub struct WriteAccountMapEntry<T: 'static> {
slot_list_guard: RwLockWriteGuard<'this, SlotList<T>>,
}
impl<T: 'static + Clone> WriteAccountMapEntry<T> {
impl<T: 'static + Clone + IsCached> WriteAccountMapEntry<T> {
pub fn from_account_map_entry(account_map_entry: AccountMapEntry<T>) -> Self {
WriteAccountMapEntryBuilder {
owned_entry: account_map_entry,
@ -147,12 +151,18 @@ impl<T: 'static + Clone> WriteAccountMapEntry<T> {
.collect();
assert!(same_slot_previous_updates.len() <= 1);
if let Some((list_index, (s, previous_update_value))) = same_slot_previous_updates.pop() {
let is_flush_from_cache =
previous_update_value.is_cached() && !account_info.is_cached();
reclaims.push((*s, previous_update_value.clone()));
self.slot_list_mut(|list| list.remove(list_index));
} else {
// Only increment ref count if the account was not prevously updated in this slot
if is_flush_from_cache {
self.ref_count().fetch_add(1, Ordering::Relaxed);
}
} else if !account_info.is_cached() {
// If it's the first non-cache insert, also bump the stored ref count
self.ref_count().fetch_add(1, Ordering::Relaxed);
}
self.slot_list_mut(|list| list.push((slot, account_info)));
}
}
@ -915,7 +925,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
pub fn unref_from_storage(&self, pubkey: &Pubkey) {
if let Some(locked_entry) = self.get_account_read_entry(pubkey) {
locked_entry.ref_count().fetch_sub(1, Ordering::Relaxed);
locked_entry.unref();
}
}